172 lines
6.2 KiB
Python
172 lines
6.2 KiB
Python
"""MCP Bridge integration for Home Assistant.
|
|
|
|
This integration provides services to expose filtered entities and scripts
|
|
to MCP (Model Context Protocol) servers for AI agent control.
|
|
"""
|
|
import logging
|
|
from typing import Any
|
|
|
|
from homeassistant.core import HomeAssistant, ServiceCall
|
|
from homeassistant.helpers import entity_registry as er
|
|
from homeassistant.helpers.typing import ConfigType
|
|
from homeassistant.const import CONF_ENTITY_ID
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
DOMAIN = "mcp_bridge"
|
|
|
|
# Label to mark scripts as MCP-accessible
|
|
MCP_ACCESSIBLE_LABEL = "mcp_accessible"
|
|
|
|
|
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|
"""Set up the MCP Bridge integration."""
|
|
_LOGGER.info("Setting up MCP Bridge integration")
|
|
|
|
async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]:
|
|
"""Get all entities exposed to voice assistants."""
|
|
entity_reg = er.async_get(hass)
|
|
|
|
exposed_entities = []
|
|
|
|
# Iterate through all entities
|
|
for state in hass.states.async_all():
|
|
entity_id = state.entity_id
|
|
entity_entry = entity_reg.async_get(entity_id)
|
|
|
|
# Check if entity is exposed to conversation (voice assistant)
|
|
if entity_entry and entity_entry.options.get("conversation", {}).get("should_expose", False):
|
|
# Get area name if available
|
|
area_name = None
|
|
if entity_entry.area_id:
|
|
area_reg = hass.helpers.area_registry.async_get(hass)
|
|
area = area_reg.async_get_area(entity_entry.area_id)
|
|
if area:
|
|
area_name = area.name
|
|
|
|
# Build entity data
|
|
entity_data = {
|
|
"entity_id": entity_id,
|
|
"state": state.state,
|
|
"friendly_name": state.attributes.get("friendly_name", entity_id),
|
|
"area": area_name,
|
|
"domain": entity_id.split(".")[0],
|
|
"device_class": state.attributes.get("device_class"),
|
|
"supported_features": state.attributes.get("supported_features", 0),
|
|
"last_changed": state.last_changed.isoformat(),
|
|
"last_updated": state.last_updated.isoformat(),
|
|
"attributes": dict(state.attributes)
|
|
}
|
|
|
|
exposed_entities.append(entity_data)
|
|
|
|
_LOGGER.debug(f"Found {len(exposed_entities)} exposed entities")
|
|
|
|
return {
|
|
"entities": exposed_entities,
|
|
"count": len(exposed_entities)
|
|
}
|
|
|
|
async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]:
|
|
"""Get all scripts marked as MCP-accessible."""
|
|
entity_reg = er.async_get(hass)
|
|
|
|
exposed_scripts = []
|
|
|
|
# Get all script entities
|
|
for entity_id in hass.states.async_entity_ids("script"):
|
|
entity_entry = entity_reg.async_get(entity_id)
|
|
|
|
# Check if script has the mcp_accessible label
|
|
if entity_entry and MCP_ACCESSIBLE_LABEL in entity_entry.labels:
|
|
state = hass.states.get(entity_id)
|
|
if not state:
|
|
continue
|
|
|
|
# Get script attributes
|
|
script_data = {
|
|
"entity_id": entity_id,
|
|
"friendly_name": state.attributes.get("friendly_name", entity_id),
|
|
"description": state.attributes.get("description", ""),
|
|
"fields": state.attributes.get("fields", {}),
|
|
}
|
|
|
|
exposed_scripts.append(script_data)
|
|
|
|
_LOGGER.debug(f"Found {len(exposed_scripts)} exposed scripts")
|
|
|
|
return {
|
|
"scripts": exposed_scripts,
|
|
"count": len(exposed_scripts)
|
|
}
|
|
|
|
async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]:
|
|
"""Get detailed metadata for a specific entity."""
|
|
entity_id = call.data.get(CONF_ENTITY_ID)
|
|
|
|
if not entity_id:
|
|
_LOGGER.error("entity_id not provided")
|
|
return {"error": "entity_id required"}
|
|
|
|
entity_reg = er.async_get(hass)
|
|
entity_entry = entity_reg.async_get(entity_id)
|
|
state = hass.states.get(entity_id)
|
|
|
|
if not state:
|
|
_LOGGER.error(f"Entity {entity_id} not found")
|
|
return {"error": f"Entity {entity_id} not found"}
|
|
|
|
# Get area name if available
|
|
area_name = None
|
|
if entity_entry and entity_entry.area_id:
|
|
area_reg = hass.helpers.area_registry.async_get(hass)
|
|
area = area_reg.async_get_area(entity_entry.area_id)
|
|
if area:
|
|
area_name = area.name
|
|
|
|
metadata = {
|
|
"entity_id": entity_id,
|
|
"state": state.state,
|
|
"friendly_name": state.attributes.get("friendly_name", entity_id),
|
|
"area": area_name,
|
|
"domain": entity_id.split(".")[0],
|
|
"device_class": state.attributes.get("device_class"),
|
|
"supported_features": state.attributes.get("supported_features", 0),
|
|
"last_changed": state.last_changed.isoformat(),
|
|
"last_updated": state.last_updated.isoformat(),
|
|
"attributes": dict(state.attributes)
|
|
}
|
|
|
|
if entity_entry:
|
|
metadata["labels"] = list(entity_entry.labels)
|
|
metadata["is_exposed_to_conversation"] = entity_entry.options.get(
|
|
"conversation", {}
|
|
).get("should_expose", False)
|
|
|
|
return metadata
|
|
|
|
# Register services
|
|
hass.services.async_register(
|
|
DOMAIN,
|
|
"get_exposed_entities",
|
|
get_exposed_entities,
|
|
schema=None
|
|
)
|
|
|
|
hass.services.async_register(
|
|
DOMAIN,
|
|
"get_exposed_scripts",
|
|
get_exposed_scripts,
|
|
schema=None
|
|
)
|
|
|
|
hass.services.async_register(
|
|
DOMAIN,
|
|
"get_entity_metadata",
|
|
get_entity_metadata,
|
|
schema=None
|
|
)
|
|
|
|
_LOGGER.info("MCP Bridge services registered successfully")
|
|
|
|
return True |