[FIX] Update script to use SupportsResponse and return ServiceResponse

This commit is contained in:
Martin 2026-02-10 16:33:16 -05:00
parent 09a500904c
commit 15f5918e20

View File

@ -3,13 +3,20 @@
This integration provides services to expose filtered entities and scripts
to MCP (Model Context Protocol) servers for AI agent control.
"""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.const import CONF_ENTITY_ID
from homeassistant.core import (
HomeAssistant,
ServiceCall,
ServiceResponse,
SupportsResponse,
)
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.typing import ConfigType
from homeassistant.const import CONF_ENTITY_ID
_LOGGER = logging.getLogger(__name__)
@ -23,150 +30,172 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the MCP Bridge integration."""
_LOGGER.info("Setting up MCP Bridge integration")
async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]:
"""Get all entities exposed to voice assistants."""
async def get_exposed_entities(call: ServiceCall) -> ServiceResponse:
"""Get all entities exposed to conversation / voice assistants."""
entity_reg = er.async_get(hass)
exposed_entities = []
# Iterate through all entities
area_reg = hass.helpers.area_registry.async_get(hass)
exposed_entities: list[dict[str, Any]] = []
for state in hass.states.async_all():
entity_id = state.entity_id
entity_entry = entity_reg.async_get(entity_id)
# Check if entity is exposed to conversation (voice assistant)
if entity_entry and entity_entry.options.get("conversation", {}).get("should_expose", False):
# Get area name if available
area_name = None
if entity_entry.area_id:
area_reg = hass.helpers.area_registry.async_get(hass)
area = area_reg.async_get_area(entity_entry.area_id)
if area:
area_name = area.name
# Build entity data
entity_data = {
if not entity_entry:
continue
if not entity_entry.options.get("conversation", {}).get(
"should_expose", False
):
continue
area_name = None
if entity_entry.area_id:
area = area_reg.async_get_area(entity_entry.area_id)
if area:
area_name = area.name
exposed_entities.append(
{
"entity_id": entity_id,
"state": state.state,
"friendly_name": state.attributes.get("friendly_name", entity_id),
"friendly_name": state.attributes.get(
"friendly_name", entity_id
),
"area": area_name,
"domain": entity_id.split(".")[0],
"device_class": state.attributes.get("device_class"),
"supported_features": state.attributes.get("supported_features", 0),
"supported_features": state.attributes.get(
"supported_features", 0
),
"last_changed": state.last_changed.isoformat(),
"last_updated": state.last_updated.isoformat(),
"attributes": dict(state.attributes)
"attributes": dict(state.attributes),
}
exposed_entities.append(entity_data)
_LOGGER.debug(f"Found {len(exposed_entities)} exposed entities")
return {
"entities": exposed_entities,
"count": len(exposed_entities)
}
)
async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]:
_LOGGER.debug("Found %s exposed entities", len(exposed_entities))
return ServiceResponse(
{
"entities": exposed_entities,
"count": len(exposed_entities),
}
)
async def get_exposed_scripts(call: ServiceCall) -> ServiceResponse:
"""Get all scripts marked as MCP-accessible."""
entity_reg = er.async_get(hass)
exposed_scripts = []
# Get all script entities
exposed_scripts: list[dict[str, Any]] = []
for entity_id in hass.states.async_entity_ids("script"):
entity_entry = entity_reg.async_get(entity_id)
# Check if script has the mcp_accessible label
if entity_entry and MCP_ACCESSIBLE_LABEL in entity_entry.labels:
state = hass.states.get(entity_id)
if not state:
continue
# Get script attributes
script_data = {
if not entity_entry:
continue
if MCP_ACCESSIBLE_LABEL not in entity_entry.labels:
continue
state = hass.states.get(entity_id)
if not state:
continue
exposed_scripts.append(
{
"entity_id": entity_id,
"friendly_name": state.attributes.get("friendly_name", entity_id),
"friendly_name": state.attributes.get(
"friendly_name", entity_id
),
"description": state.attributes.get("description", ""),
"fields": state.attributes.get("fields", {}),
}
exposed_scripts.append(script_data)
_LOGGER.debug(f"Found {len(exposed_scripts)} exposed scripts")
return {
"scripts": exposed_scripts,
"count": len(exposed_scripts)
}
)
async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]:
_LOGGER.debug("Found %s exposed scripts", len(exposed_scripts))
return ServiceResponse(
{
"scripts": exposed_scripts,
"count": len(exposed_scripts),
}
)
async def get_entity_metadata(call: ServiceCall) -> ServiceResponse:
"""Get detailed metadata for a specific entity."""
entity_id = call.data.get(CONF_ENTITY_ID)
if not entity_id:
_LOGGER.error("entity_id not provided")
return {"error": "entity_id required"}
return ServiceResponse(
{"error": "entity_id is required"},
success=False,
)
entity_reg = er.async_get(hass)
area_reg = hass.helpers.area_registry.async_get(hass)
entity_entry = entity_reg.async_get(entity_id)
state = hass.states.get(entity_id)
if not state:
_LOGGER.error(f"Entity {entity_id} not found")
return {"error": f"Entity {entity_id} not found"}
# Get area name if available
return ServiceResponse(
{"error": f"Entity {entity_id} not found"},
success=False,
)
area_name = None
if entity_entry and entity_entry.area_id:
area_reg = hass.helpers.area_registry.async_get(hass)
area = area_reg.async_get_area(entity_entry.area_id)
if area:
area_name = area.name
metadata = {
metadata: dict[str, Any] = {
"entity_id": entity_id,
"state": state.state,
"friendly_name": state.attributes.get("friendly_name", entity_id),
"friendly_name": state.attributes.get(
"friendly_name", entity_id
),
"area": area_name,
"domain": entity_id.split(".")[0],
"device_class": state.attributes.get("device_class"),
"supported_features": state.attributes.get("supported_features", 0),
"supported_features": state.attributes.get(
"supported_features", 0
),
"last_changed": state.last_changed.isoformat(),
"last_updated": state.last_updated.isoformat(),
"attributes": dict(state.attributes)
"attributes": dict(state.attributes),
}
if entity_entry:
metadata["labels"] = list(entity_entry.labels)
metadata["is_exposed_to_conversation"] = entity_entry.options.get(
"conversation", {}
).get("should_expose", False)
return metadata
# Register services
return ServiceResponse(metadata)
hass.services.async_register(
DOMAIN,
"get_exposed_entities",
get_exposed_entities,
schema=None
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
"get_exposed_scripts",
get_exposed_scripts,
schema=None
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
"get_entity_metadata",
get_entity_metadata,
schema=None
supports_response=SupportsResponse.ONLY,
)
_LOGGER.info("MCP Bridge services registered successfully")
return True
return True