Compare commits

..

10 Commits

Author SHA1 Message Date
Martin
adbf33d3c7 Maybe this one will be the one 2026-02-10 22:51:07 -05:00
Martin
a3927ba66c [FIX] Third time the charm 2026-02-10 21:57:07 -05:00
Martin
2120685cdf [FIX] New tentative to retrieve the fields 2026-02-10 21:47:29 -05:00
Martin
e17787fa0f [FIX] Fields were not populated for scripts 2026-02-10 21:40:18 -05:00
Martin
76bd1582e7 [FIX] Type Alias error 2026-02-10 16:56:48 -05:00
Martin
8a3025081a [FIX] Removed invalid helpers usage 2026-02-10 16:51:19 -05:00
Martin
15f5918e20 [FIX] Update script to use SupportsResponse and return ServiceResponse 2026-02-10 16:33:16 -05:00
Martin
09a500904c [MOD] Fix issue with init script 2026-02-10 16:14:11 -05:00
Martin
a323b1e9d0 [ADD] Debug files 2026-02-10 15:43:00 -05:00
Martin
7b742858db [ADD] ha_shell_command 2026-02-10 15:24:29 -05:00
4 changed files with 333 additions and 96 deletions

View File

@@ -1,171 +1,173 @@
"""MCP Bridge integration for Home Assistant. """MCP Bridge integration for Home Assistant."""
from __future__ import annotations
This integration provides services to expose filtered entities and scripts
to MCP (Model Context Protocol) servers for AI agent control.
"""
import logging import logging
from typing import Any from typing import Any
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.typing import ConfigType
from homeassistant.const import CONF_ENTITY_ID from homeassistant.const import CONF_ENTITY_ID
from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers import area_registry as ar
from homeassistant.helpers.typing import ConfigType
from homeassistant.components.script import async_get_script_fields
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DOMAIN = "mcp_bridge" DOMAIN = "mcp_bridge"
# Label to mark scripts as MCP-accessible
MCP_ACCESSIBLE_LABEL = "mcp_accessible" MCP_ACCESSIBLE_LABEL = "mcp_accessible"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the MCP Bridge integration."""
_LOGGER.info("Setting up MCP Bridge integration") _LOGGER.info("Setting up MCP Bridge integration")
async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]: async def get_exposed_entities(call: ServiceCall) -> dict[str, Any]:
"""Get all entities exposed to voice assistants."""
entity_reg = er.async_get(hass) entity_reg = er.async_get(hass)
area_reg = ar.async_get(hass)
exposed_entities = []
entities: list[dict[str, Any]] = []
# Iterate through all entities
for entity_id, state in hass.states.async_all(): for state in hass.states.async_all():
entity_entry = entity_reg.async_get(entity_id) entity_entry = entity_reg.async_get(state.entity_id)
if not entity_entry:
# Check if entity is exposed to conversation (voice assistant) continue
if entity_entry and entity_entry.options.get("conversation", {}).get("should_expose", False):
# Get area name if available if not entity_entry.options.get("conversation", {}).get(
area_name = None "should_expose", False
if entity_entry.area_id: ):
area_reg = hass.helpers.area_registry.async_get(hass) continue
area = area_reg.async_get_area(entity_entry.area_id)
if area: area_name = None
area_name = area.name if entity_entry.area_id:
area = area_reg.async_get_area(entity_entry.area_id)
# Build entity data if area:
entity_data = { area_name = area.name
"entity_id": entity_id,
entities.append(
{
"entity_id": state.entity_id,
"state": state.state, "state": state.state,
"friendly_name": state.attributes.get("friendly_name", entity_id), "friendly_name": state.attributes.get(
"friendly_name", state.entity_id
),
"area": area_name, "area": area_name,
"domain": entity_id.split(".")[0], "domain": state.entity_id.split(".", 1)[0],
"device_class": state.attributes.get("device_class"), "device_class": state.attributes.get("device_class"),
"supported_features": state.attributes.get("supported_features", 0), "supported_features": state.attributes.get(
"supported_features", 0
),
"last_changed": state.last_changed.isoformat(), "last_changed": state.last_changed.isoformat(),
"last_updated": state.last_updated.isoformat(), "last_updated": state.last_updated.isoformat(),
"attributes": dict(state.attributes) "attributes": dict(state.attributes),
} }
)
exposed_entities.append(entity_data)
_LOGGER.debug(f"Found {len(exposed_entities)} exposed entities")
return { return {
"entities": exposed_entities, "entities": entities,
"count": len(exposed_entities) "count": len(entities),
} }
async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]: async def get_exposed_scripts(call: ServiceCall) -> dict[str, Any]:
"""Get all scripts marked as MCP-accessible."""
entity_reg = er.async_get(hass) entity_reg = er.async_get(hass)
scripts: list[dict[str, Any]] = []
exposed_scripts = []
# Get all script entities
for entity_id in hass.states.async_entity_ids("script"): for entity_id in hass.states.async_entity_ids("script"):
entity_entry = entity_reg.async_get(entity_id) entity_entry = entity_reg.async_get(entity_id)
if not entity_entry:
# Check if script has the mcp_accessible label continue
if entity_entry and MCP_ACCESSIBLE_LABEL in entity_entry.labels:
state = hass.states.get(entity_id) if MCP_ACCESSIBLE_LABEL not in entity_entry.labels:
if not state: continue
continue
fields = await async_get_script_fields(hass, entity_id)
# Get script attributes
script_data = { state = hass.states.get(entity_id)
scripts.append(
{
"entity_id": entity_id, "entity_id": entity_id,
"friendly_name": state.attributes.get("friendly_name", entity_id), "friendly_name": state.attributes.get(
"description": state.attributes.get("description", ""), "friendly_name", entity_id
"fields": state.attributes.get("fields", {}), )
if state
else entity_id,
"description": state.attributes.get("description", "")
if state
else "",
"fields": fields or {},
} }
)
exposed_scripts.append(script_data)
_LOGGER.debug(f"Found {len(exposed_scripts)} exposed scripts")
return { return {
"scripts": exposed_scripts, "scripts": scripts,
"count": len(exposed_scripts) "count": len(scripts),
} }
async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]: async def get_entity_metadata(call: ServiceCall) -> dict[str, Any]:
"""Get detailed metadata for a specific entity."""
entity_id = call.data.get(CONF_ENTITY_ID) entity_id = call.data.get(CONF_ENTITY_ID)
if not entity_id: if not entity_id:
_LOGGER.error("entity_id not provided") return {"error": "entity_id is required"}
return {"error": "entity_id required"}
entity_reg = er.async_get(hass) entity_reg = er.async_get(hass)
entity_entry = entity_reg.async_get(entity_id) area_reg = ar.async_get(hass)
state = hass.states.get(entity_id) state = hass.states.get(entity_id)
if not state: if not state:
_LOGGER.error(f"Entity {entity_id} not found")
return {"error": f"Entity {entity_id} not found"} return {"error": f"Entity {entity_id} not found"}
# Get area name if available entity_entry = entity_reg.async_get(entity_id)
area_name = None area_name = None
if entity_entry and entity_entry.area_id: if entity_entry and entity_entry.area_id:
area_reg = hass.helpers.area_registry.async_get(hass)
area = area_reg.async_get_area(entity_entry.area_id) area = area_reg.async_get_area(entity_entry.area_id)
if area: if area:
area_name = area.name area_name = area.name
metadata = { data: dict[str, Any] = {
"entity_id": entity_id, "entity_id": entity_id,
"state": state.state, "state": state.state,
"friendly_name": state.attributes.get("friendly_name", entity_id), "friendly_name": state.attributes.get(
"friendly_name", entity_id
),
"area": area_name, "area": area_name,
"domain": entity_id.split(".")[0], "domain": entity_id.split(".", 1)[0],
"device_class": state.attributes.get("device_class"), "device_class": state.attributes.get("device_class"),
"supported_features": state.attributes.get("supported_features", 0), "supported_features": state.attributes.get(
"supported_features", 0
),
"last_changed": state.last_changed.isoformat(), "last_changed": state.last_changed.isoformat(),
"last_updated": state.last_updated.isoformat(), "last_updated": state.last_updated.isoformat(),
"attributes": dict(state.attributes) "attributes": dict(state.attributes),
} }
if entity_entry: if entity_entry:
metadata["labels"] = list(entity_entry.labels) data["labels"] = list(entity_entry.labels)
metadata["is_exposed_to_conversation"] = entity_entry.options.get( data["is_exposed_to_conversation"] = entity_entry.options.get(
"conversation", {} "conversation", {}
).get("should_expose", False) ).get("should_expose", False)
return metadata
# Register services return data
hass.services.async_register( hass.services.async_register(
DOMAIN, DOMAIN,
"get_exposed_entities", "get_exposed_entities",
get_exposed_entities, get_exposed_entities,
schema=None supports_response=SupportsResponse.ONLY,
) )
hass.services.async_register( hass.services.async_register(
DOMAIN, DOMAIN,
"get_exposed_scripts", "get_exposed_scripts",
get_exposed_scripts, get_exposed_scripts,
schema=None supports_response=SupportsResponse.ONLY,
) )
hass.services.async_register( hass.services.async_register(
DOMAIN, DOMAIN,
"get_entity_metadata", "get_entity_metadata",
get_entity_metadata, get_entity_metadata,
schema=None supports_response=SupportsResponse.ONLY,
) )
_LOGGER.info("MCP Bridge services registered successfully") _LOGGER.info("MCP Bridge services registered successfully")
return True return True

130
diagnose_install.sh Normal file
View File

@@ -0,0 +1,130 @@
#!/bin/bash
# MCP Bridge Installation Diagnostic Script
# Run this manually in HA terminal to see what's happening
echo "=== MCP Bridge Installation Diagnostic ==="
echo ""
# 1. Check network connectivity
echo "1. Testing Gitea connectivity..."
GITEA_URL="${GITEA_URL:-https://www.git.quarantinedstudio.com}"
REPO="${REPO:-mvezina/homeassistant-mcp-bridge}"
echo " Attempting to reach: $GITEA_URL/$REPO"
if curl -I "$GITEA_URL/$REPO" 2>&1 | head -n 1; then
echo " ✓ Can reach Gitea server"
else
echo " ✗ Cannot reach Gitea server"
echo " Please check:"
echo " - Is Gitea running?"
echo " - Is the URL correct?"
echo " - Can HA reach your Gitea server?"
fi
echo ""
# 2. Check if curl exists
echo "2. Checking for curl..."
if command -v curl &> /dev/null; then
echo " ✓ curl is available: $(which curl)"
curl --version | head -n 1
else
echo " ✗ curl is NOT available"
echo " Checking for wget..."
if command -v wget &> /dev/null; then
echo " ✓ wget is available: $(which wget)"
else
echo " ✗ Neither curl nor wget available!"
fi
fi
echo ""
# 3. Check config directory
echo "3. Checking config directory..."
CONFIG_DIR="/config"
if [ -d "$CONFIG_DIR" ]; then
echo " ✓ /config exists"
else
echo " ✗ /config does not exist, trying alternate..."
CONFIG_DIR="${HOME}/.homeassistant"
if [ -d "$CONFIG_DIR" ]; then
echo " ✓ Using $CONFIG_DIR"
else
echo " ✗ Cannot find config directory!"
fi
fi
echo ""
# 4. Check/create custom_components directory
echo "4. Checking custom_components directory..."
CUSTOM_DIR="$CONFIG_DIR/custom_components"
if [ -d "$CUSTOM_DIR" ]; then
echo "$CUSTOM_DIR exists"
else
echo " ! $CUSTOM_DIR does not exist, creating..."
mkdir -p "$CUSTOM_DIR" && echo " ✓ Created successfully" || echo " ✗ Failed to create"
fi
echo ""
# 5. Check write permissions
echo "5. Checking write permissions..."
TEST_FILE="$CUSTOM_DIR/test_write_$$"
if touch "$TEST_FILE" 2>/dev/null; then
echo " ✓ Can write to $CUSTOM_DIR"
rm "$TEST_FILE"
else
echo " ✗ Cannot write to $CUSTOM_DIR"
echo " Permission issue!"
fi
echo ""
# 6. Try downloading a test file
echo "6. Testing file download..."
BRANCH="${BRANCH:-main}"
TEST_URL="$GITEA_URL/$REPO/raw/branch/$BRANCH/custom_components/mcp_bridge/manifest.json"
echo " URL: $TEST_URL"
TEMP_FILE="/tmp/mcp_test_$$"
if curl -fsSL "$TEST_URL" -o "$TEMP_FILE" 2>/dev/null; then
echo " ✓ Successfully downloaded test file"
echo " File size: $(wc -c < "$TEMP_FILE") bytes"
echo " First line: $(head -n 1 "$TEMP_FILE")"
rm "$TEMP_FILE"
else
echo " ✗ Failed to download test file"
echo " Possible issues:"
echo " - Wrong repository name or path"
echo " - Wrong branch name"
echo " - File doesn't exist at that path"
echo " - Network/firewall blocking"
fi
echo ""
# 7. Check existing installation
echo "7. Checking for existing installation..."
INSTALL_DIR="$CONFIG_DIR/custom_components/mcp_bridge"
if [ -d "$INSTALL_DIR" ]; then
echo " ✓ Installation directory exists: $INSTALL_DIR"
echo " Files:"
ls -lh "$INSTALL_DIR" 2>/dev/null || echo " (empty or cannot list)"
if [ -f "$INSTALL_DIR/manifest.json" ]; then
VERSION=$(grep -oP '"version":\s*"\K[^"]+' "$INSTALL_DIR/manifest.json" 2>/dev/null || echo "unknown")
echo " Current version: $VERSION"
fi
else
echo " ! Installation directory does not exist yet"
fi
echo ""
# 8. Provide installation command
echo "8. Suggested installation command:"
echo ""
echo " export GITEA_URL=\"$GITEA_URL\""
echo " export REPO=\"$REPO\""
echo " export BRANCH=\"$BRANCH\""
echo " curl -fsSL \"\$GITEA_URL/\$REPO/raw/branch/\$BRANCH/quick_install.sh\" | bash"
echo ""
echo "=== Diagnostic Complete ==="
echo ""
echo "If all checks passed, run the installation command above."
echo "If checks failed, fix the issues indicated and try again."

View File

@@ -0,0 +1,36 @@
# Add this to your configuration.yaml
shell_command:
# Update MCP Bridge integration from Gitea
update_mcp_bridge: >
curl -fsSL https://www.git.quarantinedstudio.com/mvezina/homeassistant-mcp-bridge/raw/branch/main/quick_install.sh |
GITEA_URL=https://www.git.quarantinedstudio.com
REPO=mvezina/homeassistant-mcp-bridge
bash
# Then create an automation or script to call it:
script:
update_mcp_bridge:
alias: "Update MCP Bridge Integration"
sequence:
- service: shell_command.update_mcp_bridge
- delay:
seconds: 5
- service: system_log.write
data:
message: "MCP Bridge updated. Please restart Home Assistant."
level: warning
- service: persistent_notification.create
data:
title: "MCP Bridge Updated"
message: "The MCP Bridge integration has been updated. Please restart Home Assistant for changes to take effect."
# Optional: Add a button to your dashboard
button:
- type: button
name: Update MCP Bridge
icon: mdi:download
tap_action:
action: call-service
service: script.update_mcp_bridge

View File

@@ -0,0 +1,69 @@
#!/bin/bash
# MCP Bridge Quick Installer with Logging
# Logs to /config/mcp_bridge_install.log
# Configuration
GITEA_URL="${GITEA_URL:-http://your-gitea.local:3000}"
REPO="${REPO:-username/homeassistant-mcp-bridge}"
BRANCH="${BRANCH:-main}"
BASE_URL="$GITEA_URL/$REPO/raw/branch/$BRANCH/custom_components/mcp_bridge"
# Logging
LOG_FILE="/config/mcp_bridge_install.log"
exec 1> >(tee -a "$LOG_FILE")
exec 2>&1
echo "=== MCP Bridge Installation Started at $(date) ==="
echo "Configuration:"
echo " GITEA_URL: $GITEA_URL"
echo " REPO: $REPO"
echo " BRANCH: $BRANCH"
echo " BASE_URL: $BASE_URL"
echo ""
# Determine config directory
CONFIG_DIR="${CONFIG_DIR:-/config}"
if [ ! -d "$CONFIG_DIR" ]; then
CONFIG_DIR="${HOME}/.homeassistant"
echo "Using alternate config dir: $CONFIG_DIR"
fi
DEST="$CONFIG_DIR/custom_components/mcp_bridge"
echo "Installation directory: $DEST"
echo ""
# Create directory
echo "Creating directory..."
mkdir -p "$DEST" || {
echo "ERROR: Failed to create directory $DEST"
exit 1
}
echo "Directory created: $DEST"
echo ""
# Download files
echo "Downloading files..."
cd "$DEST" || exit 1
for file in __init__.py manifest.json services.yaml; do
echo " Downloading $file from $BASE_URL/$file"
if curl -fsSL "$BASE_URL/$file" -o "$file"; then
echo "$file downloaded successfully ($(wc -c < "$file") bytes)"
else
echo " ✗ FAILED to download $file"
echo " URL attempted: $BASE_URL/$file"
exit 1
fi
done
echo ""
echo "=== Installation Summary ==="
echo "Files installed in: $DEST"
ls -lh "$DEST"
echo ""
echo "✅ MCP Bridge installed successfully!"
echo "⚠️ IMPORTANT: Restart Home Assistant to activate"
echo ""
echo "Installation completed at $(date)"
echo "=== End of Installation Log ==="