From 15f5918e20c3e391dc2f305e180ec925b5121df8 Mon Sep 17 00:00:00 2001 From: Martin Date: Tue, 10 Feb 2026 16:33:16 -0500 Subject: [PATCH] [FIX] Update script to use SupportsResponse and return ServiceResponse --- custom_components/mcp_bridge/__init__.py | 199 +++++++++++++---------- 1 file changed, 114 insertions(+), 85 deletions(-) diff --git a/custom_components/mcp_bridge/__init__.py b/custom_components/mcp_bridge/__init__.py index f16784c..a50c8e0 100644 --- a/custom_components/mcp_bridge/__init__.py +++ b/custom_components/mcp_bridge/__init__.py @@ -3,13 +3,20 @@ This integration provides services to expose filtered entities and scripts to MCP (Model Context Protocol) servers for AI agent control. """ +from __future__ import annotations + import logging from typing import Any -from homeassistant.core import HomeAssistant, ServiceCall +from homeassistant.const import CONF_ENTITY_ID +from homeassistant.core import ( + HomeAssistant, + ServiceCall, + ServiceResponse, + SupportsResponse, +) from homeassistant.helpers import entity_registry as er from homeassistant.helpers.typing import ConfigType -from homeassistant.const import CONF_ENTITY_ID _LOGGER = logging.getLogger(__name__) @@ -23,150 +30,172 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the MCP Bridge integration.""" _LOGGER.info("Setting up MCP Bridge integration") - async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]: - """Get all entities exposed to voice assistants.""" + async def get_exposed_entities(call: ServiceCall) -> ServiceResponse: + """Get all entities exposed to conversation / voice assistants.""" entity_reg = er.async_get(hass) - - exposed_entities = [] - - # Iterate through all entities + area_reg = hass.helpers.area_registry.async_get(hass) + + exposed_entities: list[dict[str, Any]] = [] + for state in hass.states.async_all(): entity_id = state.entity_id entity_entry = entity_reg.async_get(entity_id) - - # Check if entity is exposed to conversation (voice assistant) - if entity_entry and entity_entry.options.get("conversation", {}).get("should_expose", False): - # Get area name if available - area_name = None - if entity_entry.area_id: - area_reg = hass.helpers.area_registry.async_get(hass) - area = area_reg.async_get_area(entity_entry.area_id) - if area: - area_name = area.name - - # Build entity data - entity_data = { + + if not entity_entry: + continue + + if not entity_entry.options.get("conversation", {}).get( + "should_expose", False + ): + continue + + area_name = None + if entity_entry.area_id: + area = area_reg.async_get_area(entity_entry.area_id) + if area: + area_name = area.name + + exposed_entities.append( + { "entity_id": entity_id, "state": state.state, - "friendly_name": state.attributes.get("friendly_name", entity_id), + "friendly_name": state.attributes.get( + "friendly_name", entity_id + ), "area": area_name, "domain": entity_id.split(".")[0], "device_class": state.attributes.get("device_class"), - "supported_features": state.attributes.get("supported_features", 0), + "supported_features": state.attributes.get( + "supported_features", 0 + ), "last_changed": state.last_changed.isoformat(), "last_updated": state.last_updated.isoformat(), - "attributes": dict(state.attributes) + "attributes": dict(state.attributes), } - - exposed_entities.append(entity_data) - - _LOGGER.debug(f"Found {len(exposed_entities)} exposed entities") - - return { - "entities": exposed_entities, - "count": len(exposed_entities) - } + ) - async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]: + _LOGGER.debug("Found %s exposed entities", len(exposed_entities)) + + return ServiceResponse( + { + "entities": exposed_entities, + "count": len(exposed_entities), + } + ) + + async def get_exposed_scripts(call: ServiceCall) -> ServiceResponse: """Get all scripts marked as MCP-accessible.""" entity_reg = er.async_get(hass) - - exposed_scripts = [] - - # Get all script entities + + exposed_scripts: list[dict[str, Any]] = [] + for entity_id in hass.states.async_entity_ids("script"): entity_entry = entity_reg.async_get(entity_id) - - # Check if script has the mcp_accessible label - if entity_entry and MCP_ACCESSIBLE_LABEL in entity_entry.labels: - state = hass.states.get(entity_id) - if not state: - continue - - # Get script attributes - script_data = { + + if not entity_entry: + continue + + if MCP_ACCESSIBLE_LABEL not in entity_entry.labels: + continue + + state = hass.states.get(entity_id) + if not state: + continue + + exposed_scripts.append( + { "entity_id": entity_id, - "friendly_name": state.attributes.get("friendly_name", entity_id), + "friendly_name": state.attributes.get( + "friendly_name", entity_id + ), "description": state.attributes.get("description", ""), "fields": state.attributes.get("fields", {}), } - - exposed_scripts.append(script_data) - - _LOGGER.debug(f"Found {len(exposed_scripts)} exposed scripts") - - return { - "scripts": exposed_scripts, - "count": len(exposed_scripts) - } + ) - async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]: + _LOGGER.debug("Found %s exposed scripts", len(exposed_scripts)) + + return ServiceResponse( + { + "scripts": exposed_scripts, + "count": len(exposed_scripts), + } + ) + + async def get_entity_metadata(call: ServiceCall) -> ServiceResponse: """Get detailed metadata for a specific entity.""" entity_id = call.data.get(CONF_ENTITY_ID) - + if not entity_id: - _LOGGER.error("entity_id not provided") - return {"error": "entity_id required"} - + return ServiceResponse( + {"error": "entity_id is required"}, + success=False, + ) + entity_reg = er.async_get(hass) + area_reg = hass.helpers.area_registry.async_get(hass) + entity_entry = entity_reg.async_get(entity_id) state = hass.states.get(entity_id) - + if not state: - _LOGGER.error(f"Entity {entity_id} not found") - return {"error": f"Entity {entity_id} not found"} - - # Get area name if available + return ServiceResponse( + {"error": f"Entity {entity_id} not found"}, + success=False, + ) + area_name = None if entity_entry and entity_entry.area_id: - area_reg = hass.helpers.area_registry.async_get(hass) area = area_reg.async_get_area(entity_entry.area_id) if area: area_name = area.name - - metadata = { + + metadata: dict[str, Any] = { "entity_id": entity_id, "state": state.state, - "friendly_name": state.attributes.get("friendly_name", entity_id), + "friendly_name": state.attributes.get( + "friendly_name", entity_id + ), "area": area_name, "domain": entity_id.split(".")[0], "device_class": state.attributes.get("device_class"), - "supported_features": state.attributes.get("supported_features", 0), + "supported_features": state.attributes.get( + "supported_features", 0 + ), "last_changed": state.last_changed.isoformat(), "last_updated": state.last_updated.isoformat(), - "attributes": dict(state.attributes) + "attributes": dict(state.attributes), } - + if entity_entry: metadata["labels"] = list(entity_entry.labels) metadata["is_exposed_to_conversation"] = entity_entry.options.get( "conversation", {} ).get("should_expose", False) - - return metadata - # Register services + return ServiceResponse(metadata) + hass.services.async_register( DOMAIN, "get_exposed_entities", get_exposed_entities, - schema=None + supports_response=SupportsResponse.ONLY, ) - + hass.services.async_register( DOMAIN, "get_exposed_scripts", get_exposed_scripts, - schema=None + supports_response=SupportsResponse.ONLY, ) - + hass.services.async_register( DOMAIN, "get_entity_metadata", get_entity_metadata, - schema=None + supports_response=SupportsResponse.ONLY, ) - + _LOGGER.info("MCP Bridge services registered successfully") - - return True \ No newline at end of file + + return True