#!/usr/bin/env python3
"""
Shelly device discovery script.

Finds all Shelly devices on the local network and stores their sensors and
actors in the database according to the EnergyFlow schema.
"""

import argparse
import json
import logging
import socket
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import mysql.connector
import requests
from mysql.connector import Error

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class ShellyDiscovery:
    """Discovers Shelly devices by probing every host of a network prefix."""

    SHELLY_MDNS_SERVICE = "_http._tcp.local."
    COMMON_PORTS = [80]

    def __init__(self, network_range: str = "192.168.1"):
        # Prefix that is joined with the last octet as "<prefix>.<n>".
        self.network_range = network_range
        # Devices found so far; discover_devices() appends to this list.
        self.devices = []

    def scan_network(self, start_ip: int = 1, end_ip: int = 254,
                     timeout: float = 0.5) -> List[str]:
        """
        Scan the network for hosts that accept TCP connections on port 80.

        Args:
            start_ip: First host to probe (last octet).
            end_ip: Last host to probe (last octet, inclusive).
            timeout: Socket timeout in seconds for each connection attempt.

        Returns:
            List of reachable IP addresses.
        """
        active_hosts = []

        logger.info(f"Scanne Netzwerk {self.network_range}.{start_ip}-{end_ip}...")

        for i in range(start_ip, end_ip + 1):
            ip = f"{self.network_range}.{i}"
            try:
                # The context manager closes the socket on every path.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(timeout)
                    if sock.connect_ex((ip, 80)) == 0:
                        active_hosts.append(ip)
                        logger.debug(f"Host gefunden: {ip}")
            except (OSError, ValueError) as e:
                # Best effort: a host that cannot be probed (bad address,
                # resolver failure, ...) is skipped instead of aborting the scan.
                logger.debug(f"Host {ip} konnte nicht geprüft werden: {e}")

        logger.info(f"{len(active_hosts)} aktive Hosts gefunden")
        return active_hosts

    @staticmethod
    def _get_json(url: str, timeout: float = 2) -> Optional[Dict]:
        """
        Fetch ``url`` and return the decoded JSON object.

        Returns:
            The JSON body when the response has status 200 and the body is a
            JSON object, otherwise None.

        Raises:
            requests.RequestException: On connection errors or timeouts.
            ValueError: When the body is not valid JSON.
        """
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return None
        data = response.json()
        return data if isinstance(data, dict) else None

    def is_shelly_device(self, ip: str) -> Optional[Dict]:
        """
        Check whether a host is a Shelly device.

        The Gen2 RPC endpoint is tried first, then the Gen1 ``/shelly``
        endpoint. Request and decoding failures count as "not this
        generation".

        Args:
            ip: IP address of the host.

        Returns:
            Dict with the keys 'ip', 'generation' and 'info' when the host is
            a Shelly device, otherwise None.
        """
        # Try the Gen2 API (newer Shelly devices)
        try:
            data = self._get_json(f"http://{ip}/rpc/Shelly.GetDeviceInfo")
        except (requests.RequestException, ValueError) as e:
            logger.debug(f"Gen2-Abfrage für {ip} fehlgeschlagen: {e}")
            data = None

        if data is not None:
            logger.info(f"Shelly Gen2 Gerät gefunden: {ip} - {data.get('name', 'Unknown')}")
            return {
                'ip': ip,
                'generation': 2,
                'info': data
            }

        # Try the Gen1 API (older Shelly devices)
        try:
            data = self._get_json(f"http://{ip}/shelly")
        except (requests.RequestException, ValueError) as e:
            logger.debug(f"Gen1-Abfrage für {ip} fehlgeschlagen: {e}")
            data = None

        if data is not None:
            device_type = data.get('type')
            if isinstance(device_type, str) and device_type.startswith('SHELLY'):
                logger.info(f"Shelly Gen1 Gerät gefunden: {ip} - {data.get('type', 'Unknown')}")
                return {
                    'ip': ip,
                    'generation': 1,
                    'info': data
                }

        return None

    def get_device_status(self, device: Dict) -> Optional[Dict]:
        """
        Fetch the current status of a Shelly device.

        Args:
            device: Device dict as returned by is_shelly_device().

        Returns:
            Status dict, or None when the request fails, the response status
            is not 200 or the body is not a JSON object.
        """
        ip = device['ip']

        if device['generation'] == 2:
            url = f"http://{ip}/rpc/Shelly.GetStatus"  # Gen2 status
        else:
            url = f"http://{ip}/status"  # Gen1 status

        try:
            return self._get_json(url)
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Fehler beim Abrufen des Status von {ip}: {e}")

        return None

    def discover_devices(self, start_ip: int = 1, end_ip: int = 254) -> List[Dict]:
        """
        Discover all Shelly devices in the configured network.

        Every device found is appended to ``self.devices`` with its status
        stored under the 'status' key (None when the status request failed).

        Returns:
            ``self.devices``: the list of Shelly devices including status.
        """
        active_hosts = self.scan_network(start_ip, end_ip)

        for ip in active_hosts:
            device = self.is_shelly_device(ip)
            if device:
                device['status'] = self.get_device_status(device)
                self.devices.append(device)

        logger.info(f"Insgesamt {len(self.devices)} Shelly-Geräte entdeckt")
        return self.devices


