#!/usr/bin/env python3
"""
Somfy Tahoma Local API to MySQL Database Script

Reads all actuators and sensors from the Tahoma box and stores them in MySQL.
"""

import json
import logging
import os
from typing import List, Dict, Optional
from urllib.parse import quote

import pymysql
import requests
import urllib3
from pymysql import Error

# Disable SSL warnings (Tahoma uses self-signed certificates)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configure logging: INFO level, timestamped "time - level - message" lines
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)


class TahomaAPI:
    """Client for the Tahoma local HTTP API (setup and device endpoints)."""

    # Seconds to wait for the gateway before a request is aborted.
    REQUEST_TIMEOUT = 10

    def __init__(self, gateway_ip: str, api_token: str):
        """
        Store the endpoint and authentication data used by all requests.

        Args:
            gateway_ip: IP address of the Tahoma box.
            api_token: API token, sent as a Bearer token.
        """
        self.base_url = f"https://{gateway_ip}:8443/enduser-mobile-web/1/enduserAPI"
        self.headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        }

    def _device_endpoint(self, device_url: str, suffix: str = "") -> str:
        """Return the endpoint of a single device, optionally followed by `suffix`."""
        # safe='' also encodes '/' and ':' so the device URL stays one path segment.
        return f"{self.base_url}/setup/devices/{quote(device_url, safe='')}{suffix}"

    def _get_json(self, url: str):
        """
        Send a GET request and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: Connection error, timeout or
                HTTP error status.
            ValueError: The response body is not valid JSON.
        """
        # verify=False: certificate verification is disabled for the gateway connection.
        response = requests.get(
            url,
            headers=self.headers,
            verify=False,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()

    def get_setup(self) -> Optional[Dict]:
        """
        Fetch the complete setup configuration.

        Returns:
            The decoded setup document, or None if the request or the JSON
            decoding failed.
        """
        try:
            return self._get_json(f"{self.base_url}/setup")
        # ValueError covers JSON decoding errors that are not RequestException subclasses.
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error(f"Fehler beim Abrufen der Setup-Daten: {e}")
            return None

    def get_devices(self) -> List[Dict]:
        """
        Extract all devices from the setup.

        Returns:
            List of device dictionaries; empty if the setup could not be
            fetched or contains no devices.
        """
        setup = self.get_setup()
        if not setup:
            return []

        # `or []` also covers an explicit null in the payload.
        devices = setup.get('devices') or []
        logger.info(f"{len(devices)} Geräte gefunden")
        return devices

    def get_device_definition(self, device_url: str) -> Optional[Dict]:
        """
        Fetch the detailed definition of one device.

        Args:
            device_url: URL of the device.

        Returns:
            The decoded device document, or None on failure.
        """
        try:
            return self._get_json(self._device_endpoint(device_url))
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.debug(f"Fehler beim Abrufen der Device-Definition für {device_url}: {e}")
            return None

    def get_device_states(self, device_url: str) -> List[Dict]:
        """
        Fetch the current states of one device.

        Args:
            device_url: URL of the device.

        Returns:
            List of states; empty on failure or if the payload is not a list.
        """
        try:
            states = self._get_json(self._device_endpoint(device_url, "/states"))
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.debug(f"Fehler beim Abrufen der Device-States für {device_url}: {e}")
            return []
        # Keep the declared return type even if the gateway answers with something else.
        return states if isinstance(states, list) else []


class DatabaseManager:
    """MySQL/MariaDB persistence for actors, sensors, their commands and states."""

    # Statements run by clear_tables(), in this order.
    _CLEAR_STATEMENTS = (
        "DELETE FROM command_parameters",
        "DELETE FROM actor_commands",
        "DELETE FROM actor_states",
        "DELETE FROM actors",
        "DELETE FROM sensor_states",
        "DELETE FROM sensors",
    )

    def __init__(self, host: str, database: str, user: str, password: str, port: int = 3306):
        """
        Store the connection settings; no connection is opened yet.

        Args:
            host: MySQL host.
            database: Database name.
            user: User name.
            password: Password.
            port: Port (default: 3306).
        """
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

    def connect(self) -> bool:
        """
        Open the database connection.

        Returns:
            True on success, False on error.
        """
        try:
            self.connection = pymysql.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port,
                charset='utf8mb4',
            )
        except Error as e:
            logger.error(f"Fehler bei der Datenbankverbindung: {e}")
            return False
        logger.info("Erfolgreich mit MariaDB/MySQL-Datenbank verbunden")
        return True

    def disconnect(self):
        """Close the database connection; does nothing if none is open."""
        if self.connection is None:
            return
        try:
            self.connection.close()
        finally:
            # Drop the reference so a second disconnect() does not close it again.
            self.connection = None
        logger.info("Datenbankverbindung geschlossen")

    def clear_tables(self) -> bool:
        """
        Delete all rows from all tables.

        Returns:
            True if the tables were emptied and committed, False on error
            (the transaction is rolled back).
        """
        try:
            with self.connection.cursor() as cursor:
                # Temporarily disable foreign key checks.
                cursor.execute("SET FOREIGN_KEY_CHECKS=0")
                try:
                    for statement in self._CLEAR_STATEMENTS:
                        cursor.execute(statement)
                finally:
                    # Re-enable the checks even if a DELETE failed, so the
                    # session is not left with checks switched off.
                    cursor.execute("SET FOREIGN_KEY_CHECKS=1")
            self.connection.commit()
        except Error as e:
            logger.error(f"Fehler beim Leeren der Tabellen: {e}")
            self.connection.rollback()
            return False
        logger.info("Alle Tabellen geleert")
        return True

    @staticmethod
    def _insert_states(cursor, table: str, owner_column: str, owner_id: int, states: list) -> None:
        """Insert one row per state into `table`, linked to its owner through `owner_column`."""
        # table and owner_column are fixed identifiers passed by this class, never external input.
        query = (
            f"INSERT INTO {table} "
            f"({owner_column}, state_name, state_type, current_value) "
            "VALUES (%s, %s, %s, %s)"
        )
        for state in states:
            # A state without 'current_value' is stored as NULL; any other value is stringified.
            current_value = str(state['current_value']) if 'current_value' in state else None
            cursor.execute(
                query,
                (owner_id, state.get('name', ''), state.get('type', 0), current_value),
            )

    def insert_actor(self, device_type: str, name: str, url: str,
                     commands: list, states: list) -> bool:
        """
        Insert an actor together with its commands, command parameters and states.

        Everything is written in one transaction: it is committed at the end
        or rolled back if a database error occurs.

        Args:
            device_type: Device type (e.g. RollerShutter).
            name: Device name.
            url: URL of the device.
            commands: Commands as dicts with 'command' and 'parameters' keys.
            states: States as dicts with 'name', 'type' and optional 'current_value'.

        Returns:
            True on success, False on error.
        """
        actor_query = """
            INSERT INTO actors (type, name, parameters, url)
            VALUES (%s, %s, NULL, %s)
        """
        command_query = """
            INSERT INTO actor_commands (actor_id, command_name)
            VALUES (%s, %s)
        """
        param_query = """
            INSERT INTO command_parameters
            (command_id, parameter_name, parameter_type, min_value, max_value, possible_values)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        try:
            # The context manager closes the cursor on the error path as well.
            with self.connection.cursor() as cursor:
                # 1. Insert the actor
                cursor.execute(actor_query, (device_type, name, url))
                actor_id = cursor.lastrowid

                # 2. Insert the commands and their parameters
                for cmd in commands:
                    cursor.execute(command_query, (actor_id, cmd.get('command', '')))
                    command_id = cursor.lastrowid

                    for param in cmd.get('parameters', []):
                        # Enumerated values are stored as a JSON string, otherwise NULL.
                        possible_vals = json.dumps(param.get('values')) if 'values' in param else None
                        cursor.execute(
                            param_query,
                            (
                                command_id,
                                param.get('name', ''),
                                param.get('type', ''),
                                param.get('min'),
                                param.get('max'),
                                possible_vals,
                            ),
                        )

                # 3. Insert the states
                self._insert_states(cursor, "actor_states", "actor_id", actor_id, states)

            self.connection.commit()
            return True
        except Error as e:
            logger.error(f"Fehler beim Einfügen des Aktors {name}: {e}")
            self.connection.rollback()
            return False

    def insert_sensor(self, device_type: str, name: str, url: str,
                      states: list) -> bool:
        """
        Insert a sensor together with its states.

        Everything is written in one transaction: it is committed at the end
        or rolled back if a database error occurs.

        Args:
            device_type: Device type (e.g. TemperatureSensor).
            name: Device name.
            url: URL of the device.
            states: States as dicts with 'name', 'type' and optional 'current_value'.

        Returns:
            True on success, False on error.
        """
        sensor_query = """
            INSERT INTO sensors (type, name, parameters, url)
            VALUES (%s, %s, NULL, %s)
        """
        try:
            with self.connection.cursor() as cursor:
                # 1. Insert the sensor
                cursor.execute(sensor_query, (device_type, name, url))
                sensor_id = cursor.lastrowid

                # 2. Insert the states
                self._insert_states(cursor, "sensor_states", "sensor_id", sensor_id, states)

            self.connection.commit()
            return True
        except Error as e:
            logger.error(f"Fehler beim Einfügen des Sensors {name}: {e}")
            self.connection.rollback()
            return False


class DeviceClassifier:
    """Classifies devices as actors or sensors."""

    # Known actor types (can be extended)
    ACTOR_TYPES = {
        'RollerShutter', 'ExteriorScreen', 'Awning', 'Blind',
        'GarageDoor', 'Window', 'Light', 'OnOff', 'DimmableLight',
        'HeatingSystem', 'Valve', 'Switch', 'Door', 'Curtain',
        'VenetianBlind', 'PergolaScreen',
    }

    # Known sensor types (can be extended)
    SENSOR_TYPES = {
        'TemperatureSensor', 'LightSensor', 'HumiditySensor',
        'ContactSensor', 'OccupancySensor', 'SmokeSensor',
        'WaterDetectionSensor', 'WindowHandle', 'MotionSensor',
        'SunSensor', 'WindSensor', 'RainSensor', 'ConsumptionSensor',
    }

    # Command names that mark a device as an actor when its type is not listed above.
    ACTOR_COMMANDS = frozenset(
        {'open', 'close', 'on', 'off', 'up', 'down', 'setPosition', 'dim'}
    )

    @staticmethod
    def _definition(device: Dict) -> Dict:
        """Return the device's 'definition' dict, or {} if it is missing or not a dict."""
        definition = device.get('definition')
        return definition if isinstance(definition, dict) else {}

    @classmethod
    def _ui_class(cls, device: Dict) -> str:
        """Return the device's uiClass, or '' if none is present."""
        # NOTE(review): the top-level key is preferred; the nested lookup assumes
        # some payloads carry uiClass inside 'definition' — confirm against the
        # gateway's actual /setup response.
        return device.get('uiClass') or cls._definition(device).get('uiClass') or ''

    @classmethod
    def _commands(cls, device: Dict) -> list:
        """Return the command definitions of the device, or [] if there are none."""
        return cls._definition(device).get('commands') or []

    @classmethod
    def is_actor(cls, device: Dict) -> bool:
        """
        Check whether a device is an actor.

        A device counts as an actor if its uiClass is a known actor type or
        if it offers at least one of the commands in ACTOR_COMMANDS.

        Args:
            device: Device dictionary.

        Returns:
            True if the device is an actor, False otherwise.
        """
        if cls._ui_class(device) in cls.ACTOR_TYPES:
            return True

        return any(
            cmd.get('commandName', '') in cls.ACTOR_COMMANDS
            for cmd in cls._commands(device)
        )

    @classmethod
    def is_sensor(cls, device: Dict) -> bool:
        """
        Check whether a device is a sensor.

        A device counts as a sensor if its uiClass is a known sensor type or
        if it has states and at most one command.

        Args:
            device: Device dictionary.

        Returns:
            True if the device is a sensor, False otherwise.
        """
        if cls._ui_class(device) in cls.SENSOR_TYPES:
            return True

        states = device.get('states') or []
        return bool(states) and len(cls._commands(device)) <= 1


