172 lines
5.4 KiB
Python
172 lines
5.4 KiB
Python
"""MCP Bridge integration for Home Assistant."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from homeassistant.const import CONF_ENTITY_ID
|
|
from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse
|
|
from homeassistant.helpers import entity_registry as er
|
|
from homeassistant.helpers import area_registry as ar
|
|
from homeassistant.helpers.typing import ConfigType
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
DOMAIN = "mcp_bridge"
|
|
MCP_ACCESSIBLE_LABEL = "mcp_accessible"
|
|
|
|
|
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|
_LOGGER.info("Setting up MCP Bridge integration")
|
|
|
|
async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]:
|
|
entity_reg = er.async_get(hass)
|
|
area_reg = ar.async_get(hass)
|
|
|
|
entities: list[dict[str, Any]] = []
|
|
|
|
for state in hass.states.async_all():
|
|
entity_entry = entity_reg.async_get(state.entity_id)
|
|
if not entity_entry:
|
|
continue
|
|
|
|
if not entity_entry.options.get("conversation", {}).get(
|
|
"should_expose", False
|
|
):
|
|
continue
|
|
|
|
area_name = None
|
|
if entity_entry.area_id:
|
|
area = area_reg.async_get_area(entity_entry.area_id)
|
|
if area:
|
|
area_name = area.name
|
|
|
|
entities.append(
|
|
{
|
|
"entity_id": state.entity_id,
|
|
"state": state.state,
|
|
"friendly_name": state.attributes.get(
|
|
"friendly_name", state.entity_id
|
|
),
|
|
"area": area_name,
|
|
"domain": state.entity_id.split(".", 1)[0],
|
|
"device_class": state.attributes.get("device_class"),
|
|
"supported_features": state.attributes.get(
|
|
"supported_features", 0
|
|
),
|
|
"last_changed": state.last_changed.isoformat(),
|
|
"last_updated": state.last_updated.isoformat(),
|
|
"attributes": dict(state.attributes),
|
|
}
|
|
)
|
|
|
|
return {
|
|
"entities": entities,
|
|
"count": len(entities),
|
|
}
|
|
|
|
async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]:
|
|
entity_reg = er.async_get(hass)
|
|
script_comp = hass.data.get("script")
|
|
|
|
scripts: list[dict[str, Any]] = []
|
|
|
|
if not script_comp:
|
|
return {"scripts": [], "count": 0}
|
|
|
|
for entity_id in hass.states.async_entity_ids("script"):
|
|
entity_entry = entity_reg.async_get(entity_id)
|
|
if not entity_entry:
|
|
continue
|
|
|
|
if MCP_ACCESSIBLE_LABEL not in entity_entry.labels:
|
|
continue
|
|
|
|
script_obj = script_comp.scripts.get(entity_id)
|
|
if not script_obj:
|
|
continue
|
|
|
|
scripts.append(
|
|
{
|
|
"entity_id": entity_id,
|
|
"friendly_name": script_obj.name or entity_id,
|
|
"description": script_obj.description or "",
|
|
"fields": script_obj.fields or {},
|
|
"mode": script_obj.mode,
|
|
}
|
|
)
|
|
|
|
return {
|
|
"scripts": scripts,
|
|
"count": len(scripts),
|
|
}
|
|
|
|
|
|
async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]:
|
|
entity_id = call.data.get(CONF_ENTITY_ID)
|
|
if not entity_id:
|
|
return {"error": "entity_id is required"}
|
|
|
|
entity_reg = er.async_get(hass)
|
|
area_reg = ar.async_get(hass)
|
|
|
|
state = hass.states.get(entity_id)
|
|
if not state:
|
|
return {"error": f"Entity {entity_id} not found"}
|
|
|
|
entity_entry = entity_reg.async_get(entity_id)
|
|
|
|
area_name = None
|
|
if entity_entry and entity_entry.area_id:
|
|
area = area_reg.async_get_area(entity_entry.area_id)
|
|
if area:
|
|
area_name = area.name
|
|
|
|
data: dict[str, Any] = {
|
|
"entity_id": entity_id,
|
|
"state": state.state,
|
|
"friendly_name": state.attributes.get(
|
|
"friendly_name", entity_id
|
|
),
|
|
"area": area_name,
|
|
"domain": entity_id.split(".", 1)[0],
|
|
"device_class": state.attributes.get("device_class"),
|
|
"supported_features": state.attributes.get(
|
|
"supported_features", 0
|
|
),
|
|
"last_changed": state.last_changed.isoformat(),
|
|
"last_updated": state.last_updated.isoformat(),
|
|
"attributes": dict(state.attributes),
|
|
}
|
|
|
|
if entity_entry:
|
|
data["labels"] = list(entity_entry.labels)
|
|
data["is_exposed_to_conversation"] = entity_entry.options.get(
|
|
"conversation", {}
|
|
).get("should_expose", False)
|
|
|
|
return data
|
|
|
|
hass.services.async_register(
|
|
DOMAIN,
|
|
"get_exposed_entities",
|
|
get_exposed_entities,
|
|
supports_response=SupportsResponse.ONLY,
|
|
)
|
|
|
|
hass.services.async_register(
|
|
DOMAIN,
|
|
"get_exposed_scripts",
|
|
get_exposed_scripts,
|
|
supports_response=SupportsResponse.ONLY,
|
|
)
|
|
|
|
hass.services.async_register(
|
|
DOMAIN,
|
|
"get_entity_metadata",
|
|
get_entity_metadata,
|
|
supports_response=SupportsResponse.ONLY,
|
|
)
|
|
|
|
_LOGGER.info("MCP Bridge services registered successfully")
|
|
return True |