class ShellyDatabaseWriter:
    """Writes discovered Shelly actors and sensors into the database."""

    def __init__(self, host: str, user: str, password: str, database: str):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        # Set by connect(); None until then.
        self.connection = None

    def connect(self):
        """
        Open the database connection.

        Raises:
            mysql.connector.Error: When the connection cannot be established
                (logged and re-raised).
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
                charset='utf8mb4',
                collation='utf8mb4_bin'
            )
            logger.info("Datenbankverbindung hergestellt")
        except Error as e:
            logger.error(f"Fehler bei Datenbankverbindung: {e}")
            raise

    def disconnect(self):
        """Close the database connection if one is open."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            logger.info("Datenbankverbindung geschlossen")

    @staticmethod
    def _close_cursor(cursor) -> None:
        """Close ``cursor`` (if any) without letting a close error escape."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Error as e:
            logger.debug(f"Cursor konnte nicht geschlossen werden: {e}")

    @staticmethod
    def _indexed_components(status: Dict, prefix: str) -> List[Tuple[int, Dict]]:
        """
        Collect the ``"<prefix>:<id>"`` entries of a Gen2 status dict.

        Returns:
            ``(id, data)`` pairs sorted by numeric id. Entries whose value is
            not a dict are reported with an empty dict so callers can use
            ``.get()`` unconditionally.
        """
        found = []
        for key, value in status.items():
            head, sep, tail = key.partition(':')
            if head == prefix and sep and tail.isdigit():
                found.append((int(tail), value if isinstance(value, dict) else {}))
        found.sort(key=lambda pair: pair[0])
        return found

    def parse_gen2_device(self, device: Dict) -> Tuple[List[Dict], List[Dict]]:
        """
        Parse a Gen2 Shelly device and extract its actors and sensors.

        Args:
            device: Device dict with the keys 'ip', 'info' and 'status'.
                A missing or None status yields empty result lists.

        Returns:
            ``(actors, sensors)`` tuple of lists of dicts.
        """
        actors = []
        sensors = []

        # 'info'/'status' may be present but None (e.g. failed status request).
        info = device.get('info') or {}
        status = device.get('status') or {}
        ip = device['ip']

        # 'name' can be present but null; fall back to the generated name then.
        device_name = info.get('name') or f"Shelly_{info.get('id', ip)}"
        device_model = info.get('model', 'Unknown')

        switches = self._indexed_components(status, 'switch')

        # Switches as actors
        for i, switch_data in switches:
            actors.append({
                'type': f'ShellySwitch_{device_model}',
                'name': f"{device_name}_Switch_{i}",
                'url': f"http://{ip}/rpc/Switch.Set?id={i}",
                'parameters': json.dumps({
                    'device_id': info.get('id'),
                    'switch_id': i,
                    'model': device_model,
                    'generation': 2
                }),
                'commands': [
                    {'command_name': 'turn_on', 'params': []},
                    {'command_name': 'turn_off', 'params': []},
                    {'command_name': 'toggle', 'params': []}
                ],
                'states': [
                    {
                        'state_name': 'output',
                        'current_value': str(switch_data.get('output', False)),
                        'unit': None
                    }
                ]
            })

        # Covers/rollers as actors
        for i, cover_data in self._indexed_components(status, 'cover'):
            actors.append({
                'type': f'ShellyCover_{device_model}',
                'name': f"{device_name}_Cover_{i}",
                'url': f"http://{ip}/rpc/Cover.GoToPosition?id={i}",
                'parameters': json.dumps({
                    'device_id': info.get('id'),
                    'cover_id': i,
                    'model': device_model,
                    'generation': 2
                }),
                'commands': [
                    {'command_name': 'open', 'params': []},
                    {'command_name': 'close', 'params': []},
                    {'command_name': 'stop', 'params': []},
                    {'command_name': 'set_position', 'params': [
                        {
                            'parameter_name': 'position',
                            'parameter_type': 'integer',
                            'min_value': 0,
                            'max_value': 100
                        }
                    ]}
                ],
                'states': [
                    {
                        'state_name': 'current_pos',
                        'current_value': str(cover_data.get('current_pos', 0)),
                        'unit': '%'
                    },
                    {
                        'state_name': 'state',
                        'current_value': cover_data.get('state', 'unknown'),
                        'unit': None
                    }
                ]
            })

        # Temperature sensors
        for temp_index, temp_data in self._indexed_components(status, 'temperature'):
            # Kept as a string so the stored 'sensor_id' stays as before.
            temp_id = str(temp_index)
            sensors.append({
                'type': 'ShellyTemperatureSensor',
                'name': f"{device_name}_Temperature_{temp_id}",
                'url': f"http://{ip}/rpc/Temperature.GetStatus?id={temp_id}",
                'parameters': json.dumps({
                    'device_id': info.get('id'),
                    'sensor_id': temp_id,
                    'model': device_model
                }),
                'states': [
                    {
                        'state_name': 'temperature',
                        'current_value': str(temp_data.get('tC', 0)),
                        'unit': '°C'
                    }
                ]
            })

        # Humidity sensors
        for hum_index, hum_data in self._indexed_components(status, 'humidity'):
            hum_id = str(hum_index)
            sensors.append({
                'type': 'ShellyHumiditySensor',
                'name': f"{device_name}_Humidity_{hum_id}",
                'url': f"http://{ip}/rpc/Humidity.GetStatus?id={hum_id}",
                'parameters': json.dumps({
                    'device_id': info.get('id'),
                    'sensor_id': hum_id,
                    'model': device_model
                }),
                'states': [
                    {
                        'state_name': 'humidity',
                        'current_value': str(hum_data.get('rh', 0)),
                        'unit': '%'
                    }
                ]
            })

        # Energy sensors (power meter): every switch that reports 'apower'
        for i, switch_data in switches:
            if 'apower' not in switch_data:
                continue
            sensors.append({
                'type': 'ShellyPowerMeter',
                'name': f"{device_name}_Power_{i}",
                'url': f"http://{ip}/rpc/Switch.GetStatus?id={i}",
                'parameters': json.dumps({
                    'device_id': info.get('id'),
                    'switch_id': i,
                    'model': device_model
                }),
                'states': [
                    {
                        'state_name': 'active_power',
                        'current_value': str(switch_data.get('apower', 0)),
                        'unit': 'W'
                    },
                    {
                        'state_name': 'voltage',
                        'current_value': str(switch_data.get('voltage', 0)),
                        'unit': 'V'
                    },
                    {
                        'state_name': 'current',
                        'current_value': str(switch_data.get('current', 0)),
                        'unit': 'A'
                    }
                ]
            })

        return actors, sensors

    def parse_gen1_device(self, device: Dict) -> Tuple[List[Dict], List[Dict]]:
        """
        Parse a Gen1 Shelly device and extract its actors and sensors.

        Args:
            device: Device dict with the keys 'ip', 'info' and 'status'.
                A missing or None status yields empty result lists.

        Returns:
            ``(actors, sensors)`` tuple of lists of dicts.
        """
        actors = []
        sensors = []

        # 'info'/'status' may be present but None (e.g. failed status request).
        info = device.get('info') or {}
        status = device.get('status') or {}
        ip = device['ip']

        device_type = info.get('type', 'Unknown')
        device_name = info.get('name') or f"Shelly_{device_type}_{ip}"

        # Relays as actors
        for i, relay in enumerate(status.get('relays') or []):
            actors.append({
                'type': f'ShellyRelay_{device_type}',
                'name': f"{device_name}_Relay_{i}",
                'url': f"http://{ip}/relay/{i}",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'relay_id': i,
                    'generation': 1
                }),
                'commands': [
                    {'command_name': 'turn_on', 'params': []},
                    {'command_name': 'turn_off', 'params': []},
                    {'command_name': 'toggle', 'params': []}
                ],
                'states': [
                    {
                        'state_name': 'ison',
                        'current_value': str(relay.get('ison', False)),
                        'unit': None
                    }
                ]
            })

        # Rollers as actors
        for i, roller in enumerate(status.get('rollers') or []):
            actors.append({
                'type': f'ShellyRoller_{device_type}',
                'name': f"{device_name}_Roller_{i}",
                'url': f"http://{ip}/roller/{i}",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'roller_id': i,
                    'generation': 1
                }),
                'commands': [
                    {'command_name': 'open', 'params': []},
                    {'command_name': 'close', 'params': []},
                    {'command_name': 'stop', 'params': []},
                    {'command_name': 'go_to_position', 'params': [
                        {
                            'parameter_name': 'position',
                            'parameter_type': 'integer',
                            'min_value': 0,
                            'max_value': 100
                        }
                    ]}
                ],
                'states': [
                    {
                        'state_name': 'current_pos',
                        'current_value': str(roller.get('current_pos', 0)),
                        'unit': '%'
                    },
                    {
                        'state_name': 'state',
                        'current_value': roller.get('state', 'stop'),
                        'unit': None
                    }
                ]
            })

        # Temperature sensor
        temp_data = status.get('tmp')
        if isinstance(temp_data, dict) and 'tC' in temp_data:
            sensors.append({
                'type': 'ShellyTemperatureSensor',
                'name': f"{device_name}_Temperature",
                'url': f"http://{ip}/status",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'generation': 1
                }),
                'states': [
                    {
                        'state_name': 'temperature',
                        'current_value': str(temp_data.get('tC', 0)),
                        'unit': '°C'
                    }
                ]
            })

        # Energy sensors (meters)
        for i, meter in enumerate(status.get('meters') or []):
            sensors.append({
                'type': 'ShellyPowerMeter',
                'name': f"{device_name}_Power_{i}",
                'url': f"http://{ip}/status",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'meter_id': i,
                    'generation': 1
                }),
                'states': [
                    {
                        'state_name': 'power',
                        'current_value': str(meter.get('power', 0)),
                        'unit': 'W'
                    },
                    {
                        'state_name': 'total',
                        'current_value': str(meter.get('total', 0)),
                        'unit': 'Wh'
                    }
                ]
            })

        return actors, sensors

    def insert_actor(self, actor: Dict) -> Optional[int]:
        """
        Insert an actor, or update it when a row with the same URL exists.

        Commands, command parameters and states are written in the same
        transaction; on a database error the transaction is rolled back.

        Args:
            actor: Actor dict as produced by the parse_* methods.

        Returns:
            The actor ID, or None on a database error.
        """
        cursor = None
        try:
            cursor = self.connection.cursor()

            # Check whether the actor already exists. LIMIT 1 guarantees that
            # fetchone() leaves no unread rows behind.
            cursor.execute(
                "SELECT id FROM actors WHERE url = %s LIMIT 1",
                (actor['url'],)
            )
            result = cursor.fetchone()

            if result:
                actor_id = result[0]
                # Update the existing actor
                cursor.execute(
                    """UPDATE actors 
                       SET type = %s, name = %s, parameters = %s 
                       WHERE id = %s""",
                    (actor['type'], actor['name'], actor['parameters'], actor_id)
                )
                logger.info(f"Aktor aktualisiert: {actor['name']}")
            else:
                # New actor
                cursor.execute(
                    """INSERT INTO actors (type, name, parameters, url) 
                       VALUES (%s, %s, %s, %s)""",
                    (actor['type'], actor['name'], actor['parameters'], actor['url'])
                )
                actor_id = cursor.lastrowid
                logger.info(f"Neuer Aktor eingefügt: {actor['name']}")

            # Insert commands
            for command in actor.get('commands', []):
                cursor.execute(
                    """INSERT INTO actor_commands (actor_id, command_name) 
                       VALUES (%s, %s)
                       ON DUPLICATE KEY UPDATE command_name = command_name""",
                    (actor_id, command['command_name'])
                )

                params = command.get('params', [])
                if not params:
                    continue

                # lastrowid is 0 when the command already existed (the
                # ON DUPLICATE branch changes nothing), so look the row up.
                # NOTE(review): assumes actor_commands has an `id` primary key
                # like actors/sensors - confirm against the schema.
                command_id = cursor.lastrowid
                if not command_id:
                    cursor.execute(
                        """SELECT id FROM actor_commands 
                           WHERE actor_id = %s AND command_name = %s LIMIT 1""",
                        (actor_id, command['command_name'])
                    )
                    row = cursor.fetchone()
                    command_id = row[0] if row else None

                if not command_id:
                    logger.warning(
                        f"Command-ID für {command['command_name']} nicht gefunden, "
                        f"Parameter werden übersprungen"
                    )
                    continue

                # Insert command parameters. The upsert keeps repeated runs
                # from failing when the table has a unique key.
                # NOTE(review): without a unique key on command_parameters
                # repeated runs still add rows - confirm against the schema.
                for param in params:
                    cursor.execute(
                        """INSERT INTO command_parameters 
                           (command_id, parameter_name, parameter_type, min_value, max_value) 
                           VALUES (%s, %s, %s, %s, %s)
                           ON DUPLICATE KEY UPDATE 
                           parameter_type = VALUES(parameter_type),
                           min_value = VALUES(min_value),
                           max_value = VALUES(max_value)""",
                        (command_id, param['parameter_name'], param['parameter_type'],
                         param.get('min_value'), param.get('max_value'))
                    )

            # Insert states
            for state in actor.get('states', []):
                cursor.execute(
                    """INSERT INTO actor_states (actor_id, state_name, current_value, unit) 
                       VALUES (%s, %s, %s, %s)
                       ON DUPLICATE KEY UPDATE 
                       current_value = VALUES(current_value),
                       last_updated = CURRENT_TIMESTAMP""",
                    (actor_id, state['state_name'], state['current_value'], state['unit'])
                )

            self.connection.commit()
            return actor_id

        except Error as e:
            logger.error(f"Fehler beim Einfügen des Aktors: {e}")
            self.connection.rollback()
            return None
        finally:
            # Release the cursor on the error path as well.
            self._close_cursor(cursor)

    def insert_sensor(self, sensor: Dict) -> Optional[int]:
        """
        Insert a sensor, or update it when a row with the same URL exists.

        States are written in the same transaction; on a database error the
        transaction is rolled back.

        Args:
            sensor: Sensor dict as produced by the parse_* methods.

        Returns:
            The sensor ID, or None on a database error.
        """
        cursor = None
        try:
            cursor = self.connection.cursor()

            # Check whether the sensor already exists. LIMIT 1 guarantees that
            # fetchone() leaves no unread rows behind.
            cursor.execute(
                "SELECT id FROM sensors WHERE url = %s LIMIT 1",
                (sensor['url'],)
            )
            result = cursor.fetchone()

            if result:
                sensor_id = result[0]
                # Update the existing sensor
                cursor.execute(
                    """UPDATE sensors 
                       SET type = %s, name = %s, parameters = %s 
                       WHERE id = %s""",
                    (sensor['type'], sensor['name'], sensor['parameters'], sensor_id)
                )
                logger.info(f"Sensor aktualisiert: {sensor['name']}")
            else:
                # New sensor
                cursor.execute(
                    """INSERT INTO sensors (type, name, parameters, url) 
                       VALUES (%s, %s, %s, %s)""",
                    (sensor['type'], sensor['name'], sensor['parameters'], sensor['url'])
                )
                sensor_id = cursor.lastrowid
                logger.info(f"Neuer Sensor eingefügt: {sensor['name']}")

            # Insert states
            for state in sensor.get('states', []):
                cursor.execute(
                    """INSERT INTO sensor_states (sensor_id, state_name, current_value, unit) 
                       VALUES (%s, %s, %s, %s)
                       ON DUPLICATE KEY UPDATE 
                       current_value = VALUES(current_value),
                       last_updated = CURRENT_TIMESTAMP""",
                    (sensor_id, state['state_name'], state['current_value'], state['unit'])
                )

            self.connection.commit()
            return sensor_id

        except Error as e:
            logger.error(f"Fehler beim Einfügen des Sensors: {e}")
            self.connection.rollback()
            return None
        finally:
            # Release the cursor on the error path as well.
            self._close_cursor(cursor)

    def process_devices(self, devices: List[Dict]):
        """
        Parse all discovered devices and write them to the database.

        Args:
            devices: Device dicts as returned by
                ShellyDiscovery.discover_devices().
        """
        total_actors = 0
        total_sensors = 0

        for device in devices:
            logger.info(f"Verarbeite Gerät: {device['ip']}")

            if device['generation'] == 2:
                actors, sensors = self.parse_gen2_device(device)
            else:
                actors, sensors = self.parse_gen1_device(device)

            # Insert actors; None signals a failed insert.
            for actor in actors:
                if self.insert_actor(actor) is not None:
                    total_actors += 1

            # Insert sensors
            for sensor in sensors:
                if self.insert_sensor(sensor) is not None:
                    total_sensors += 1

        logger.info(f"Verarbeitung abgeschlossen: {total_actors} Aktoren, {total_sensors} Sensoren")


def main():
    """Parse the command line, discover devices and write them to the DB."""
    parser = argparse.ArgumentParser(
        description='Findet Shelly-Geräte im Netzwerk und schreibt sie in die Datenbank'
    )
    parser.add_argument(
        '--network',
        default='192.168.1',
        help='Netzwerk-Präfix (Standard: 192.168.1)'
    )
    parser.add_argument(
        '--start-ip',
        type=int,
        default=1,
        help='Start IP (letztes Oktett, Standard: 1)'
    )
    parser.add_argument(
        '--end-ip',
        type=int,
        default=254,
        help='End IP (letztes Oktett, Standard: 254)'
    )
    parser.add_argument(
        '--db-host',
        default='localhost',
        help='Datenbank Host (Standard: localhost)'
    )
    parser.add_argument(
        '--db-user',
        required=True,
        help='Datenbank Benutzer'
    )
    parser.add_argument(
        '--db-password',
        required=True,
        help='Datenbank Passwort'
    )
    parser.add_argument(
        '--db-name',
        default='EnergyFlow',
        help='Datenbank Name (Standard: EnergyFlow)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='Debug-Modus aktivieren'
    )

    args = parser.parse_args()

    if args.debug:
        logger.setLevel(logging.DEBUG)

    # Discovery
    logger.info("Starte Shelly Device Discovery...")
    discovery = ShellyDiscovery(network_range=args.network)
    devices = discovery.discover_devices(start_ip=args.start_ip, end_ip=args.end_ip)

    if not devices:
        logger.warning("Keine Shelly-Geräte gefunden!")
        return

    # Database write
    logger.info("Schreibe Geräte in Datenbank...")
    db_writer = ShellyDatabaseWriter(
        host=args.db_host,
        user=args.db_user,
        password=args.db_password,
        database=args.db_name
    )

    try:
        db_writer.connect()
        db_writer.process_devices(devices)
    finally:
        # Always release the connection, even when processing fails.
        db_writer.disconnect()

    logger.info("Fertig!")


if __name__ == '__main__':
    main()