phlyght/phlyght/phlyght.py
2023-05-27 03:45:43 -07:00

353 lines
12 KiB
Python

from asyncio import gather, new_event_loop, sleep
from io import StringIO
from pathlib import Path
from typing import Optional
from aiofiles import open as aio_open
from httpcore._exceptions import ReadTimeout
from httpx import AsyncClient, ConnectError, ConnectTimeout
from httpx._exceptions import ReadTimeout as HTTPxReadTimeout
from orjson import dumps as ordumps
from orjson import loads
from pydantic import BaseConfig, BaseModel, Field
from rich import print
from yaml import Loader
from yaml import dump as yaml_dump
from yaml import load
from .models import Entity, HueEntsV2
from .utils import LRU, MSG_RE_BYTES
from .http import HueAPIv2, HueEDK, TYPE_CACHE, Event, UUID_CMP
class Phlyght(HueAPIv2, HueEDK):
class Aliases(BaseModel):
class Config(BaseConfig):
smart_union = True
use_enum_values = True
allow_population_by_field_name = True
behavior_instances: Optional[dict[str, HueEntsV2.BehaviorInstance]] = Field(
default_factory=dict
)
behavior_scripts: Optional[dict[str, HueEntsV2.BehaviorScript]] = Field(
default_factory=dict
)
bridges: Optional[dict[str, HueEntsV2.Bridge]] = Field(default_factory=dict)
bridge_homes: Optional[dict[str, HueEntsV2.BridgeHome]] = Field(
default_factory=dict
)
buttons: Optional[dict[str, HueEntsV2.Button]] = Field(default_factory=dict)
device_powers: Optional[dict[str, HueEntsV2.DevicePower]] = Field(
default_factory=dict
)
entertainments: Optional[dict[str, HueEntsV2.Entertainment]] = Field(
default_factory=dict
)
entertainment_configurations: Optional[
dict[str, HueEntsV2.EntertainmentConfiguration]
] = Field(default_factory=dict)
geofence_clients: Optional[dict[str, HueEntsV2.GeofenceClient]] = Field(
default_factory=dict
)
geolocations: Optional[dict[str, HueEntsV2.Geolocation]] = Field(
default_factory=dict
)
lights: Optional[dict[str, HueEntsV2.Light]] = Field(default_factory=dict)
light_levels: Optional[dict[str, HueEntsV2.LightLevel]] = Field(
default_factory=dict
)
motions: Optional[dict[str, HueEntsV2.Motion]] = Field(default_factory=dict)
relative_rotaries: Optional[dict[str, HueEntsV2.RelativeRotary]] = Field(
default_factory=dict
)
rooms: Optional[dict[str, HueEntsV2.Room]] = Field(default_factory=dict)
scenes: Optional[dict[str, HueEntsV2.Scene]] = Field(default_factory=dict)
temperatures: Optional[dict[str, HueEntsV2.Temperature]] = Field(
default_factory=dict
)
zigbee_connectivities: Optional[
dict[str, HueEntsV2.ZigbeeConnectivity]
] = Field(default_factory=dict)
zigbee_device_discoveries: Optional[
dict[str, HueEntsV2.ZigbeeDeviceDiscovery]
] = Field(default_factory=dict)
zgb_connectivities: Optional[dict[str, HueEntsV2.ZigbeeConnectivity]] = Field(
default_factory=dict
)
zones: Optional[dict[str, HueEntsV2.Zone]] = Field(default_factory=dict)
def items(self):
return ((k, getattr(self, k)) for k in self.__fields__.keys())
def keys(self):
return self.__fields__.keys()
def __setitem__(self, key, value):
self.__setattr__(key, value)
def __getitem__(self, key, default=None):
return self.__getattribute__(key)
# def get_entity(self, id: UUID | str):
def __new__(
cls,
max_cache_size: int = 10,
**kwargs,
):
cls = super().__new__(
cls,
**kwargs,
)
return cls
def __init__(
self,
max_cache_size=10,
**kwargs,
):
from .abc import YAMLConfig
_config = Path("config.yaml")
if not _config.exists():
_config.touch()
self.config = YAMLConfig(
api_key=kwargs.get("api_key", None),
bridge_host=kwargs.get("bridge_host", None),
aliases={
"behavior_instances": {},
"behavior_scripts": {},
"bridge_homes": {},
"bridges": {},
"buttons": {},
"device_powers": {},
"entertainment_configurations": {},
"entertainments": {},
"geofence_clients": {},
"geolocations": {},
"light_levels": {},
"lights": {},
"motions": {},
"relative_rotaries": {},
"rooms": {},
"scenes": {},
"temperatures": {},
"zgb_connectivities": {},
"zigbee_connectivities": {},
"zigbee_device_discoveries": {},
"zones": {},
},
)
with _config.open("w+") as f:
yaml_dump(self.config, f, default_flow_style=False)
else:
with _config.open("r+") as f:
self.config = load(f, Loader=Loader)
super().__init__(
kwargs.get("hue_api_key", None) or self.config.api_key or exit(1)
)
Entity.cache_client(self)
self.cache = LRU(max_cache_size)
self._client = AsyncClient(headers=self._headers, verify=False)
self._subscription = None
self._bridge_host = f"""https://{(
kwargs.get("hue_bridge_ip", None) or self.config["bridge_host"] or exit(1)
)}"""
self._api_key = kwargs.get("hue_api_key", None) or self.config["api_key"]
self._tasks = []
self._entities = self.Aliases()
self.behavior_instances = {}
self.behavior_scripts = {}
self.bridges = {}
self.bridge_homes = {}
self.buttons = {}
self.device_powers = {}
self.entertainments = {}
self.entertainment_configurations = {}
self.geofence_clients = {}
self.geolocations = {}
self.lights = {}
self.light_levels = {}
self.motions = {}
self.relative_rotaries = {}
self.rooms = {}
self.scenes = {}
self.temperatures = {}
self.zigbee_connectivities = {}
self.zigbee_device_discoveries = {}
self.zgb_connectivities = {}
self.zones = {}
def subscribe(
self,
*args,
**kwargs,
):
if not self._subscription or self._subscription.done():
self._subscription = self.new_task(
self._subscribe(
*args,
**kwargs,
)
)
def run(self):
self.loop = new_event_loop()
self.loop.set_debug(True)
try:
self.loop.run_until_complete(self._startup())
self.loop.run_forever()
except KeyboardInterrupt:
for t in self._tasks:
t.cancel()
for t in self.cache:
t.value.cancel()
print("Exiting..")
self.loop.stop()
self.loop.close()
def new_task(self, coro):
task = self.loop.create_task(coro)
self._tasks.append(task)
return task
async def _startup(self):
try:
gotten = {}
for k, v in self.config["aliases"].items():
if k not in gotten:
gotten.setdefault(k, {"aliased": {}, "unaliased": {}})
objs = await getattr(self, f"get_{k}")()
for obj in objs:
gotten[k][obj.id] = obj
getattr(self, k)[obj.id] = obj
alias = v.get(str(obj.id))
if alias:
gotten[k]["aliased"][alias] = obj
getattr(self, k)[alias] = obj
setattr(self, alias, obj)
if hasattr(obj, "metadata"):
nm = (
(
obj.metadata["name"]
if isinstance(obj.metadata, dict)
else obj.metadata.name
)
.replace(" ", "_")
.replace("-", "_")
.lower()
)
setattr(self, nm, obj)
self._entities[k][nm] = obj
else:
self._entities[k][obj.id] = obj
self.new_task(self._subscribe())
while self.loop.is_running():
await sleep(60)
except KeyboardInterrupt:
await self.dump_state()
await gather(*self._tasks)
async def dump_state(self):
async with aio_open("state.json", "wb") as f:
await f.write(ordumps(self._entities))
def _parse_payload(self, payload: bytes):
_match = MSG_RE_BYTES.search(payload)
if not _match:
return None
_events, _data, _id = [], b"", b""
if (_data := _match.group("data")) and (_id := _match.group("id")):
_events.extend(loads(_data))
_evs = []
for _event in _events:
for _ent in _event["data"]:
_event_id = _id.decode()
_event_type = _event["type"]
if hasattr(self, f"on_{_ent['type']}_{_event['type']}"):
_object = TYPE_CACHE[_ent["type"]](**_ent)
event = Event(id=_event_id, object=_object, type_=_event_type)
_evs.append(
self.new_task(
getattr(self, f"on_{event.object.type}_{event.type_}")(
_object
)
)
)
self.cache.extend(*_evs)
async def dump(self, filename: Optional[Path | str] = None):
aliases = {}
for key, sub_val in self._entities.items():
defa = 0
aliases.setdefault(key, {})
for k, v in sub_val.items():
aliased = UUID_CMP.match(str(k))
if not aliased:
_key = k
else:
defa += 1
_key = v.cfg_prefix + str(defa)
if not aliased:
aliases[key][_key] = str(v.id)
else:
aliases[key][_key] = v.id
cfg = {
"bridge_ip": self.config.get("bridge_ip", ""),
"api_key": self.config.get("api_key", ""),
"aliases": {
k[0]: {vk: v for v, vk in sorted(k[1].items(), key=lambda i: i[1])}
for k in sorted(aliases.items(), key=lambda k: k[0])
},
}
if isinstance(filename, str):
file_path = Path(filename)
elif isinstance(filename, Path):
file_path = filename
else:
file_path = Path("dump.yaml")
f = await aio_open(file_path, "w+")
buf = StringIO()
yaml_dump(
cfg,
buf,
default_flow_style=False,
allow_unicode=True,
sort_keys=True,
canonical=False,
)
buf.seek(0)
await f.write(buf.getvalue())
await f.close()
async def _subscribe(
self,
*args,
**kwargs,
):
if hasattr(self, "on_ready"):
self.new_task(self.on_ready())
while self.loop.is_running():
try:
stream = await self.listen_events(
headers={**self._headers, **{"Accept": "text/event-stream"}}
)
async with stream as _iter:
async for msg in _iter.aiter_bytes():
self._parse_payload(msg)
except (ReadTimeout, HTTPxReadTimeout, ConnectTimeout, ConnectError):
...
await sleep(1)