353 lines
12 KiB
Python
353 lines
12 KiB
Python
from asyncio import gather, new_event_loop, sleep
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from aiofiles import open as aio_open
|
|
from httpcore._exceptions import ReadTimeout
|
|
from httpx import AsyncClient, ConnectError, ConnectTimeout
|
|
from httpx._exceptions import ReadTimeout as HTTPxReadTimeout
|
|
from orjson import dumps as ordumps
|
|
from orjson import loads
|
|
from pydantic import BaseConfig, BaseModel, Field
|
|
from rich import print
|
|
from yaml import Loader
|
|
from yaml import dump as yaml_dump
|
|
from yaml import load
|
|
|
|
from .models import Entity, HueEntsV2
|
|
from .utils import LRU, MSG_RE_BYTES
|
|
from .http import HueAPIv2, HueEDK, TYPE_CACHE, Event, UUID_CMP
|
|
|
|
|
|
class Phlyght(HueAPIv2, HueEDK):
    """Client that mirrors bridge entities locally and dispatches stream events.

    On startup every entity category listed under ``aliases`` in the config is
    fetched through the matching ``get_<category>`` coroutine and indexed on
    the instance. Afterwards the event stream is consumed and each event is
    routed to an ``on_<entity type>_<event type>`` coroutine method, when the
    instance defines one.
    """

    class Aliases(BaseModel):
        """Per-category registry of entities with dict-style access.

        Every field maps a key (a friendly name or an entity id) to the entity
        object. ``obj[key]`` / ``obj[key] = value`` are thin wrappers around
        attribute access so categories can be addressed by name at runtime.
        """

        class Config(BaseConfig):
            smart_union = True
            use_enum_values = True
            allow_population_by_field_name = True

        behavior_instances: Optional[dict[str, HueEntsV2.BehaviorInstance]] = Field(
            default_factory=dict
        )
        behavior_scripts: Optional[dict[str, HueEntsV2.BehaviorScript]] = Field(
            default_factory=dict
        )
        bridges: Optional[dict[str, HueEntsV2.Bridge]] = Field(default_factory=dict)
        bridge_homes: Optional[dict[str, HueEntsV2.BridgeHome]] = Field(
            default_factory=dict
        )
        buttons: Optional[dict[str, HueEntsV2.Button]] = Field(default_factory=dict)
        device_powers: Optional[dict[str, HueEntsV2.DevicePower]] = Field(
            default_factory=dict
        )
        entertainments: Optional[dict[str, HueEntsV2.Entertainment]] = Field(
            default_factory=dict
        )
        entertainment_configurations: Optional[
            dict[str, HueEntsV2.EntertainmentConfiguration]
        ] = Field(default_factory=dict)
        geofence_clients: Optional[dict[str, HueEntsV2.GeofenceClient]] = Field(
            default_factory=dict
        )
        geolocations: Optional[dict[str, HueEntsV2.Geolocation]] = Field(
            default_factory=dict
        )
        lights: Optional[dict[str, HueEntsV2.Light]] = Field(default_factory=dict)
        light_levels: Optional[dict[str, HueEntsV2.LightLevel]] = Field(
            default_factory=dict
        )
        motions: Optional[dict[str, HueEntsV2.Motion]] = Field(default_factory=dict)
        relative_rotaries: Optional[dict[str, HueEntsV2.RelativeRotary]] = Field(
            default_factory=dict
        )
        rooms: Optional[dict[str, HueEntsV2.Room]] = Field(default_factory=dict)
        scenes: Optional[dict[str, HueEntsV2.Scene]] = Field(default_factory=dict)
        temperatures: Optional[dict[str, HueEntsV2.Temperature]] = Field(
            default_factory=dict
        )
        zigbee_connectivities: Optional[
            dict[str, HueEntsV2.ZigbeeConnectivity]
        ] = Field(default_factory=dict)
        zigbee_device_discoveries: Optional[
            dict[str, HueEntsV2.ZigbeeDeviceDiscovery]
        ] = Field(default_factory=dict)
        zgb_connectivities: Optional[dict[str, HueEntsV2.ZigbeeConnectivity]] = Field(
            default_factory=dict
        )
        zones: Optional[dict[str, HueEntsV2.Zone]] = Field(default_factory=dict)

        def items(self):
            """Return a generator of ``(field name, field value)`` pairs."""
            return ((name, getattr(self, name)) for name in self.__fields__.keys())

        def keys(self):
            """Return the declared field names."""
            return self.__fields__.keys()

        def __setitem__(self, key, value):
            """Assign ``value`` to the field named ``key``."""
            setattr(self, key, value)

        def __getitem__(self, key, default=None):
            """Return the field named ``key``.

            ``default`` is accepted for signature compatibility but is not
            consulted; a missing attribute raises ``AttributeError``.
            """
            return self.__getattribute__(key)

        # TODO: a ``get_entity(self, id: UUID | str)`` lookup was stubbed here
        # but never implemented.

    def __new__(
        cls,
        max_cache_size: int = 10,
        **kwargs,
    ):
        """Create the instance, keeping ``max_cache_size`` away from the bases.

        NOTE(review): ``kwargs`` are forwarded to the parent ``__new__``
        unchanged; confirm the base classes accept them.
        """
        instance = super().__new__(
            cls,
            **kwargs,
        )
        return instance

    def __init__(
        self,
        max_cache_size=10,
        **kwargs,
    ):
        """Load (or create) ``config.yaml`` and prepare the client state.

        Args:
            max_cache_size: Capacity passed to the ``LRU`` cache that holds
                the most recently dispatched handler tasks.
            **kwargs: ``hue_api_key`` and ``hue_bridge_ip`` override the
                values stored in the config for this run. ``api_key`` and
                ``bridge_host`` seed a newly created config file.

        Raises:
            SystemExit: With exit status 1 when no API key or no bridge host
                can be resolved from ``kwargs`` or the config.
        """
        from .abc import YAMLConfig

        # The Aliases fields are the single source of truth for the entity
        # categories; sorted order matches the layout previously written out
        # by hand for the default config.
        kinds = sorted(self.Aliases.__fields__)

        config_path = Path("config.yaml")
        config = None
        if config_path.exists():
            # NOTE(review): the full ``Loader`` can construct arbitrary Python
            # objects, so config.yaml must be a trusted file. It is kept
            # because the config is dumped as a ``YAMLConfig`` object.
            with config_path.open("r") as f:
                config = load(f, Loader=Loader)
        if config is None:
            # Missing or empty file: write a fresh skeleton. The file is only
            # created once the config object exists, so a failure here no
            # longer leaves an empty config.yaml behind.
            # NOTE(review): the skeleton is seeded from ``api_key`` /
            # ``bridge_host`` while the runtime overrides below are named
            # ``hue_api_key`` / ``hue_bridge_ip`` - confirm this is intended.
            config = YAMLConfig(
                api_key=kwargs.get("api_key", None),
                bridge_host=kwargs.get("bridge_host", None),
                aliases={kind: {} for kind in kinds},
            )
            with config_path.open("w") as f:
                yaml_dump(config, f, default_flow_style=False)
        self.config = config

        # Resolve both mandatory settings before building anything, so a
        # misconfiguration exits cleanly and says what is missing.
        api_key = kwargs.get("hue_api_key", None) or self.config.api_key
        if not api_key:
            raise SystemExit(
                "Phlyght: no API key (pass hue_api_key=... or set api_key "
                "in config.yaml)"
            )
        bridge_host = kwargs.get("hue_bridge_ip", None) or self.config["bridge_host"]
        if not bridge_host:
            raise SystemExit(
                "Phlyght: no bridge host (pass hue_bridge_ip=... or set "
                "bridge_host in config.yaml)"
            )

        super().__init__(api_key)
        Entity.cache_client(self)
        self.cache = LRU(max_cache_size)
        # NOTE(review): TLS certificate verification is disabled for every
        # request made through this client.
        self._client = AsyncClient(headers=self._headers, verify=False)
        self._subscription = None
        self._bridge_host = f"https://{bridge_host}"
        self._api_key = api_key
        self._tasks = []
        self._entities = self.Aliases()

        # One plain dict per category, filled by ``_startup`` and keyed by
        # entity id and, where configured, by alias.
        for kind in kinds:
            setattr(self, kind, {})

    def subscribe(
        self,
        *args,
        **kwargs,
    ):
        """Start the event-stream listener unless one is already running."""
        if not self._subscription or self._subscription.done():
            self._subscription = self.new_task(
                self._subscribe(
                    *args,
                    **kwargs,
                )
            )

    def run(self):
        """Create an event loop, run the client and clean up on exit.

        Blocks until startup finishes or the process is interrupted. On the
        way out - whether through Ctrl-C or an error - every pending task is
        cancelled and awaited so cancellation is actually delivered, and the
        loop is then closed.
        """
        self.loop = new_event_loop()
        self.loop.set_debug(True)
        # Hold the startup task so it can be cancelled with the others.
        startup = self.loop.create_task(self._startup())
        try:
            self.loop.run_until_complete(startup)
            self.loop.run_forever()
        except KeyboardInterrupt:
            print("Exiting..")
        finally:
            pending = [t for t in (startup, *self._tasks) if not t.done()]
            for task in pending:
                task.cancel()
            for entry in self.cache:
                entry.value.cancel()
            if pending:
                # cancel() only requests cancellation; run the loop until the
                # tasks have really finished before closing it.
                self.loop.run_until_complete(
                    gather(*pending, return_exceptions=True)
                )
            self.loop.close()

    def new_task(self, coro):
        """Schedule ``coro`` on the client loop and track the resulting task.

        The task is dropped from ``self._tasks`` once it completes, so the
        list holds only live work instead of growing with every event.
        """
        task = self.loop.create_task(coro)
        self._tasks.append(task)
        task.add_done_callback(self._forget_task)
        return task

    def _forget_task(self, task):
        """Remove a finished task from the tracking list, if still present."""
        if task in self._tasks:
            self._tasks.remove(task)

    @staticmethod
    def _entity_name(obj):
        """Return the attribute-friendly name of ``obj``, or ``None``.

        The name comes from ``obj.metadata`` (a dict or an object with a
        ``name`` attribute); spaces and dashes become underscores and the
        result is lower-cased. Missing metadata or a missing/empty name
        yields ``None``.
        """
        metadata = getattr(obj, "metadata", None)
        if metadata is None:
            return None
        if isinstance(metadata, dict):
            raw = metadata.get("name")
        else:
            raw = getattr(metadata, "name", None)
        if not raw:
            return None
        return raw.replace(" ", "_").replace("-", "_").lower()

    async def _startup(self):
        """Fetch and index all configured entities, then start listening.

        For every category under ``aliases`` in the config the matching
        ``get_<category>`` coroutine is awaited. Each returned object is
        stored by id in the category dict, additionally by its configured
        alias when there is one, and registered in ``self._entities`` under
        its metadata name, falling back to its id.
        """
        try:
            for kind, configured in self.config["aliases"].items():
                configured = configured or {}
                registry = getattr(self, kind)
                for obj in await getattr(self, f"get_{kind}")():
                    registry[obj.id] = obj

                    alias = configured.get(str(obj.id))
                    if alias:
                        registry[alias] = obj
                        # NOTE(review): aliases and entity names become
                        # attributes on the client and can shadow existing
                        # attributes or methods of the same name.
                        setattr(self, alias, obj)

                    name = self._entity_name(obj)
                    if name:
                        setattr(self, name, obj)
                        self._entities[kind][name] = obj
                    else:
                        self._entities[kind][obj.id] = obj

            # Go through subscribe() so the listener is recorded in
            # ``_subscription`` and a later subscribe() call does not open a
            # second, duplicate stream.
            self.subscribe()

            while self.loop.is_running():
                await sleep(60)
        except KeyboardInterrupt:
            await self.dump_state()
            await gather(*self._tasks)

    async def dump_state(self):
        """Write the current entity registry to ``state.json``.

        The registry is converted to plain containers first, because orjson
        does not serialise pydantic models directly. Serialisation happens
        before the file is opened, so a failure cannot leave a truncated
        file.
        """
        from orjson import OPT_NON_STR_KEYS

        payload = ordumps(
            self._entities.dict(),
            default=str,  # fallback for values orjson has no encoder for
            option=OPT_NON_STR_KEYS,  # registry keys may be ids, not str
        )
        async with aio_open("state.json", "wb") as f:
            await f.write(payload)

    def _parse_payload(self, payload: bytes):
        """Decode one raw stream chunk and schedule the matching handlers.

        Args:
            payload: Bytes received from the event stream. It is searched
                with ``MSG_RE_BYTES`` for its ``data`` and ``id`` groups.

        Returns:
            ``None``. One task is scheduled per entity for which an
            ``on_<entity type>_<event type>`` method exists; those tasks are
            also pushed into ``self.cache``.
        """
        match = MSG_RE_BYTES.search(payload)
        if not match:
            return None

        data, event_id = match.group("data"), match.group("id")
        events = []
        if data and event_id:
            try:
                events = list(loads(data))
            except ValueError:
                # orjson's JSONDecodeError is a ValueError. A malformed or
                # partial chunk is skipped so it does not kill the listener.
                events = []

        tasks = []
        for event in events:
            for entity in event["data"]:
                # Resolve the handler once and call exactly what was found.
                handler = getattr(
                    self, f"on_{entity['type']}_{event['type']}", None
                )
                if handler is None:
                    continue
                obj = TYPE_CACHE[entity["type"]](**entity)
                tasks.append(self.new_task(handler(obj)))

        self.cache.extend(*tasks)

    async def dump(self, filename: Optional[Path | str] = None):
        """Write the bridge settings and an ``id -> name`` alias map as YAML.

        Entities registered under a friendly name keep that name. Entities
        registered under a key matching ``UUID_CMP`` get a generated name
        built from the entity's ``cfg_prefix`` and a per-category counter.
        Ids are always written as strings, the form ``_startup`` uses to
        look aliases up.

        Args:
            filename: Target path; defaults to ``dump.yaml``.
        """
        aliases = {}
        for kind, entities in self._entities.items():
            auto_index = 0
            names_to_ids = {}
            for key, entity in (entities or {}).items():
                if UUID_CMP.match(str(key)):
                    # No friendly name was available: generate one.
                    auto_index += 1
                    name = entity.cfg_prefix + str(auto_index)
                else:
                    name = key
                names_to_ids[name] = str(entity.id)
            aliases[kind] = {
                entity_id: name
                for name, entity_id in sorted(
                    names_to_ids.items(), key=lambda item: item[1]
                )
            }

        cfg = {
            # The config stores the address as ``bridge_host``; fall back to
            # it so the dump is not written with an empty address.
            "bridge_ip": (
                self.config.get("bridge_ip", "")
                or self.config.get("bridge_host", "")
                or ""
            ),
            "api_key": self.config.get("api_key", ""),
            "aliases": dict(sorted(aliases.items(), key=lambda item: item[0])),
        }

        if isinstance(filename, (str, Path)):
            file_path = Path(filename)
        else:
            file_path = Path("dump.yaml")

        # With no stream argument yaml.dump returns the document as a string.
        # Rendering first keeps a serialisation error from truncating the
        # target file.
        text = yaml_dump(
            cfg,
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=True,
            canonical=False,
        )
        # allow_unicode writes non-ASCII names verbatim, so pin the encoding
        # instead of relying on the locale default.
        async with aio_open(file_path, "w+", encoding="utf-8") as f:
            await f.write(text)

    async def _subscribe(
        self,
        *args,
        **kwargs,
    ):
        """Consume the event stream for as long as the loop is running.

        Schedules ``on_ready`` first when the instance defines it. Each chunk
        of the stream is passed to ``_parse_payload``. Timeouts and
        connection errors are treated as transient: the stream is reopened
        after a one-second pause. ``args`` and ``kwargs`` are accepted but
        unused.
        """
        if hasattr(self, "on_ready"):
            self.new_task(self.on_ready())

        stream_headers = {**self._headers, "Accept": "text/event-stream"}
        while self.loop.is_running():
            try:
                stream = await self.listen_events(headers=stream_headers)
                async with stream as response:
                    async for chunk in response.aiter_bytes():
                        self._parse_payload(chunk)
            except (ReadTimeout, HTTPxReadTimeout, ConnectTimeout, ConnectError):
                # Deliberate best-effort: fall through and reconnect.
                pass
            await sleep(1)