Smart-Dashboard/restricted/history/shelly_discovery.py
2026-02-14 19:47:21 +01:00

756 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Shelly Device Discovery Script
Findet alle Shelly-Geräte im lokalen Netzwerk und speichert Sensoren und Aktoren
in der Datenbank gemäß dem EnergyFlow Schema.
"""
import json
import requests
import socket
import mysql.connector
from mysql.connector import Error
from typing import List, Dict, Optional
import argparse
import logging
from datetime import datetime
# Configure logging once at import time: INFO level, timestamped lines.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the classes and main() in this file.
logger = logging.getLogger(__name__)
class ShellyDiscovery:
    """Discover Shelly devices on the local network.

    A host counts as a candidate when one of ``COMMON_PORTS`` accepts a TCP
    connection. Candidates are then probed over HTTP, first with the Gen2
    RPC endpoint and then with the Gen1 ``/shelly`` endpoint.
    """

    SHELLY_MDNS_SERVICE = "_http._tcp.local."
    COMMON_PORTS = [80]
    # Seconds to wait for each HTTP request sent to a candidate host.
    HTTP_TIMEOUT = 2
    # Gen1 devices report model codes such as "SHSW-25" or "SHPLG-S" in the
    # "type" field of /shelly, so only the "SH" prefix is common to them.
    GEN1_TYPE_PREFIX = "SH"

    def __init__(self, network_range: str = "192.168.1"):
        """Store the network prefix (first three octets) to scan."""
        self.network_range = network_range
        self.devices = []

    def scan_network(self, start_ip: int = 1, end_ip: int = 254, timeout: float = 0.5) -> List[str]:
        """Return the addresses in the range that accept a TCP connection.

        Args:
            start_ip: First value of the last octet (inclusive).
            end_ip: Last value of the last octet (inclusive).
            timeout: Socket connect timeout in seconds per probe.

        Returns:
            List of reachable IP addresses, in ascending scan order.
        """
        active_hosts = []
        logger.info(f"Scanne Netzwerk {self.network_range}.{start_ip}-{end_ip}...")
        for i in range(start_ip, end_ip + 1):
            ip = f"{self.network_range}.{i}"
            if self._is_port_open(ip, timeout):
                active_hosts.append(ip)
                logger.debug(f"Host gefunden: {ip}")
        logger.info(f"{len(active_hosts)} aktive Hosts gefunden")
        return active_hosts

    def _is_port_open(self, ip: str, timeout: float) -> bool:
        """Return True if any port in COMMON_PORTS accepts a connection."""
        for port in self.COMMON_PORTS:
            try:
                # The context manager closes the socket on every path.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(timeout)
                    if sock.connect_ex((ip, port)) == 0:
                        return True
            except (OSError, UnicodeError):
                # Unreachable or unresolvable host: treat as not present.
                # Deliberately narrower than a bare except so Ctrl-C still
                # interrupts a long scan.
                continue
        return False

    def _fetch_json(self, url: str):
        """GET ``url`` and return the decoded JSON body, or None on failure.

        Failure covers connection errors, timeouts, non-200 responses and
        bodies that are not valid JSON.
        """
        try:
            response = requests.get(url, timeout=self.HTTP_TIMEOUT)
            if response.status_code != 200:
                return None
            return response.json()
        except (requests.RequestException, ValueError):
            return None

    @staticmethod
    def _is_gen1_info(data) -> bool:
        """Return True if ``data`` looks like a Gen1 ``/shelly`` response."""
        if not isinstance(data, dict):
            return False
        device_type = data.get('type')
        return isinstance(device_type, str) and device_type.startswith(
            ShellyDiscovery.GEN1_TYPE_PREFIX
        )

    def is_shelly_device(self, ip: str) -> Optional[Dict]:
        """Check whether the host at ``ip`` is a Shelly device.

        Args:
            ip: IP address of the host.

        Returns:
            Dict with keys ``ip``, ``generation`` (1 or 2) and ``info``
            (the raw device-info payload), or None if the host does not
            answer like a Shelly device.
        """
        # Gen2 (RPC API) is tried first.
        data = self._fetch_json(f"http://{ip}/rpc/Shelly.GetDeviceInfo")
        if isinstance(data, dict):
            logger.info(f"Shelly Gen2 Gerät gefunden: {ip} - {data.get('name', 'Unknown')}")
            return {
                'ip': ip,
                'generation': 2,
                'info': data
            }

        # Fall back to the Gen1 REST API.
        data = self._fetch_json(f"http://{ip}/shelly")
        if self._is_gen1_info(data):
            logger.info(f"Shelly Gen1 Gerät gefunden: {ip} - {data.get('type', 'Unknown')}")
            return {
                'ip': ip,
                'generation': 1,
                'info': data
            }
        return None

    def get_device_status(self, device: Dict) -> Optional[Dict]:
        """Fetch the current status of a discovered device.

        Args:
            device: Dict as returned by ``is_shelly_device``.

        Returns:
            The status payload as a dict, or None if the request failed,
            returned a non-200 code, or did not contain a JSON object.
        """
        ip = device['ip']
        endpoint = "rpc/Shelly.GetStatus" if device.get('generation') == 2 else "status"
        try:
            response = requests.get(f"http://{ip}/{endpoint}", timeout=self.HTTP_TIMEOUT)
            if response.status_code == 200:
                status = response.json()
                if isinstance(status, dict):
                    return status
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Fehler beim Abrufen des Status von {ip}: {e}")
        return None

    def discover_devices(self, start_ip: int = 1, end_ip: int = 254) -> List[Dict]:
        """Scan the range and collect every Shelly device found.

        Each device dict gets a ``status`` key, which is None when the
        status request failed. Results are appended to ``self.devices``,
        so repeated calls accumulate.

        Returns:
            ``self.devices`` after the scan.
        """
        active_hosts = self.scan_network(start_ip, end_ip)
        for ip in active_hosts:
            device = self.is_shelly_device(ip)
            if device:
                device['status'] = self.get_device_status(device)
                self.devices.append(device)
        logger.info(f"Insgesamt {len(self.devices)} Shelly-Geräte entdeckt")
        return self.devices
class ShellyDatabaseWriter:
    """Write discovered Shelly devices to MySQL as actors and sensors."""

    def __init__(self, host: str, user: str, password: str, database: str):
        """Store connection settings; no connection is opened yet."""
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.connection = None

    def connect(self):
        """Open the database connection.

        Raises:
            mysql.connector.Error: re-raised after logging if connecting fails.
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
                charset='utf8mb4',
                collation='utf8mb4_bin'
            )
            logger.info("Datenbankverbindung hergestellt")
        except Error as e:
            logger.error(f"Fehler bei Datenbankverbindung: {e}")
            raise

    def disconnect(self):
        """Close the database connection if one is open."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            logger.info("Datenbankverbindung geschlossen")

    # ------------------------------------------------------------------
    # Small builders shared by the Gen1 and Gen2 parsers
    # ------------------------------------------------------------------

    @staticmethod
    def _state(name: str, value, unit=None) -> Dict:
        """Build one state entry in the shape the insert methods expect."""
        return {'state_name': name, 'current_value': value, 'unit': unit}

    @staticmethod
    def _switch_commands() -> List[Dict]:
        """Return a fresh on/off/toggle command list."""
        return [
            {'command_name': name, 'params': []}
            for name in ('turn_on', 'turn_off', 'toggle')
        ]

    @staticmethod
    def _cover_commands(position_command: str) -> List[Dict]:
        """Return open/close/stop plus a 0-100 positioning command."""
        commands = [
            {'command_name': name, 'params': []}
            for name in ('open', 'close', 'stop')
        ]
        commands.append({
            'command_name': position_command,
            'params': [
                {
                    'parameter_name': 'position',
                    'parameter_type': 'integer',
                    'min_value': 0,
                    'max_value': 100
                }
            ]
        })
        return commands

    @staticmethod
    def _component_ids(status: Dict, component: str) -> List[int]:
        """Return the sorted numeric ids of ``<component>:<id>`` status keys."""
        prefix = f"{component}:"
        ids = []
        for key in status:
            if isinstance(key, str) and key.startswith(prefix):
                suffix = key[len(prefix):]
                if suffix.isdigit():
                    ids.append(int(suffix))
        return sorted(ids)

    @staticmethod
    def _close_cursor(cursor) -> None:
        """Close ``cursor`` if it exists, without masking the caller's result."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Error as e:
            logger.debug(f"Cursor konnte nicht geschlossen werden: {e}")

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------

    def parse_gen2_device(self, device: Dict) -> tuple:
        """Extract actors and sensors from a Gen2 device dict.

        Args:
            device: Dict with ``ip``, ``info`` and ``status``. ``status``
                may be None when the status request failed during discovery.

        Returns:
            ``(actors, sensors)`` tuple of lists of dicts.
        """
        actors = []
        sensors = []
        info = device.get('info') or {}
        # ``status`` is stored as None when the status fetch failed, so a
        # plain .get(..., {}) default would not protect against it.
        status = device.get('status') or {}
        ip = device['ip']
        device_id = info.get('id')
        # The name may be present but null, so fall back on any falsy value.
        device_name = info.get('name') or f"Shelly_{info.get('id', ip)}"
        device_model = info.get('model', 'Unknown')

        switch_ids = self._component_ids(status, 'switch')

        # Switches as actors
        for i in switch_ids:
            switch_data = status.get(f'switch:{i}') or {}
            actors.append({
                'type': f'ShellySwitch_{device_model}',
                'name': f"{device_name}_Switch_{i}",
                'url': f"http://{ip}/rpc/Switch.Set?id={i}",
                'parameters': json.dumps({
                    'device_id': device_id,
                    'switch_id': i,
                    'model': device_model,
                    'generation': 2
                }),
                'commands': self._switch_commands(),
                'states': [
                    self._state('output', str(switch_data.get('output', False)))
                ]
            })

        # Covers/rollers as actors
        for i in self._component_ids(status, 'cover'):
            cover_data = status.get(f'cover:{i}') or {}
            actors.append({
                'type': f'ShellyCover_{device_model}',
                'name': f"{device_name}_Cover_{i}",
                'url': f"http://{ip}/rpc/Cover.GoToPosition?id={i}",
                'parameters': json.dumps({
                    'device_id': device_id,
                    'cover_id': i,
                    'model': device_model,
                    'generation': 2
                }),
                'commands': self._cover_commands('set_position'),
                'states': [
                    self._state('current_pos', str(cover_data.get('current_pos', 0)), '%'),
                    self._state('state', cover_data.get('state', 'unknown'))
                ]
            })

        # Temperature sensors first, then humidity sensors
        for label, value_key, unit in (('Temperature', 'tC', '°C'), ('Humidity', 'rh', '%')):
            prefix = f"{label.lower()}:"
            for key in status:
                if not key.startswith(prefix):
                    continue
                sensor_id = key.split(':')[1]
                reading = status[key] or {}
                sensors.append({
                    'type': f'Shelly{label}Sensor',
                    'name': f"{device_name}_{label}_{sensor_id}",
                    'url': f"http://{ip}/rpc/{label}.GetStatus?id={sensor_id}",
                    'parameters': json.dumps({
                        'device_id': device_id,
                        'sensor_id': sensor_id,
                        'model': device_model
                    }),
                    'states': [
                        self._state(label.lower(), str(reading.get(value_key, 0)), unit)
                    ]
                })

        # Power meters: only switches whose status reports 'apower'
        for i in switch_ids:
            switch_data = status.get(f'switch:{i}') or {}
            if 'apower' not in switch_data:
                continue
            sensors.append({
                'type': 'ShellyPowerMeter',
                'name': f"{device_name}_Power_{i}",
                'url': f"http://{ip}/rpc/Switch.GetStatus?id={i}",
                'parameters': json.dumps({
                    'device_id': device_id,
                    'switch_id': i,
                    'model': device_model
                }),
                'states': [
                    self._state('active_power', str(switch_data.get('apower', 0)), 'W'),
                    self._state('voltage', str(switch_data.get('voltage', 0)), 'V'),
                    self._state('current', str(switch_data.get('current', 0)), 'A')
                ]
            })
        return actors, sensors

    def parse_gen1_device(self, device: Dict) -> tuple:
        """Extract actors and sensors from a Gen1 device dict.

        Args:
            device: Dict with ``ip``, ``info`` and ``status``. ``status``
                may be None when the status request failed during discovery.

        Returns:
            ``(actors, sensors)`` tuple of lists of dicts.
        """
        actors = []
        sensors = []
        info = device.get('info') or {}
        # See parse_gen2_device: status can be an explicit None.
        status = device.get('status') or {}
        ip = device['ip']
        device_type = info.get('type', 'Unknown')
        device_name = info.get('name') or f"Shelly_{device_type}_{ip}"

        # Relays as actors
        for i, relay in enumerate(status.get('relays') or []):
            actors.append({
                'type': f'ShellyRelay_{device_type}',
                'name': f"{device_name}_Relay_{i}",
                'url': f"http://{ip}/relay/{i}",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'relay_id': i,
                    'generation': 1
                }),
                'commands': self._switch_commands(),
                'states': [
                    self._state('ison', str(relay.get('ison', False)))
                ]
            })

        # Rollers as actors
        for i, roller in enumerate(status.get('rollers') or []):
            actors.append({
                'type': f'ShellyRoller_{device_type}',
                'name': f"{device_name}_Roller_{i}",
                'url': f"http://{ip}/roller/{i}",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'roller_id': i,
                    'generation': 1
                }),
                'commands': self._cover_commands('go_to_position'),
                'states': [
                    self._state('current_pos', str(roller.get('current_pos', 0)), '%'),
                    self._state('state', roller.get('state', 'stop'))
                ]
            })

        # Temperature sensor
        temp_data = status.get('tmp')
        if isinstance(temp_data, dict) and 'tC' in temp_data:
            sensors.append({
                'type': 'ShellyTemperatureSensor',
                'name': f"{device_name}_Temperature",
                'url': f"http://{ip}/status",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'generation': 1
                }),
                'states': [
                    self._state('temperature', str(temp_data.get('tC', 0)), '°C')
                ]
            })

        # Power meters
        for i, meter in enumerate(status.get('meters') or []):
            sensors.append({
                'type': 'ShellyPowerMeter',
                'name': f"{device_name}_Power_{i}",
                'url': f"http://{ip}/status",
                'parameters': json.dumps({
                    'device_type': device_type,
                    'meter_id': i,
                    'generation': 1
                }),
                'states': [
                    self._state('power', str(meter.get('power', 0)), 'W'),
                    self._state('total', str(meter.get('total', 0)), 'Wh')
                ]
            })
        return actors, sensors

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def insert_actor(self, actor: Dict) -> Optional[int]:
        """Insert or update one actor with its commands and states.

        An existing row is matched by ``url``. All statements for the actor
        are committed together and rolled back together on a database error.

        Returns:
            The actor id, or None if a database error occurred.
        """
        cursor = None
        try:
            cursor = self.connection.cursor()
            # Look up an existing actor by URL.
            cursor.execute(
                "SELECT id FROM actors WHERE url = %s",
                (actor['url'],)
            )
            result = cursor.fetchone()
            if result:
                actor_id = result[0]
                cursor.execute(
                    """UPDATE actors
                    SET type = %s, name = %s, parameters = %s
                    WHERE id = %s""",
                    (actor['type'], actor['name'], actor['parameters'], actor_id)
                )
                logger.info(f"Aktor aktualisiert: {actor['name']}")
            else:
                cursor.execute(
                    """INSERT INTO actors (type, name, parameters, url)
                    VALUES (%s, %s, %s, %s)""",
                    (actor['type'], actor['name'], actor['parameters'], actor['url'])
                )
                actor_id = cursor.lastrowid
                logger.info(f"Neuer Aktor eingefügt: {actor['name']}")

            # Commands
            for command in actor.get('commands', []):
                cursor.execute(
                    """INSERT INTO actor_commands (actor_id, command_name)
                    VALUES (%s, %s)
                    ON DUPLICATE KEY UPDATE command_name = command_name""",
                    (actor_id, command['command_name'])
                )
                # NOTE(review): when the command row already exists, the
                # no-op ON DUPLICATE KEY UPDATE may leave lastrowid at 0, and
                # the parameter rows below are inserted again on every run.
                # Confirm against the schema before changing the SQL.
                command_id = cursor.lastrowid
                for param in command.get('params', []):
                    cursor.execute(
                        """INSERT INTO command_parameters
                        (command_id, parameter_name, parameter_type, min_value, max_value)
                        VALUES (%s, %s, %s, %s, %s)""",
                        (command_id, param['parameter_name'], param['parameter_type'],
                         param.get('min_value'), param.get('max_value'))
                    )

            # States
            for state in actor.get('states', []):
                cursor.execute(
                    """INSERT INTO actor_states
                    (actor_id, state_name, current_value, unit)
                    VALUES (%s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE
                    current_value = VALUES(current_value),
                    last_updated = CURRENT_TIMESTAMP""",
                    (actor_id, state['state_name'], state['current_value'], state['unit'])
                )
            self.connection.commit()
            return actor_id
        except Error as e:
            logger.error(f"Fehler beim Einfügen des Aktors: {e}")
            self.connection.rollback()
            return None
        finally:
            # Close on the error path too; previously the cursor leaked there.
            self._close_cursor(cursor)

    def insert_sensor(self, sensor: Dict) -> Optional[int]:
        """Insert or update one sensor with its states.

        An existing row is matched by ``url``. All statements for the sensor
        are committed together and rolled back together on a database error.

        Returns:
            The sensor id, or None if a database error occurred.
        """
        cursor = None
        try:
            cursor = self.connection.cursor()
            # Look up an existing sensor by URL.
            cursor.execute(
                "SELECT id FROM sensors WHERE url = %s",
                (sensor['url'],)
            )
            result = cursor.fetchone()
            if result:
                sensor_id = result[0]
                cursor.execute(
                    """UPDATE sensors
                    SET type = %s, name = %s, parameters = %s
                    WHERE id = %s""",
                    (sensor['type'], sensor['name'], sensor['parameters'], sensor_id)
                )
                logger.info(f"Sensor aktualisiert: {sensor['name']}")
            else:
                cursor.execute(
                    """INSERT INTO sensors (type, name, parameters, url)
                    VALUES (%s, %s, %s, %s)""",
                    (sensor['type'], sensor['name'], sensor['parameters'], sensor['url'])
                )
                sensor_id = cursor.lastrowid
                logger.info(f"Neuer Sensor eingefügt: {sensor['name']}")

            # States
            for state in sensor.get('states', []):
                cursor.execute(
                    """INSERT INTO sensor_states
                    (sensor_id, state_name, current_value, unit)
                    VALUES (%s, %s, %s, %s)
                    ON DUPLICATE KEY UPDATE
                    current_value = VALUES(current_value),
                    last_updated = CURRENT_TIMESTAMP""",
                    (sensor_id, state['state_name'], state['current_value'], state['unit'])
                )
            self.connection.commit()
            return sensor_id
        except Error as e:
            logger.error(f"Fehler beim Einfügen des Sensors: {e}")
            self.connection.rollback()
            return None
        finally:
            # Close on the error path too; previously the cursor leaked there.
            self._close_cursor(cursor)

    def process_devices(self, devices: List[Dict]):
        """Parse every discovered device and write its actors and sensors.

        Args:
            devices: Device dicts with ``ip``, ``generation``, ``info`` and
                ``status`` keys.
        """
        total_actors = 0
        total_sensors = 0
        for device in devices:
            logger.info(f"Verarbeite Gerät: {device['ip']}")
            if device['generation'] == 2:
                actors, sensors = self.parse_gen2_device(device)
            else:
                actors, sensors = self.parse_gen1_device(device)
            # Only successful writes (a truthy id) are counted.
            for actor in actors:
                if self.insert_actor(actor):
                    total_actors += 1
            for sensor in sensors:
                if self.insert_sensor(sensor):
                    total_sensors += 1
        logger.info(f"Verarbeitung abgeschlossen: {total_actors} Aktoren, {total_sensors} Sensoren")