def extract_actor_data(device: Dict) -> tuple:
    """
    Extract commands and states from an actor.

    Missing or null 'definition', 'commands', 'parameters' and 'states'
    entries are treated as empty.

    Args:
        device: Device dictionary from the Tahoma API.

    Returns:
        Tuple (commands_list, states_list). Each command is a dict with the
        keys 'command' and 'parameters'; each state is a dict with 'name',
        'type' and, if the device reports one, 'current_value'.
    """
    definition = device.get('definition') or {}

    # Commands from the definition
    commands = []
    for cmd in definition.get('commands') or []:
        parameters = []
        for cmd_param in cmd.get('parameters') or []:
            param_detail = {'name': cmd_param.get('name', 'value')}

            # Data type, only if one is given
            param_type = cmd_param.get('type')
            if param_type:
                param_detail['type'] = param_type

            # Numeric bounds and enumerated values are copied only when present.
            for key in ('min', 'max', 'values'):
                if key in cmd_param:
                    param_detail[key] = cmd_param[key]

            parameters.append(param_detail)

        commands.append({
            'command': cmd.get('commandName', ''),
            'parameters': parameters,
        })

    # States; entries without a name are skipped.
    states = []
    for state in device.get('states') or []:
        state_name = state.get('name', '')
        if not state_name:
            continue
        state_entry = {'name': state_name, 'type': state.get('type', 0)}
        if 'value' in state:
            state_entry['current_value'] = state['value']
        states.append(state_entry)

    return commands, states


