#!/usr/bin/env python3 """ Somfy Tahoma Local API to MySQL Database Script Liest alle Aktoren und Sensoren aus der Tahoma Box und speichert sie in MySQL """ import requests import pymysql from pymysql import Error import json import logging from typing import List, Dict, Optional import urllib3 import socket import configparser import os from mqtt_discovery import HomeAssistantDiscovery, MQTTDeviceConverter # SSL-Warnungen deaktivieren (Tahoma verwendet selbst-signierte Zertifikate) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class Config: """Lädt und verwaltet die Konfiguration aus config.ini""" def __init__(self, config_file: str = 'config.ini'): """ Lädt die Konfiguration Args: config_file: Name der Konfigurationsdatei (wird im Script-Verzeichnis gesucht) """ # Verzeichnis des Scripts ermitteln script_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(script_dir, config_file) if not os.path.exists(config_path): raise FileNotFoundError( f"Konfigurationsdatei '{config_file}' nicht gefunden!\n" f"Erwartet in: {script_dir}\n" f"Bitte erstellen Sie die Datei anhand der Vorlage config.ini.example" ) self.config = configparser.ConfigParser() self.config.read(config_path, encoding='utf-8') logger.info(f"Konfiguration geladen von: {config_path}") # Konfiguration validieren self._validate() def _validate(self): """Validiert die Konfiguration""" required_sections = ['tahoma', 'database', 'wled', 'options'] for section in required_sections: if not self.config.has_section(section): raise ValueError(f"Erforderliche Sektion '[{section}]' fehlt in config.ini") def _get_bool(self, section: str, key: str, fallback: bool = False) -> bool: """Hilfsmethode zum Lesen von Boolean-Werten""" value = self.config.get(section, key, fallback=str(fallback)).strip().lower() return value in ('true', '1', 'yes', 'on') def _get_int(self, section: str, key: str, fallback: int = 0) -> int: """Hilfsmethode zum Lesen von Integer-Werten""" try: return self.config.getint(section, key, fallback=fallback) except ValueError: return fallback def _get_list(self, section: str, key: str) -> List[str]: """Hilfsmethode zum Lesen von Listen (komma-getrennt)""" value = self.config.get(section, key, fallback='').strip() if not value: return [] return [item.strip() for item in value.split(',') if item.strip()] # Tahoma Konfiguration @property def tahoma_ip(self) -> str: return self.config.get('tahoma', 'ip').strip() @property def tahoma_token(self) -> str: return self.config.get('tahoma', 'token').strip() @property def tahoma_timeout(self) -> int: if self.config.has_section('advanced'): return self._get_int('advanced', 'tahoma_timeout', 10) return 10 # Datenbank Konfiguration @property def db_host(self) -> str: return self.config.get('database', 'host').strip() @property def db_port(self) -> int: return self._get_int('database', 'port', 3306) @property def db_name(self) -> str: return self.config.get('database', 'database').strip() @property def db_user(self) -> str: return self.config.get('database', 'user').strip() @property def db_password(self) -> str: return self.config.get('database', 'password').strip() # WLED Konfiguration @property def wled_enable(self) -> bool: return self._get_bool('wled', 'enable', True) @property def wled_discovery_timeout(self) -> int: return self._get_int('wled', 'discovery_timeout', 5) @property def wled_manual_ips(self) -> List[str]: return self._get_list('wled', 'manual_ips') @property def wled_scan_network(self) -> Optional[str]: value = self.config.get('wled', 'scan_network', fallback='').strip() return value if value else None @property def wled_timeout(self) -> int: if self.config.has_section('advanced'): return self._get_int('advanced', 'wled_timeout', 2) return 2 # MQTT Konfiguration @property def mqtt_enable(self) -> bool: if not self.config.has_section('mqtt'): return False return self._get_bool('mqtt', 'enable', False) @property def mqtt_broker(self) -> str: if not self.config.has_section('mqtt'): return 'localhost' return self.config.get('mqtt', 'broker', fallback='localhost').strip() @property def mqtt_port(self) -> int: if not self.config.has_section('mqtt'): return 1883 return self._get_int('mqtt', 'port', 1883) @property def mqtt_username(self) -> Optional[str]: if not self.config.has_section('mqtt'): return None value = self.config.get('mqtt', 'username', fallback='').strip() return value if value else None @property def mqtt_password(self) -> Optional[str]: if not self.config.has_section('mqtt'): return None value = self.config.get('mqtt', 'password', fallback='').strip() return value if value else None @property def mqtt_discovery_prefix(self) -> str: if not self.config.has_section('mqtt'): return 'homeassistant' return self.config.get('mqtt', 'discovery_prefix', fallback='homeassistant').strip() @property def mqtt_discovery_timeout(self) -> int: if not self.config.has_section('mqtt'): return 10 return self._get_int('mqtt', 'discovery_timeout', 10) # Optionen @property def clear_tables(self) -> bool: return self._get_bool('options', 'clear_tables', True) @property def log_level(self) -> str: level = self.config.get('options', 'log_level', fallback='INFO').strip().upper() valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] return level if level in valid_levels else 'INFO' @property def log_file(self) -> Optional[str]: value = self.config.get('options', 'log_file', fallback='').strip() return value if value else None # Advanced @property def scan_threads(self) -> int: if self.config.has_section('advanced'): return self._get_int('advanced', 'scan_threads', 50) return 50 # Konfiguration laden (global, wird in main() initialisiert) config = None # Logging konfigurieren (wird nach Config-Laden angepasst) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class TahomaAPI: """Klasse für die Kommunikation mit der Tahoma Local API""" def __init__(self, gateway_ip: str, api_token: str): """ Initialisiert die Tahoma API Verbindung Args: gateway_ip: IP-Adresse der Tahoma Box api_token: API Token (Bearer Token) """ self.base_url = f"https://{gateway_ip}:8443/enduser-mobile-web/1/enduserAPI" self.headers = { "Authorization": f"Bearer {api_token}", "Content-Type": "application/json" } def get_setup(self) -> Optional[Dict]: """ Ruft die komplette Setup-Konfiguration ab Returns: Dictionary mit allen Geräten oder None bei Fehler """ try: url = f"{self.base_url}/setup" response = requests.get(url, headers=self.headers, verify=False, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.error(f"Fehler beim Abrufen der Setup-Daten: {e}") return None def get_devices(self) -> List[Dict]: """ Extrahiert alle Geräte aus dem Setup Returns: Liste aller Geräte """ setup = self.get_setup() if not setup: return [] devices = setup.get('devices', []) logger.info(f"{len(devices)} Geräte gefunden") return devices def get_device_definition(self, device_url: str) -> Optional[Dict]: """ Ruft die detaillierte Definition eines Geräts ab Args: device_url: URL des Geräts Returns: Dictionary mit Gerätedefinition oder None bei Fehler """ try: # Device URL encodieren from urllib.parse import quote encoded_url = quote(device_url, safe='') url = f"{self.base_url}/setup/devices/{encoded_url}" response = requests.get(url, headers=self.headers, verify=False, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.debug(f"Fehler beim Abrufen der Device-Definition für {device_url}: {e}") return None def get_device_states(self, device_url: str) -> List[Dict]: """ Ruft die aktuellen States eines Geräts ab Args: device_url: URL des Geräts Returns: Liste der States """ try: from urllib.parse import quote encoded_url = quote(device_url, safe='') url = f"{self.base_url}/setup/devices/{encoded_url}/states" response = requests.get(url, headers=self.headers, verify=False, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.debug(f"Fehler beim Abrufen der Device-States für {device_url}: {e}") return [] class DatabaseManager: """Klasse für die MySQL-Datenbankoperationen""" def __init__(self, host: str, database: str, user: str, password: str, port: int = 3306): """ Initialisiert die Datenbankverbindung Args: host: MySQL Host database: Datenbankname user: Benutzername password: Passwort port: Port (Standard: 3306) """ self.host = host self.database = database self.user = user self.password = password self.port = port self.connection = None def connect(self) -> bool: """ Stellt Verbindung zur Datenbank her Returns: True bei Erfolg, False bei Fehler """ try: self.connection = pymysql.connect( host=self.host, database=self.database, user=self.user, password=self.password, port=self.port, charset='utf8mb4' ) logger.info("Erfolgreich mit MariaDB/MySQL-Datenbank verbunden") return True except Error as e: logger.error(f"Fehler bei der Datenbankverbindung: {e}") return False def disconnect(self): """Schließt die Datenbankverbindung""" if self.connection: self.connection.close() logger.info("Datenbankverbindung geschlossen") def clear_tables(self): """Löscht alle Einträge aus allen Tabellen""" try: cursor = self.connection.cursor() # Foreign Key Constraints temporär deaktivieren cursor.execute("SET FOREIGN_KEY_CHECKS=0") # Alle Tabellen leeren cursor.execute("DELETE FROM command_parameters") cursor.execute("DELETE FROM actor_commands") cursor.execute("DELETE FROM actor_states") cursor.execute("DELETE FROM actors") cursor.execute("DELETE FROM sensor_states") cursor.execute("DELETE FROM sensors") # Foreign Key Constraints wieder aktivieren cursor.execute("SET FOREIGN_KEY_CHECKS=1") self.connection.commit() logger.info("Alle Tabellen geleert") cursor.close() except Error as e: logger.error(f"Fehler beim Leeren der Tabellen: {e}") self.connection.rollback() def insert_actor(self, device_type: str, name: str, url: str, commands: list, states: list) -> bool: """ Fügt einen Aktor mit Commands und States in die Datenbank ein (AKTUALISIERT für MQTT URL-Support) """ try: cursor = self.connection.cursor() # 1. Aktor einfügen query = """ INSERT INTO actors (type, name, parameters, url) VALUES (%s, %s, NULL, %s) """ cursor.execute(query, (device_type, name, url)) actor_id = cursor.lastrowid # 2. Commands einfügen for cmd in commands: command_name = cmd.get('command', '') cmd_query = """ INSERT INTO actor_commands (actor_id, command_name) VALUES (%s, %s) """ cursor.execute(cmd_query, (actor_id, command_name)) command_id = cursor.lastrowid # Parameter mit URL einfügen cmd_params = cmd.get('parameters', []) for param in cmd_params: param_query = """ INSERT INTO command_parameters (command_id, parameter_name, parameter_type, min_value, max_value, possible_values, url) VALUES (%s, %s, %s, %s, %s, %s, %s) """ param_name = param.get('name', '') param_type = param.get('type', '') min_val = param.get('min') max_val = param.get('max') possible_vals = json.dumps(param.get('values')) if 'values' in param else None param_url = param.get('url') # NEU: MQTT Topic cursor.execute(param_query, (command_id, param_name, param_type, min_val, max_val, possible_vals, param_url)) # 3. States mit URL einfügen for state in states: state_query = """ INSERT INTO actor_states (actor_id, state_name, state_type, current_value, unit, url) VALUES (%s, %s, %s, %s, %s, %s) """ state_name = state.get('name', '') state_type = state.get('type', 0) current_value = str(state.get('current_value', '')) if 'current_value' in state else None unit = state.get('unit') state_url = state.get('url') # NEU: MQTT Topic cursor.execute(state_query, (actor_id, state_name, state_type, current_value, unit, state_url)) self.connection.commit() cursor.close() return True except Error as e: logger.error(f"Fehler beim Einfügen des Aktors {name}: {e}") self.connection.rollback() return False def insert_sensor(self, device_type: str, name: str, url: str, states: list) -> bool: """ Fügt einen Sensor mit States in die Datenbank ein (AKTUALISIERT für MQTT URL-Support) """ try: cursor = self.connection.cursor() # 1. Sensor einfügen query = """ INSERT INTO sensors (type, name, parameters, url) VALUES (%s, %s, NULL, %s) """ cursor.execute(query, (device_type, name, url)) sensor_id = cursor.lastrowid # 2. States mit URL einfügen for state in states: state_query = """ INSERT INTO sensor_states (sensor_id, state_name, state_type, current_value, unit, url) VALUES (%s, %s, %s, %s, %s, %s) """ state_name = state.get('name', '') state_type = state.get('type', 0) current_value = str(state.get('current_value', '')) if 'current_value' in state else None unit = state.get('unit') state_url = state.get('url') # NEU: MQTT Topic cursor.execute(state_query, (sensor_id, state_name, state_type, current_value, unit, state_url)) self.connection.commit() cursor.close() return True except Error as e: logger.error(f"Fehler beim Einfügen des Sensors {name}: {e}") self.connection.rollback() return False class WLEDDiscovery: """Klasse für die automatische WLED-Geräteerkennung im Netzwerk""" @staticmethod def discover_devices(timeout: int = 5) -> List[str]: """ Sucht nach WLED-Geräten im lokalen Netzwerk mittels mDNS Args: timeout: Timeout in Sekunden für die Suche Returns: Liste mit IP-Adressen gefundener WLED-Geräte """ try: from zeroconf import ServiceBrowser, ServiceListener, Zeroconf import time class WLEDListener(ServiceListener): def __init__(self): self.devices = [] def add_service(self, zc, type_, name): info = zc.get_service_info(type_, name) if info: # IP-Adresse extrahieren addresses = [socket.inet_ntoa(addr) for addr in info.addresses] for addr in addresses: if addr not in self.devices: self.devices.append(addr) logger.info(f"WLED-Gerät gefunden: {name} ({addr})") def remove_service(self, zc, type_, name): pass def update_service(self, zc, type_, name): pass zeroconf = Zeroconf() listener = WLEDListener() browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener) logger.info(f"Suche nach WLED-Geräten (Timeout: {timeout}s)...") time.sleep(timeout) zeroconf.close() # Filtern: Nur WLED-Geräte wled_devices = [] for ip in listener.devices: if WLEDDiscovery.is_wled_device(ip): wled_devices.append(ip) logger.info(f"{len(wled_devices)} WLED-Geräte gefunden") return wled_devices except ImportError: logger.warning("zeroconf-Bibliothek nicht installiert. Verwende Netzwerk-Scan...") return WLEDDiscovery.scan_network() except Exception as e: logger.error(f"Fehler bei WLED-Discovery: {e}") return [] @staticmethod def scan_network(network: str = None, max_threads: int = 50) -> List[str]: """ Scannt das Netzwerk nach WLED-Geräten (Fallback-Methode) Args: network: Netzwerk im Format "192.168.1.0/24" (None = automatisch) max_threads: Maximale Anzahl paralleler Threads Returns: Liste mit IP-Adressen gefundener WLED-Geräte """ import socket import concurrent.futures if network is None: # Eigene IP ermitteln und Netzwerk ableiten try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] s.close() # Netzwerk ableiten (angenommen /24) network_prefix = '.'.join(local_ip.split('.')[:-1]) except: logger.warning("Konnte lokale IP nicht ermitteln, verwende 192.168.1.x") network_prefix = "192.168.1" else: network_prefix = '.'.join(network.split('.')[:3]) logger.info(f"Scanne Netzwerk {network_prefix}.0/24 nach WLED-Geräten...") def check_ip(ip): if WLEDDiscovery.is_wled_device(ip): return ip return None wled_devices = [] with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: futures = [executor.submit(check_ip, f"{network_prefix}.{i}") for i in range(1, 255)] for future in concurrent.futures.as_completed(futures): result = future.result() if result: wled_devices.append(result) logger.info(f"WLED-Gerät gefunden: {result}") return wled_devices @staticmethod def is_wled_device(ip: str, timeout: float = 1.0) -> bool: """ Prüft ob eine IP-Adresse ein WLED-Gerät ist Args: ip: IP-Adresse timeout: Timeout für die Anfrage Returns: True wenn WLED-Gerät, False sonst """ try: response = requests.get( f"http://{ip}/json/info", timeout=timeout, headers={'User-Agent': 'TahomaSync/1.0'} ) if response.status_code == 200: data = response.json() # WLED antwortet mit spezifischen Feldern return 'ver' in data or 'name' in data except: pass return False class WLEDAPI: """Klasse für die Kommunikation mit WLED-Geräten""" def __init__(self, ip: str): """ Initialisiert die WLED-API Verbindung Args: ip: IP-Adresse des WLED-Geräts """ self.ip = ip self.base_url = f"http://{ip}" def get_info(self) -> Optional[Dict]: """ Ruft Geräteinformationen ab Returns: Dictionary mit Geräteinformationen oder None """ try: response = requests.get(f"{self.base_url}/json/info", timeout=2) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Fehler beim Abrufen der WLED-Info von {self.ip}: {e}") return None def get_presets(self) -> Optional[Dict]: """ Ruft die Presets ab Returns: Dictionary mit Presets oder None """ try: response = requests.get(f"{self.base_url}/presets.json", timeout=2) response.raise_for_status() presets_data = response.json() # Nur die relevanten Daten extrahieren preset_list = [] if isinstance(presets_data, dict): for preset_id, preset_data in presets_data.items(): preset_name = preset_data.get('n', f'Preset {preset_id}') preset_list.append({ int(preset_id): preset_name }) return preset_list except Exception as e: logger.error(f"Fehler beim Abrufen der WLED-Presets von {self.ip}: {e}") return None def get_effects(self) -> Optional[Dict]: """ Ruft die Effects ab Returns: Dictionary mit effects oder None """ try: response = requests.get(f"{self.base_url}/json/eff", timeout=2) response.raise_for_status() eff_data = response.json() # Nur die relevanten Daten extrahieren eff_list = [] for eff_id, eff_data in enumerate(eff_data): if(eff_data): eff_name = eff_data else: eff_name = "Effect "+eff_id eff_list.append({ int(eff_id): eff_name }) return eff_list except Exception as e: logger.error(f"Fehler beim Abrufen der WLED-Effects von {self.ip}: {e}") return None def get_state(self) -> Optional[Dict]: """ Ruft aktuellen Zustand ab Returns: Dictionary mit aktuellem Zustand oder None """ try: response = requests.get(f"{self.base_url}/json/state", timeout=2) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Fehler beim Abrufen des WLED-State von {self.ip}: {e}") return None def get_device_data(self) -> Optional[Dict]: """ Erstellt ein Geräte-Dictionary im Tahoma-ähnlichen Format Returns: Dictionary mit Gerätedaten """ info = self.get_info() state = self.get_state() if not info: return None # Name des Geräts name = info.get('name', f"WLED {self.ip}") preset_values = self.get_presets() eff_values = self.get_effects() # Commands für WLED erstellen commands = [ { 'command': 'on', 'parameters': [] }, { 'command': 'off', 'parameters': [] }, { 'command': 'setBrightness', 'parameters': [ { 'name': 'brightness', 'type': 'integer', 'min': 0, 'max': 255 } ] }, { 'command': 'setColor', 'parameters': [ { 'name': 'red', 'type': 'integer', 'min': 0, 'max': 255 }, { 'name': 'green', 'type': 'integer', 'min': 0, 'max': 255 }, { 'name': 'blue', 'type': 'integer', 'min': 0, 'max': 255 } ] }, { 'command': 'setEffect', 'parameters': [ { 'name': 'effect', 'type': 'integer', 'min': 0, 'max': 255, 'values':eff_values } ] }, { 'command': 'setPreset', 'parameters': [ { 'name': 'preset', 'type': 'integer', 'min': 1, 'max': 250, 'values': preset_values } ] } ] # States erstellen states = [] if state: states.append({ 'name': 'power', 'type': 'boolean', 'current_value': state.get('on', False) }) states.append({ 'name': 'brightness', 'type': 'integer', 'current_value': state.get('bri', 0) }) # Farbe (erstes Segment) segments = state.get('seg', []) if segments and len(segments) > 0: seg = segments[0] colors = seg.get('col', [[0,0,0]]) if colors and len(colors) > 0: rgb = colors[0] states.append({ 'name': 'color_rgb', 'type': 'array', 'current_value': rgb }) return { 'name': name, 'type': 'WLED', 'url': f"wled://{self.ip}", 'commands': commands, 'states': states, 'info': { 'version': info.get('ver', 'unknown'), 'ip': self.ip, 'mac': info.get('mac', 'unknown') } } class DeviceClassifier: """Klassifiziert Geräte als Aktoren oder Sensoren""" # Bekannte Aktor-Typen (können erweitert werden) ACTOR_TYPES = { 'RollerShutter', 'ExteriorScreen', 'Awning', 'Blind', 'GarageDoor', 'Window', 'Light', 'OnOff', 'DimmableLight', 'HeatingSystem', 'Valve', 'Switch', 'Door', 'Curtain', 'VenetianBlind', 'PergolaScreen' } # Bekannte Sensor-Typen (können erweitert werden) SENSOR_TYPES = { 'TemperatureSensor', 'LightSensor', 'HumiditySensor', 'ContactSensor', 'OccupancySensor', 'SmokeSensor', 'WaterDetectionSensor', 'WindowHandle', 'MotionSensor', 'SunSensor', 'WindSensor', 'RainSensor', 'ConsumptionSensor' } # Bekannte Kommandos (können erweitert werden) TAHOMA_COMMANDS = { # ---------- Jalousien & Rollos ---------- "setClosure": [ { "name": "position", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "0 % = offen, 100 % = zu" } ], "setClosureAndOrientation": [ { "name": "position", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "0 % = offen, 100 % = zu" }, { "name": "neigung", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "0 % = offen, 100 % = zu (Neigungswinkel bei Lamellen)" }, ], "setOrientation": [ { "name": "neigung", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "0 % = offen, 100 % = zu (Nur Neigungswinkel)" } ], "up": [], # öffnet vollständig → kein Parameter "down": [], # schließt vollständig → kein Parameter "my": [], # fährt zur „Meine‑Position“ "setMyPosition": [ { "name": "position", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "Speichert die aktuelle bzw. angegebene Position (0 % = offen, 100 % = zu)" } ], "stop": [], # sofort anhalten "refresh": [], # Status neu abfragen # ---------- Licht / Schalter ---------- "on": [], "off": [], "toggle": [], "setIntensity": [ { "name": "helligkeit", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "Helligkeit des Lichts" } ], "setColor": [ { "name": "farbton", "type": "integer", "min": 0, "max": 360, "unit": "°", "description": "Farbton (0‑360°)" }, { "name": "sättigung", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "Sättigung des Farbtons" }, ], "setColorTemperature": [ { "name": "farbtemperatur", "type": "integer", "min": 2000, "max": 6500, "unit": "K", "description": "Farbtemperatur in Kelvin" } ], "setTransition": [ { "name": "dauer", "type": "integer", "min": 0, "max": 3600, "unit": "s", "description": "Übergangszeit für nachfolgende Befehle" } ], # ---------- Thermostat ---------- "setTargetTemperature": [ { "name": "temperatur", "type": "float", "min": 5.0, "max": 30.0, "unit": "°C", "description": "Soll‑Temperatur" } ], "setMode": [ { "name": "betriebsart", "type": "string", "enum": ["off", "heating", "cooling", "auto"], "unit": None, "description": "Betriebsmodus des Thermostats" } ], "setBoost": [ { "name": "boost_dauer", "type": "integer", "min": 1, "max": 180, "unit": "min", "description": "Kurz‑Boost‑Dauer" } ], # ---------- Schalt‑ / Szenen‑Geräte ---------- "pulse": [ { "name": "impuls_dauer", "type": "integer", "min": 1, "max": 3600, "unit": "s", "description": "Kurzimpuls‑Dauer" } ], "setLevel": [ { "name": "ausgangs_level", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "Ausgangs‑Level (dimmbare Relais)" } ], "trigger": [], # Szene ausführen → kein Parameter # ---------- Meta‑Befehle (für alle Geräte) ---------- "configureReporting": [ { "name": "intervall", "type": "integer", "min": 30, "max": 86400, "unit": "s", "description": "Meldeintervall für das Gerät" } ], "setBatteryThreshold": [ { "name": "warnschwelle", "type": "integer", "min": 0, "max": 100, "unit": "%", "description": "Batteriewarnschwelle" } ], } @classmethod def is_actor(cls, device: Dict) -> bool: """ Prüft, ob ein Gerät ein Aktor ist Args: device: Geräte-Dictionary Returns: True wenn Aktor, False sonst """ device_type = device.get('definition', '').get('uiClass', '') # Prüfung nach bekannten Typen if device_type in cls.ACTOR_TYPES: return True # Prüfung nach Commandos (Aktoren haben typischerweise Commands) commands = device.get('definition', {}).get('commands', []) if commands and len(commands) > 0: # Wenn Commands wie open, close, on, off existieren command_names = [cmd.get('commandName', '') for cmd in commands] actor_commands = {'open', 'close', 'on', 'off', 'up', 'down', 'setPosition', 'dim'} if any(cmd in actor_commands for cmd in command_names): return True return False @classmethod def is_sensor(cls, device: Dict) -> bool: """ Prüft, ob ein Gerät ein Sensor ist Args: device: Geräte-Dictionary Returns: True wenn Sensor, False sonst """ device_type = device.get('definition', '').get('uiClass', '') controllable_name = device.get('controllableName', '') # Prüfung nach bekannten Typen if device_type in cls.SENSOR_TYPES: return True if controllable_name in cls.SENSOR_TYPES: return True else: return False # Prüfung nach States (Sensoren haben typischerweise nur States, keine Commands) states = device.get('states', []) commands = device.get('definition', {}).get('commands', []) # Sensor hat States aber keine oder nur wenige Commands if states and len(states) > 0 and len(commands) <= 1: return True return False @classmethod def extract_actor_data(cls, device: Dict) -> tuple: """ Extrahiert Commands und States aus einem Aktor Args: device: Geräte-Dictionary von der Tahoma API Returns: Tuple (commands_list, states_list) """ commands = [] states = [] # Commands aus der Definition extrahieren cmd_definitions = device.get('definition', {}).get('commands', []) for cmd in cmd_definitions: command_name = cmd.get('commandName', '') command_entry = { 'command': command_name, 'parameters': [] } # HINWEIS: Tahoma API liefert oft keine detaillierten Parameter-Infos # Daher werden Commands erstmal ohne Parameter-Details gespeichert #cmd_params = cmd.get('parameters', []) cmd_params = cls.TAHOMA_COMMANDS.get(command_name,[]) for cmd_param in cmd_params: param_detail = { 'name': cmd_param.get('name', '') } # Datentyp (falls vorhanden) param_type = cmd_param.get('type') if param_type: param_detail['type'] = param_type # Min/Max Werte (meist nicht in Tahoma API vorhanden) if 'min' in cmd_param: param_detail['min'] = cmd_param['min'] if 'max' in cmd_param: param_detail['max'] = cmd_param['max'] # Mögliche Werte (enum) (meist nicht vorhanden) if 'values' in cmd_param: param_detail['values'] = cmd_param['values'] if 'unit' in cmd_param: param_detail['unit'] = cmd_param['unit'] if 'description' in cmd_param: param_detail['description'] = cmd_param['description'] # Nur hinzufügen wenn Name vorhanden if param_detail['name']: command_entry['parameters'].append(param_detail) commands.append(command_entry) # States extrahieren state_definitions = device.get('states', []) for state in state_definitions: state_name = state.get('name', '') if state_name: state_entry = { 'name': state_name, 'type': state.get('type', 0) } if 'value' in state: state_entry['current_value'] = state['value'] states.append(state_entry) return commands, states @classmethod def extract_sensor_data(cls, device: Dict) -> list: """ Extrahiert States aus einem Sensor Args: device: Geräte-Dictionary von der Tahoma API Returns: Liste der States """ states = [] # States extrahieren state_definitions = device.get('states', []) for state in state_definitions: state_name = state.get('name', '') if state_name: state_entry = { 'name': state_name, 'type': state.get('type', 0) } # Aktueller Wert falls vorhanden if 'value' in state: state_entry['current_value'] = state['value'] states.append(state_entry) return states def process_devices(tahoma: TahomaAPI, db: DatabaseManager, enable_wled: bool = True, wled_timeout: int = 5, enable_mqtt: bool = True, # NEU clear_before_insert: bool = True): """ Verarbeitet alle Geräte und speichert sie in der Datenbank Args: tahoma: TahomaAPI Instanz db: DatabaseManager Instanz enable_wled: WLED-Geräte suchen und hinzufügen wled_timeout: Timeout für WLED-Discovery in Sekunden clear_before_insert: Tabellen vor dem Einfügen leeren (Standard: True) """ # Optional: Tabellen leeren if clear_before_insert: db.clear_tables() actor_count = 0 sensor_count = 0 unknown_count = 0 # ========== TAHOMA-GERÄTE VERARBEITEN ========== logger.info("=" * 60) logger.info("TAHOMA-GERÄTE WERDEN ABGERUFEN") logger.info("=" * 60) devices = tahoma.get_devices() if not devices: logger.warning("Keine Tahoma-Geräte gefunden") else: # Gruppierte Geräte identifizieren (#1, #2, etc.) device_groups = {} # {base_url: [devices]} standalone_devices = [] for device in devices: device_url = device.get('deviceURL', '') # Prüfen ob URL mit #1, #2, etc. endet import re match = re.match(r'(.+)#(\d+)$', device_url) if match: base_url = match.group(1) if base_url not in device_groups: device_groups[base_url] = [] device_groups[base_url].append(device) else: standalone_devices.append(device) # Gruppierte Geräte verarbeiten for base_url, group_devices in device_groups.items(): # Hauptgerät finden (ohne #-Endung oder mit #1) main_device = None sub_devices = [] for dev in group_devices: url = dev.get('deviceURL', '') if url.endswith('#1'): main_device = dev else: sub_devices.append(dev) # Falls kein #1, nehme das erste Gerät als Hauptgerät if not main_device and group_devices: main_device = group_devices[0] sub_devices = group_devices[1:] # Name vom Hauptgerät für alle übernehmen main_name = main_device.get('label', 'Unbekannt') if main_device else 'Unbekannt' # Alle Geräte der Gruppe verarbeiten mit gemeinsamem Namen for device in group_devices: device_url = device.get('deviceURL', '') # controllableName als Typ verwenden device_type = device.get('controllableName', device.get('uiClass', 'Unknown')) # Gerät klassifizieren is_actor = DeviceClassifier.is_actor(device) is_sensor = DeviceClassifier.is_sensor(device) if is_actor: commands, states = DeviceClassifier.extract_actor_data(device) if db.insert_actor(device_type, main_name, device_url, commands, states): actor_count += 1 logger.info(f"✓ Aktor [Gruppe]: {main_name} ({device_type}) - " f"{len(commands)} Commands, {len(states)} States") elif is_sensor: states = DeviceClassifier.extract_sensor_data(device) if db.insert_sensor(device_type, main_name, device_url, states): sensor_count += 1 logger.info(f"✓ Sensor [Gruppe]: {main_name} ({device_type}) - " f"{len(states)} States") else: unknown_count += 1 logger.warning(f"⚠ Unbekannt [Gruppe]: {main_name} ({device_type}-{device_url})") # Standalone-Geräte verarbeiten for device in standalone_devices: device_url = device.get('deviceURL', '') device_name = device.get('label', 'Unbekannt') # controllableName als Typ verwenden device_type = device.get('controllableName', device.get('uiClass', 'Unknown')) # Gerät klassifizieren is_actor = DeviceClassifier.is_actor(device) is_sensor = DeviceClassifier.is_sensor(device) if is_actor: # Daten extrahieren commands, states = DeviceClassifier.extract_actor_data(device) # In Datenbank speichern if db.insert_actor(device_type, device_name, device_url, commands, states): actor_count += 1 logger.info(f"✓ Aktor: {device_name} ({device_type}) - " f"{len(commands)} Commands, {len(states)} States") elif is_sensor: # Daten extrahieren states = DeviceClassifier.extract_sensor_data(device) # In Datenbank speichern if db.insert_sensor(device_type, device_name, device_url, states): sensor_count += 1 logger.info(f"✓ Sensor: {device_name} ({device_type}) - " f"{len(states)} States") else: unknown_count += 1 logger.warning(f"⚠ Unbekannt: {device_name} ({device_type})") # ========== WLED-GERÄTE SUCHEN UND VERARBEITEN ========== if enable_wled: logger.info("\n" + "=" * 60) logger.info("WLED-GERÄTE WERDEN GESUCHT") logger.info("=" * 60) wled_ips = WLEDDiscovery.discover_devices(timeout=wled_timeout) # Manuelle IPs aus Config hinzufügen if config and config.wled_manual_ips: logger.info(f"Füge {len(config.wled_manual_ips)} manuelle WLED-IPs hinzu...") for manual_ip in config.wled_manual_ips: if manual_ip not in wled_ips: if WLEDDiscovery.is_wled_device(manual_ip): wled_ips.append(manual_ip) logger.info(f"✓ Manuelles WLED-Gerät: {manual_ip}") else: logger.warning(f"⚠ {manual_ip} ist kein WLED-Gerät") if not wled_ips: logger.info("Keine WLED-Geräte gefunden") else: logger.info(f"\n{len(wled_ips)} WLED-Geräte gefunden, füge sie hinzu...") for ip in wled_ips: try: wled = WLEDAPI(ip) device_data = wled.get_device_data() if device_data: name = device_data['name'] device_type = device_data['type'] url = device_data['url'] commands = device_data['commands'] states = device_data['states'] # WLED immer als Aktor hinzufügen if db.insert_actor(device_type, name, url, commands, states): actor_count += 1 logger.info(f"✓ WLED: {name} ({ip}) - " f"{len(commands)} Commands, {len(states)} States") else: logger.warning(f"⚠ Konnte keine Daten von WLED {ip} abrufen") except Exception as e: logger.error(f"✗ Fehler beim Verarbeiten von WLED {ip}: {e}") # ========== MQTT-GERÄTE SUCHEN UND VERARBEITEN ========== # ========== MQTT-GERÄTE SUCHEN UND VERARBEITEN ========== if enable_mqtt: logger.info("\n" + "=" * 60) logger.info("MQTT/HOME ASSISTANT DISCOVERY") logger.info("=" * 60) try: # MQTT Discovery initialisieren mqtt_discovery = HomeAssistantDiscovery( broker=config.mqtt_broker, port=config.mqtt_port, username=config.mqtt_username, password=config.mqtt_password, discovery_prefix=config.mqtt_discovery_prefix ) # Verbinden if mqtt_discovery.connect(): # Discovery durchführen mqtt_entities = mqtt_discovery.discover_devices( timeout=config.mqtt_discovery_timeout ) # Entities nach Geräten gruppieren mqtt_devices = MQTTDeviceConverter.group_entities_by_device(mqtt_entities) # Geräte verarbeiten if not mqtt_devices: logger.info("Keine MQTT-Geräte gefunden") else: logger.info(f"\n{len(mqtt_devices)} MQTT-Geräte gefunden (aus {len(mqtt_entities)} Entities)") for device_id, device_data in mqtt_devices.items(): try: # Gerät in Actor/Sensor konvertieren actor_data, sensor_data = MQTTDeviceConverter.convert_device_to_actors_and_sensors( device_id, device_data ) # Actor speichern falls vorhanden if actor_data: if db.insert_actor( actor_data['type'], actor_data['name'], actor_data['url'], actor_data['commands'], actor_data['states'] ): actor_count += 1 logger.info(f"✓ MQTT Actor: {actor_data['name']} - " f"{len(actor_data['commands'])} Commands, " f"{len(actor_data['states'])} States") # Sensor speichern falls vorhanden if sensor_data: if db.insert_sensor( sensor_data['type'], sensor_data['name'], sensor_data['url'], sensor_data['states'] ): sensor_count += 1 logger.info(f"✓ MQTT Sensor: {sensor_data['name']} - " f"{len(sensor_data['states'])} States") except Exception as e: logger.error(f"✗ Fehler beim Verarbeiten von MQTT-Gerät {device_id}: {e}") import traceback traceback.print_exc() # Verbindung trennen mqtt_discovery.disconnect() else: logger.error("MQTT-Verbindung fehlgeschlagen") except Exception as e: logger.error(f"✗ MQTT Discovery Fehler: {e}") import traceback traceback.print_exc() # ========== ZUSAMMENFASSUNG ========== logger.info("\n" + "=" * 60) logger.info("ZUSAMMENFASSUNG") logger.info("=" * 60) logger.info(f"Aktoren gespeichert: {actor_count}") logger.info(f"Sensoren gespeichert: {sensor_count}") logger.info(f"Unbekannte Geräte: {unknown_count}") logger.info("=" * 60) def main(): """Hauptfunktion""" global config # Konfigurationsdatei laden try: config = Config('config.ini') logger.info("Konfiguration erfolgreich geladen") except FileNotFoundError as e: print(f"FEHLER: {e}") print("\nBitte erstellen Sie eine config.ini Datei mit Ihren Einstellungen.") print("Siehe config.ini Vorlage für Details.") return except Exception as e: print(f"FEHLER beim Laden der Konfiguration: {e}") return # Logging-Level anpassen log_level = getattr(logging, config.log_level) logger.setLevel(log_level) # Optional: Log-Datei einrichten if config.log_file: file_handler = logging.FileHandler(config.log_file, encoding='utf-8') file_handler.setLevel(log_level) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logger.addHandler(file_handler) logger.info(f"Logging in Datei: {config.log_file}") # Tahoma API initialisieren logger.info("Verbinde mit Tahoma Box...") tahoma = TahomaAPI(config.tahoma_ip, config.tahoma_token) # Datenbank initialisieren logger.info("Verbinde mit MySQL-Datenbank...") db = DatabaseManager( config.db_host, config.db_name, config.db_user, config.db_password, config.db_port ) if not db.connect(): logger.error("Datenbankverbindung fehlgeschlagen. Abbruch.") return try: # Geräte verarbeiten und in Datenbank speichern process_devices( tahoma, db, enable_wled=config.wled_enable, wled_timeout=config.wled_discovery_timeout, enable_mqtt=config.mqtt_enable, # NEU clear_before_insert=config.clear_tables ) logger.info("\n✓ Import erfolgreich abgeschlossen!") except Exception as e: logger.error(f"✗ Fehler während der Verarbeitung: {e}") import traceback traceback.print_exc() finally: # Datenbankverbindung schließen db.disconnect() if __name__ == "__main__": main()