188 lines
6.3 KiB
Python

"""MCP Bridge integration for Home Assistant."""
from __future__ import annotations

import logging
from typing import Any

from homeassistant.const import CONF_ENTITY_ID
from homeassistant.core import HomeAssistant, ServiceCall, State, SupportsResponse
from homeassistant.helpers import area_registry as ar
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.service import async_get_all_descriptions
from homeassistant.helpers.typing import ConfigType
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Service domain under which async_setup registers the bridge's services.
DOMAIN = "mcp_bridge"
# Label a script's entity-registry entry must carry to be listed by the
# get_exposed_scripts service.
MCP_ACCESSIBLE_LABEL = "mcp_accessible"
def _area_name_for(
    area_reg: ar.AreaRegistry, entity_entry: er.RegistryEntry | None
) -> str | None:
    """Return the name of the area assigned directly to *entity_entry*.

    Returns ``None`` when there is no registry entry, the entry has no
    ``area_id``, or the area registry has no area with that id.
    """
    if entity_entry is None or not entity_entry.area_id:
        return None
    area = area_reg.async_get_area(entity_entry.area_id)
    return area.name if area else None


def _is_exposed_to_conversation(entity_entry: er.RegistryEntry) -> bool:
    """Return the ``conversation.should_expose`` option of a registry entry.

    A missing ``conversation`` section or ``should_expose`` key counts as
    not exposed.
    """
    # NOTE(review): this reads the registry option directly; confirm whether
    # Home Assistant's default exposure rules also need to be honoured.
    return entity_entry.options.get("conversation", {}).get("should_expose", False)


def _describe_state(state: State, area_name: str | None) -> dict[str, Any]:
    """Build the entity payload shared by the entity-returning services."""
    attributes = state.attributes
    return {
        "entity_id": state.entity_id,
        "state": state.state,
        "friendly_name": attributes.get("friendly_name", state.entity_id),
        "area": area_name,
        "domain": state.entity_id.split(".", 1)[0],
        "device_class": attributes.get("device_class"),
        "supported_features": attributes.get("supported_features", 0),
        "last_changed": state.last_changed.isoformat(),
        "last_updated": state.last_updated.isoformat(),
        "attributes": dict(attributes),
    }


def _describe_script_fields(
    service_description: dict[str, Any] | None,
) -> dict[str, dict[str, Any]]:
    """Convert the ``fields`` of a service description to the bridge format.

    Returns an empty dict when the description is missing or declares no
    fields.
    """
    raw_fields = (service_description or {}).get("fields") or {}
    return {
        field_name: {
            "name": field_info.get("name", field_name),
            "description": field_info.get("description", ""),
            "required": field_info.get("required", False),
            "example": field_info.get("example"),
            "default": field_info.get("default"),
            "selector": field_info.get("selector"),
        }
        for field_name, field_info in raw_fields.items()
    }


async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the MCP Bridge integration and register its services.

    Registers three response-only services under ``DOMAIN``:
    ``get_exposed_entities``, ``get_exposed_scripts`` and
    ``get_entity_metadata``.

    Args:
        hass: The Home Assistant instance the services are registered on.
        config: The integration configuration; not read by this function.

    Returns:
        Always ``True``.
    """
    _LOGGER.info("Setting up MCP Bridge integration")

    async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]:
        """Return every state whose registry entry is exposed to conversation.

        States without an entity-registry entry are skipped.
        """
        entity_reg = er.async_get(hass)
        area_reg = ar.async_get(hass)

        entities: list[dict[str, Any]] = []
        for state in hass.states.async_all():
            entity_entry = entity_reg.async_get(state.entity_id)
            if not entity_entry or not _is_exposed_to_conversation(entity_entry):
                continue
            entities.append(
                _describe_state(state, _area_name_for(area_reg, entity_entry))
            )

        return {
            "entities": entities,
            "count": len(entities),
        }

    async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]:
        """Return the scripts labelled ``MCP_ACCESSIBLE_LABEL`` with their fields."""
        entity_reg = er.async_get(hass)

        # Field metadata lives in the service *descriptions*; the Service
        # objects returned by hass.services.async_services() carry no
        # ``fields`` attribute, so reading it from them always gave an empty
        # result. Fetch the descriptions once instead of once per script.
        descriptions = await async_get_all_descriptions(hass)
        script_descriptions = descriptions.get("script", {})

        scripts: list[dict[str, Any]] = []
        for state in hass.states.async_all("script"):
            entity_id = state.entity_id
            entity_entry = entity_reg.async_get(entity_id)
            if not entity_entry or MCP_ACCESSIBLE_LABEL not in entity_entry.labels:
                continue

            # "script.my_script" -> "my_script".
            # NOTE(review): assumes the script's service name equals the
            # object id of its entity_id; confirm for renamed entities.
            script_name = entity_id.split(".", 1)[1]

            scripts.append(
                {
                    "entity_id": entity_id,
                    "friendly_name": state.attributes.get(
                        "friendly_name", entity_id
                    ),
                    "description": state.attributes.get("description", ""),
                    "fields": _describe_script_fields(
                        script_descriptions.get(script_name)
                    ),
                }
            )

        return {
            "scripts": scripts,
            "count": len(scripts),
        }

    async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]:
        """Return the state payload plus registry details for one entity.

        Reads ``entity_id`` from the call data. A missing id or an unknown
        entity yields a dict with a single ``error`` key instead of raising.
        ``labels`` and ``is_exposed_to_conversation`` are included only when
        the entity has an entity-registry entry.
        """
        entity_id = call.data.get(CONF_ENTITY_ID)
        if not entity_id:
            return {"error": "entity_id is required"}

        state = hass.states.get(entity_id)
        if not state:
            return {"error": f"Entity {entity_id} not found"}

        # Use the entity id stored on the state for the registry lookup so a
        # request whose casing differs from the stored id still finds its
        # registry entry.
        entity_entry = er.async_get(hass).async_get(state.entity_id)
        data = _describe_state(
            state, _area_name_for(ar.async_get(hass), entity_entry)
        )

        if entity_entry:
            data["labels"] = list(entity_entry.labels)
            data["is_exposed_to_conversation"] = _is_exposed_to_conversation(
                entity_entry
            )

        return data

    # Every service only returns data, hence SupportsResponse.ONLY.
    for service_name, handler in (
        ("get_exposed_entities", get_exposed_entities),
        ("get_exposed_scripts", get_exposed_scripts),
        ("get_entity_metadata", get_entity_metadata),
    ):
        hass.services.async_register(
            DOMAIN,
            service_name,
            handler,
            supports_response=SupportsResponse.ONLY,
        )

    _LOGGER.info("MCP Bridge services registered successfully")
    return True