def extract_sensor_data(device: Dict) -> list:
    """
    Extract the states from a sensor.

    A missing or null 'states' entry is treated as empty; states without a
    name are skipped.

    Args:
        device: Device dictionary from the Tahoma API.

    Returns:
        List of state dicts with 'name', 'type' and, if the device reports
        one, 'current_value'.
    """
    states = []
    for state in device.get('states') or []:
        state_name = state.get('name', '')
        if not state_name:
            continue

        state_entry = {'name': state_name, 'type': state.get('type', 0)}

        # Current value, if present
        if 'value' in state:
            state_entry['current_value'] = state['value']

        states.append(state_entry)

    return states


def process_devices(tahoma: TahomaAPI, db: DatabaseManager, clear_before_insert: bool = True):
    """
    Process all devices and store them in the database.

    Each device is classified as actor or sensor (actor wins if both match)
    and inserted accordingly; devices matching neither are only counted.
    A summary of the counts is logged at the end.

    Args:
        tahoma: TahomaAPI instance.
        db: DatabaseManager instance.
        clear_before_insert: Empty the tables before inserting (default: True).
    """
    # Fetch the devices from the API
    devices = tahoma.get_devices()
    if not devices:
        logger.warning("Keine Geräte gefunden")
        return

    # Optional: empty the tables. Only an explicit False counts as failure,
    # so a clear_tables() that returns nothing is treated as success.
    if clear_before_insert and db.clear_tables() is False:
        # Inserting on top of the old rows would duplicate every device.
        logger.error("Tabellen konnten nicht geleert werden. Import abgebrochen.")
        return

    actor_count = 0
    sensor_count = 0
    unknown_count = 0
    failed_count = 0

    for device in devices:
        device_url = device.get('deviceURL', '')
        device_name = device.get('label', 'Unbekannt')
        # NOTE(review): the nested lookup assumes some payloads carry uiClass
        # inside 'definition' — confirm against the gateway's /setup response.
        device_type = (
            device.get('uiClass')
            or (device.get('definition') or {}).get('uiClass')
            or 'Unknown'
        )

        if DeviceClassifier.is_actor(device):
            commands, states = extract_actor_data(device)

            if db.insert_actor(device_type, device_name, device_url, commands, states):
                actor_count += 1
                logger.info(f"Aktor hinzugefügt: {device_name} ({device_type}) - "
                            f"{len(commands)} Commands, {len(states)} States")
            else:
                failed_count += 1

        elif DeviceClassifier.is_sensor(device):
            states = extract_sensor_data(device)

            if db.insert_sensor(device_type, device_name, device_url, states):
                sensor_count += 1
                logger.info(f"Sensor hinzugefügt: {device_name} ({device_type}) - "
                            f"{len(states)} States")
            else:
                failed_count += 1

        else:
            unknown_count += 1
            logger.warning(f"Unbekanntes Gerät: {device_name} ({device_type})")

    logger.info("\nZusammenfassung:")
    logger.info(f"Aktoren gespeichert: {actor_count}")
    logger.info(f"Sensoren gespeichert: {sensor_count}")
    logger.info(f"Unbekannte Geräte: {unknown_count}")
    # Devices whose insert failed would otherwise vanish from the summary.
    logger.info(f"Fehlgeschlagene Geräte: {failed_count}")


