179 lines
5.8 KiB
Python

"""MCP Bridge integration for Home Assistant."""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.const import CONF_ENTITY_ID
from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers import area_registry as ar
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)
DOMAIN = "mcp_bridge"
MCP_ACCESSIBLE_LABEL = "mcp_accessible"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
_LOGGER.info("Setting up MCP Bridge integration")
async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]:
entity_reg = er.async_get(hass)
area_reg = ar.async_get(hass)
entities: list[dict[str, Any]] = []
for state in hass.states.async_all():
entity_entry = entity_reg.async_get(state.entity_id)
if not entity_entry:
continue
if not entity_entry.options.get("conversation", {}).get(
"should_expose", False
):
continue
area_name = None
if entity_entry.area_id:
area = area_reg.async_get_area(entity_entry.area_id)
if area:
area_name = area.name
entities.append(
{
"entity_id": state.entity_id,
"state": state.state,
"friendly_name": state.attributes.get(
"friendly_name", state.entity_id
),
"area": area_name,
"domain": state.entity_id.split(".", 1)[0],
"device_class": state.attributes.get("device_class"),
"supported_features": state.attributes.get(
"supported_features", 0
),
"last_changed": state.last_changed.isoformat(),
"last_updated": state.last_updated.isoformat(),
"attributes": dict(state.attributes),
}
)
return {
"entities": entities,
"count": len(entities),
}
async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]:
entity_reg = er.async_get(hass)
scripts: list[dict[str, Any]] = []
for entity_id in hass.states.async_entity_ids("script"):
entity_entry = entity_reg.async_get(entity_id)
if not entity_entry:
continue
if MCP_ACCESSIBLE_LABEL not in entity_entry.labels:
continue
state = hass.states.get(entity_id)
if not state:
continue
# Get script name from entity_id
script_name = entity_id.split(".", 1)[1]
# Get service description which includes field definitions
fields = {}
try:
service_desc = hass.services.async_describe_service("script", script_name)
if service_desc and "fields" in service_desc:
fields = service_desc["fields"]
except Exception as e:
_LOGGER.debug(f"Could not get service description for {entity_id}: {e}")
scripts.append(
{
"entity_id": entity_id,
"friendly_name": state.attributes.get(
"friendly_name", entity_id
),
"description": state.attributes.get("description", ""),
"fields": fields,
}
)
return {
"scripts": scripts,
"count": len(scripts),
}
async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]:
entity_id = call.data.get(CONF_ENTITY_ID)
if not entity_id:
return {"error": "entity_id is required"}
entity_reg = er.async_get(hass)
area_reg = ar.async_get(hass)
state = hass.states.get(entity_id)
if not state:
return {"error": f"Entity {entity_id} not found"}
entity_entry = entity_reg.async_get(entity_id)
area_name = None
if entity_entry and entity_entry.area_id:
area = area_reg.async_get_area(entity_entry.area_id)
if area:
area_name = area.name
data: dict[str, Any] = {
"entity_id": entity_id,
"state": state.state,
"friendly_name": state.attributes.get(
"friendly_name", entity_id
),
"area": area_name,
"domain": entity_id.split(".", 1)[0],
"device_class": state.attributes.get("device_class"),
"supported_features": state.attributes.get(
"supported_features", 0
),
"last_changed": state.last_changed.isoformat(),
"last_updated": state.last_updated.isoformat(),
"attributes": dict(state.attributes),
}
if entity_entry:
data["labels"] = list(entity_entry.labels)
data["is_exposed_to_conversation"] = entity_entry.options.get(
"conversation", {}
).get("should_expose", False)
return data
hass.services.async_register(
DOMAIN,
"get_exposed_entities",
get_exposed_entities,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
"get_exposed_scripts",
get_exposed_scripts,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
"get_entity_metadata",
get_entity_metadata,
supports_response=SupportsResponse.ONLY,
)
_LOGGER.info("MCP Bridge services registered successfully")
return True