def main():
    """Parse command-line arguments, discover devices and store them.

    The database password is taken from ``--db-password`` or, when the flag
    is omitted, from the ``SHELLY_DB_PASSWORD`` environment variable, so it
    does not have to appear in the process list. Returns early, without
    touching the database, when no Shelly device is found.
    """
    # Local import: only this entry point needs the environment.
    import os

    parser = argparse.ArgumentParser(
        description='Findet Shelly-Geräte im Netzwerk und schreibt sie in die Datenbank'
    )
    parser.add_argument(
        '--network',
        default='192.168.1',
        help='Netzwerk-Präfix (Standard: 192.168.1)'
    )
    parser.add_argument(
        '--start-ip',
        type=int,
        default=1,
        help='Start IP (letztes Oktett, Standard: 1)'
    )
    parser.add_argument(
        '--end-ip',
        type=int,
        default=254,
        help='End IP (letztes Oktett, Standard: 254)'
    )
    parser.add_argument(
        '--db-host',
        default='localhost',
        help='Datenbank Host (Standard: localhost)'
    )
    parser.add_argument(
        '--db-user',
        required=True,
        help='Datenbank Benutzer'
    )
    env_password = os.environ.get('SHELLY_DB_PASSWORD')
    parser.add_argument(
        '--db-password',
        # Still mandatory unless the environment already provides it.
        required=env_password is None,
        default=env_password,
        help='Datenbank Passwort (alternativ Umgebungsvariable SHELLY_DB_PASSWORD)'
    )
    parser.add_argument(
        '--db-name',
        default='EnergyFlow',
        help='Datenbank Name (Standard: EnergyFlow)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='Debug-Modus aktivieren'
    )
    args = parser.parse_args()

    # Reject an inverted or out-of-range last octet up front instead of
    # silently scanning nothing or probing invalid addresses.
    if not 0 <= args.start_ip <= args.end_ip <= 255:
        parser.error('--start-ip und --end-ip müssen 0 <= start <= end <= 255 erfüllen')

    if args.debug:
        logger.setLevel(logging.DEBUG)

    # Discovery
    logger.info("Starte Shelly Device Discovery...")
    discovery = ShellyDiscovery(network_range=args.network)
    devices = discovery.discover_devices(start_ip=args.start_ip, end_ip=args.end_ip)
    if not devices:
        logger.warning("Keine Shelly-Geräte gefunden!")
        return

    # Database write
    logger.info("Schreibe Geräte in Datenbank...")
    db_writer = ShellyDatabaseWriter(
        host=args.db_host,
        user=args.db_user,
        password=args.db_password,
        database=args.db_name
    )
    try:
        db_writer.connect()
        db_writer.process_devices(devices)
    finally:
        # disconnect() is a no-op when connect() failed.
        db_writer.disconnect()
    logger.info("Fertig!")


if __name__ == '__main__':
    main()