314 lines
11 KiB
Python
314 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
WLED module.

Contains ONLY the WLED-specific device-discovery logic.
NO database operations!
"""
|
|
|
|
import requests
|
|
import socket
|
|
import concurrent.futures
|
|
import logging
|
|
from typing import List, Dict, Optional, Tuple
|
|
from modules.base_module import BaseModule
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WLEDDiscovery:
    """Locate WLED devices on the local network.

    Discovery prefers mDNS (through the optional ``zeroconf`` package) and
    falls back to an HTTP sweep of a /24 subnet when ``zeroconf`` cannot be
    imported. Every candidate address is confirmed with an HTTP probe of
    ``/json/info``.
    """

    @staticmethod
    def discover_devices(timeout: int = 5) -> List[str]:
        """Search for WLED devices via mDNS.

        Args:
            timeout: Seconds to listen for ``_http._tcp.local.`` services.

        Returns:
            IP addresses that passed :meth:`is_wled_device`. An empty list is
            returned when discovery raises; the result of
            :meth:`scan_network` is returned when ``zeroconf`` is missing.
        """
        try:
            from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
        except ImportError:
            logger.warning("zeroconf nicht installiert. Verwende Netzwerk-Scan...")
            return WLEDDiscovery.scan_network()

        import time

        class WLEDListener(ServiceListener):
            """Collects the IPv4 addresses of every announced HTTP service."""

            def __init__(self):
                self.devices = []

            def add_service(self, zc, type_, name):
                info = zc.get_service_info(type_, name)
                if not info:
                    return
                for packed in info.addresses:
                    # inet_ntoa only accepts 4-byte packed IPv4 addresses;
                    # anything else is skipped instead of raising.
                    if len(packed) != 4:
                        continue
                    addr = socket.inet_ntoa(packed)
                    if addr not in self.devices:
                        self.devices.append(addr)
                        logger.info(f"WLED-Gerät gefunden: {name} ({addr})")

            def remove_service(self, zc, type_, name):
                pass

            def update_service(self, zc, type_, name):
                pass

        try:
            zeroconf = Zeroconf()
            try:
                listener = WLEDListener()
                # Hold a reference to the browser for the whole listening window.
                browser = ServiceBrowser(zeroconf, "_http._tcp.local.", listener)  # noqa: F841

                logger.info(f"Suche nach WLED-Geräten (Timeout: {timeout}s)...")
                time.sleep(timeout)
            finally:
                # Always release the mDNS sockets, even if listening failed.
                zeroconf.close()

            # The mDNS query matches every HTTP service, so keep only the
            # hosts that actually answer like a WLED device.
            wled_devices = [
                ip for ip in list(listener.devices)
                if WLEDDiscovery.is_wled_device(ip)
            ]

            logger.info(f"{len(wled_devices)} WLED-Geräte gefunden")
            return wled_devices
        except Exception as e:
            logger.error(f"Fehler bei WLED-Discovery: {e}")
            return []

    @staticmethod
    def scan_network(network: Optional[str] = None, max_threads: int = 50) -> List[str]:
        """Probe every host of a /24 network for WLED devices.

        Args:
            network: Any dotted IPv4 string inside the target network; only
                its first three octets are used. When ``None`` the prefix is
                taken from the local address of the outbound interface, with
                ``192.168.1`` as the fallback.
            max_threads: Upper bound for concurrent probes (at least 1 is used).

        Returns:
            IP addresses that passed :meth:`is_wled_device`, in the order
            their probes completed.
        """
        if network is None:
            try:
                # Connecting a UDP socket only selects the outbound interface;
                # the context manager closes it even when connect() fails.
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                    s.connect(("8.8.8.8", 80))
                    local_ip = s.getsockname()[0]
                network_prefix = '.'.join(local_ip.split('.')[:-1])
            except OSError:
                logger.warning("Konnte lokale IP nicht ermitteln, verwende 192.168.1.x")
                network_prefix = "192.168.1"
        else:
            network_prefix = '.'.join(network.split('.')[:3])

        logger.info(f"Scanne Netzwerk {network_prefix}.0/24 nach WLED-Geräten...")

        def check_ip(ip):
            if WLEDDiscovery.is_wled_device(ip):
                return ip
            return None

        wled_devices = []
        # ThreadPoolExecutor rejects max_workers <= 0, so clamp to one worker.
        workers = max(1, max_threads)
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(check_ip, f"{network_prefix}.{i}")
                for i in range(1, 255)
            ]

            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    wled_devices.append(result)
                    logger.info(f"WLED-Gerät gefunden: {result}")

        return wled_devices

    @staticmethod
    def is_wled_device(ip: str, timeout: float = 1.0) -> bool:
        """Check whether ``ip`` answers ``/json/info`` like a WLED device.

        Args:
            ip: Host to probe.
            timeout: Request timeout in seconds.

        Returns:
            ``True`` when the host replies with HTTP 200 and a JSON object
            containing a ``ver`` or ``name`` key; ``False`` otherwise,
            including on any request or parsing error.
        """
        try:
            response = requests.get(
                f"http://{ip}/json/info",
                timeout=timeout,
                headers={'User-Agent': 'DeviceDiscovery/1.0'}
            )
            if response.status_code != 200:
                return False
            data = response.json()
        except Exception as e:
            # Best-effort probe: a failure means "not a WLED device", but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            logger.debug(f"WLED-Prüfung für {ip} fehlgeschlagen: {e}")
            return False
        # Only a JSON object can carry the identifying keys.
        return isinstance(data, dict) and ('ver' in data or 'name' in data)
|
|
|
|
|
|
class WLEDAPI:
    """Small client for the JSON HTTP API of a single WLED device."""

    def __init__(self, ip: str, timeout: float = 2):
        """Remember the device address.

        Args:
            ip: Address of the device; requests go to ``http://<ip>``.
            timeout: Timeout in seconds applied to every HTTP request.
        """
        self.ip = ip
        self.base_url = f"http://{ip}"
        self.timeout = timeout

    def _get_json(self, path: str, what: str):
        """GET ``path`` from the device and return the decoded JSON body.

        Args:
            path: URL path starting with ``/``.
            what: Phrase inserted into the error log message naming the
                resource that was being fetched.

        Returns:
            The decoded JSON value, or ``None`` when the request or the
            decoding failed (the failure is logged).
        """
        try:
            response = requests.get(f"{self.base_url}{path}", timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Fehler beim Abrufen {what} von {self.ip}: {e}")
            return None

    def get_info(self) -> Optional[Dict]:
        """Return the ``/json/info`` payload, or ``None`` on failure."""
        return self._get_json("/json/info", "der WLED-Info")

    def get_presets(self) -> Optional[List]:
        """Return the presets as a list of ``{id: name}`` dicts.

        Returns:
            ``None`` when ``/presets.json`` could not be fetched, an empty
            list when the payload is not a JSON object, otherwise one
            single-entry dict per usable preset. Presets without a name are
            called ``"Preset <id>"``.
        """
        presets_data = self._get_json("/presets.json", "der WLED-Presets")
        if presets_data is None:
            return None
        if not isinstance(presets_data, dict):
            return []

        preset_list = []
        for preset_id, preset_data in presets_data.items():
            if not preset_data:
                # Empty entries (the file typically starts with a "0": {}
                # placeholder) do not describe a selectable preset.
                continue
            if not isinstance(preset_data, dict):
                logger.debug(f"Ungültiger WLED-Preset-Eintrag {preset_id!r} von {self.ip}")
                continue
            try:
                number = int(preset_id)
            except (TypeError, ValueError):
                # One malformed key must not discard every other preset.
                logger.debug(f"Ungültige WLED-Preset-ID {preset_id!r} von {self.ip}")
                continue
            preset_name = preset_data.get('n') or f'Preset {preset_id}'
            preset_list.append({number: preset_name})
        return preset_list

    def get_effects(self) -> Optional[List]:
        """Return the effects as a list of ``{index: name}`` dicts.

        Returns:
            ``None`` when ``/json/eff`` could not be fetched, an empty list
            when the payload is not a JSON array, otherwise one single-entry
            dict per effect. Unnamed effects are called ``"Effect <index>"``.
        """
        eff_data = self._get_json("/json/eff", "der WLED-Effects")
        if eff_data is None:
            return None
        if not isinstance(eff_data, list):
            return []
        return [
            {eff_id: eff_name or f"Effect {eff_id}"}
            for eff_id, eff_name in enumerate(eff_data)
        ]

    def get_state(self) -> Optional[Dict]:
        """Return the ``/json/state`` payload, or ``None`` on failure."""
        return self._get_json("/json/state", "des WLED-State")

    @staticmethod
    def _build_commands(eff_values: Optional[List], preset_values: Optional[List]) -> List[Dict]:
        """Describe the commands offered for a WLED device."""
        return [
            {'command': 'on', 'parameters': []},
            {'command': 'off', 'parameters': []},
            {
                'command': 'setBrightness',
                'parameters': [
                    {'name': 'brightness', 'type': 'integer', 'min': 0, 'max': 255}
                ]
            },
            {
                'command': 'setColor',
                'parameters': [
                    {'name': 'red', 'type': 'integer', 'min': 0, 'max': 255},
                    {'name': 'green', 'type': 'integer', 'min': 0, 'max': 255},
                    {'name': 'blue', 'type': 'integer', 'min': 0, 'max': 255}
                ]
            },
            {
                'command': 'setEffect',
                'parameters': [
                    {'name': 'effect', 'type': 'integer', 'min': 0, 'max': 255, 'values': eff_values}
                ]
            },
            {
                'command': 'setPreset',
                'parameters': [
                    {'name': 'preset', 'type': 'integer', 'min': 1, 'max': 250, 'values': preset_values}
                ]
            }
        ]

    @staticmethod
    def _build_states(state) -> List[Dict]:
        """Translate a ``/json/state`` payload into the unified state list."""
        if not state or not isinstance(state, dict):
            return []

        states = [
            {
                'name': 'power',
                'type': 'boolean',
                'current_value': state.get('on', False)
            },
            {
                'name': 'brightness',
                'type': 'integer',
                'current_value': state.get('bri', 0)
            },
        ]

        # Only the first colour of the first segment is reported.
        segments = state.get('seg', [])
        first_segment = segments[0] if isinstance(segments, list) and segments else None
        if isinstance(first_segment, dict):
            colors = first_segment.get('col', [[0, 0, 0]])
            if isinstance(colors, list) and colors:
                states.append({
                    'name': 'color_rgb',
                    'type': 'array',
                    'current_value': colors[0]
                })
        return states

    def get_device_data(self) -> Optional[Dict]:
        """Build the device dict in the unified format.

        Returns:
            ``None`` when the device info is unavailable or is not a JSON
            object; otherwise a dict with the keys ``type``, ``name``,
            ``url``, ``commands`` and ``states``.
        """
        info = self.get_info()
        # Without usable info the device is skipped, so no further request
        # is sent to it.
        if not info or not isinstance(info, dict):
            return None

        state = self.get_state()
        name = info.get('name', f"WLED {self.ip}")
        preset_values = self.get_presets()
        eff_values = self.get_effects()

        return {
            'type': 'WLED',
            'name': name,
            'url': f"wled://{self.ip}",
            'commands': self._build_commands(eff_values, preset_values),
            'states': self._build_states(state)
        }
|
|
|
|
|
|
class WLEDModule(BaseModule):
    """
    WLED module - implements the BaseModule interface.

    Only returns actors and performs NO database operations.
    """

    def is_enabled(self) -> bool:
        """Return the ``wled_enable`` flag of the module configuration."""
        return self.config.wled_enable

    def discover(self) -> Tuple[List[Dict], List[Dict]]:
        """
        Run the WLED discovery.

        Automatically discovered addresses are extended by the manually
        configured ones that pass the WLED probe; afterwards the device data
        of every address is collected.

        Returns:
            Tuple (actors, sensors) - the sensors list is always empty.
        """
        separator = "=" * 60
        logger.info("\n" + separator)
        logger.info("WLED-GERÄTE WERDEN GESUCHT")
        logger.info(separator)

        actors = []
        sensors = []

        # Automatic discovery
        wled_ips = WLEDDiscovery.discover_devices(timeout=self.config.wled_discovery_timeout)

        # Manually configured addresses
        manual_ips = self.config.wled_manual_ips
        if manual_ips:
            logger.info(f"Füge {len(manual_ips)} manuelle WLED-IPs hinzu...")
            for manual_ip in manual_ips:
                if manual_ip in wled_ips:
                    continue
                if not WLEDDiscovery.is_wled_device(manual_ip):
                    logger.warning(f"⚠ {manual_ip} ist kein WLED-Gerät")
                    continue
                wled_ips.append(manual_ip)
                logger.info(f"✓ Manuelles WLED-Gerät: {manual_ip}")

        if not wled_ips:
            logger.info("Keine WLED-Geräte gefunden")
            return actors, sensors

        logger.info(f"{len(wled_ips)} WLED-Geräte gefunden")

        # Collect the device data; one failing device does not stop the rest.
        for ip in wled_ips:
            try:
                device_data = WLEDAPI(ip).get_device_data()
                if not device_data:
                    logger.warning(f"⚠ Konnte keine Daten von WLED {ip} abrufen")
                    continue
                actors.append(device_data)
            except Exception as e:
                logger.error(f"✗ Fehler beim Verarbeiten von WLED {ip}: {e}")

        logger.info(f"WLED: {len(actors)} Aktoren gefunden")
        return actors, sensors
|