def main():
    """
    Main function: read the configuration, connect and run the import.

    Every setting can be overridden through an environment variable of the
    same name; without an override the built-in default below is used.
    """
    # ===== CONFIGURATION =====
    # Tahoma box settings
    TAHOMA_IP = os.environ.get("TAHOMA_IP", "192.168.1.XXX")  # IP address of your Tahoma box
    TAHOMA_TOKEN = os.environ.get("TAHOMA_TOKEN", "YOUR_API_TOKEN_HERE")  # Your API token

    # MySQL database settings
    DB_HOST = os.environ.get("DB_HOST", "localhost")
    DB_NAME = os.environ.get("DB_NAME", "EnergyFlow")
    DB_USER = os.environ.get("DB_USER", "your_username")
    DB_PASSWORD = os.environ.get("DB_PASSWORD", "your_password")
    try:
        DB_PORT = int(os.environ.get("DB_PORT", "3306"))
    except ValueError:
        logger.error("Ungültiger Wert für DB_PORT. Abbruch.")
        return

    # Options: empty the tables before the import unless explicitly switched off
    CLEAR_TABLES = os.environ.get("CLEAR_TABLES", "1").strip().lower() not in {
        "0", "false", "no", "off",
    }
    # =========================

    # Initialise the Tahoma API
    logger.info("Verbinde mit Tahoma Box...")
    tahoma = TahomaAPI(TAHOMA_IP, TAHOMA_TOKEN)

    # Initialise the database
    logger.info("Verbinde mit MySQL-Datenbank...")
    db = DatabaseManager(DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT)

    if not db.connect():
        logger.error("Datenbankverbindung fehlgeschlagen. Abbruch.")
        return

    try:
        # Process the devices and store them in the database
        process_devices(tahoma, db, clear_before_insert=CLEAR_TABLES)
        logger.info("Import erfolgreich abgeschlossen!")
    except Exception as e:
        # Top-level boundary: log with traceback so the failing step can be found.
        logger.exception(f"Fehler während der Verarbeitung: {e}")
    finally:
        # Close the database connection
        db.disconnect()


# Run the import only when executed as a script, not on import.
if __name__ == "__main__":
    main()