event polling w/ lru for events

This commit is contained in:
rooba 2022-11-30 04:08:35 -08:00
parent cf95790544
commit 5be9ef5543
7 changed files with 619 additions and 527 deletions

3
.flake8 Normal file
View File

@ -0,0 +1,3 @@
[flake8]
ignore = W503
ignoreExtra = E501, E203

4
.gitignore vendored
View File

@ -1,6 +1,10 @@
**/.vscode
**/.mypy_cache
**/config.yaml
**/.venv
**/poetry.lock
**/__pycache__
**/testing
!test.py
test2.py

View File

@ -1,6 +1,6 @@
from .api import Router
from .http import Router
from .models import (
_Archetype,
Archetype,
Room,
Light,
Scene,
@ -25,7 +25,7 @@ from .models import (
__all__ = (
"Router",
"_Archetype",
"Archetype",
"Room",
"Light",
"Scene",

View File

@ -5,11 +5,12 @@ from inspect import signature
from re import compile
from typing import Any, Literal, Optional
from uuid import UUID
from time import time
from httpx import AsyncClient
from httpx._urls import URL as _URL
from httpx._exceptions import ConnectTimeout
from httpcore._exceptions import ReadTimeout
from attrs import define, field
from json import loads
from pydantic import BaseModel
@ -29,6 +30,7 @@ from . import models
STR_FMT_RE = compile(r"""(?=(\{([^:]+)(?::([^}]+))?\}))\1""")
URL_TYPES = {"str": str, "int": int}
IP_RE = compile(r"(?=(?:(?<=[^0-9])|)((?:[0-9]{,3}\.){3}[0-9]{,3}))\1")
MSG_RE = compile(
b"""(?=((?P<hello>^hi\\n\\n$)|^id:\\s(?P<id>[0-9]+:\\d*?)\\ndata:(?P<data>[^$]+)\\n\\n))\\1"""
@ -42,7 +44,6 @@ for k in models.__all__:
or not issubclass(getattr(models, k), BaseModel)
):
continue
# print(getattr(models, k).__dict__)
TYPE_CACHE[getattr(models, k).__fields__["type"].default] = getattr(models, k)
@ -59,6 +60,50 @@ def get_url_args(url):
return kwds
@define
class LRUItem:
access_time: int = field(init=False, factory=lambda: int(time()))
value: Any = object
def __id__(self):
return id(self.value.id)
def __hash__(self):
return hash(self.value.id)
class LRU(set):
def __init__(self, *items, maxsize=128):
self.maxsize = maxsize
super().__init__()
for item in items[:maxsize]:
self.add(LRUItem(value=item))
def add(self, item):
if len(self) + 1 > self.maxsize:
self ^= set(
sorted(self, key=lambda x: x.access_time)[
: len(self) + 1 - self.maxsize
]
)
super().add(LRUItem(value=item))
def pop(self):
super().pop().value
def remove(self, item):
super().remove(*filter(lambda x: x.value == item, self))
def extend(self, *items):
len_new = len(self) + len(items)
if len_new > self.maxsize:
self ^= set(
sorted(self, key=lambda x: x.access_time)[: len_new - self.maxsize]
)
self |= set([LRUItem(value=item) for item in items])
class URL(_URL):
def __truediv__(self, other):
# Why am i doing this? good question.
@ -96,7 +141,7 @@ def ret_cls(cls):
def route(method, endpoint) -> Any:
def wrapped(fn):
async def sub_wrap(
self: SubRouter,
self: "SubRouter",
base_uri=None,
content: Optional[bytes] = None,
data: Optional[dict[str, str]] = None,
@ -155,11 +200,15 @@ def route(method, endpoint) -> Any:
data = data | kwargs if data else kwargs
except Exception:
...
_match_bridge = IP_RE.search(self._bridge_ip)
if not _match_bridge:
raise ValueError(f"Invalid bridge ip {self._bridge_ip}")
_url_base = f"https://{_match_bridge.group(1)}/{base_uri}"
if url_args:
new_endpoint = URL(f"{self._api_path}") / endpoint.format(**url_args)
new_endpoint = URL(_url_base) / endpoint.format(**url_args)
else:
new_endpoint = URL(f"{self._api_path}") / endpoint
new_endpoint = URL(_url_base) / endpoint
if headers and headers.get("Accept", "") == "text/event-stream":
return self._client.stream(
@ -194,13 +243,11 @@ class RouterMeta(type):
def __new__(cls, name, bases, kwds, **kwargs):
cells = {}
_root = kwargs.get("root", "")
_base = kwds.get("BASE_URI", "")
_api = f"{_root}{_base}"
def set_key(v, base_uri=None):
def set_key(v):
def wrap(self, *args, **_kwds):
return v(self, *args, base_uri=base_uri, **_kwds)
return v(self, *args, base_uri=_base, **_kwds)
return wrap
@ -213,7 +260,7 @@ class RouterMeta(type):
for k, v in funcs:
if hasattr(v, "__closure__"):
# val = v.__closure__[0].cell_contents
cells[k] = set_key(v, base_uri=_api)
cells[k] = set_key(v)
kwds["handlers"] = cells
@ -233,10 +280,11 @@ class SubRouter(metaclass=RouterMeta):
BASE_URI: str
_api_path: str
_client: AsyncClient
_bridge_ip: str
def __new__(cls, hue_api_key: str):
if not hasattr(cls, "handlers"):
cls.handlers = {}
cls.handlers: dict[str, type] = {}
for base in cls.__bases__:
if hasattr(base, "handlers"):
@ -270,7 +318,7 @@ class HughApi(SubRouter):
async def get_light(self, light_id: str):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/light/{light_id}")
async def set_light(
self,
@ -294,7 +342,7 @@ class HughApi(SubRouter):
async def get_scenes(self):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("POST", "/resource/scene")
async def create_scene(self, **kwargs):
...
@ -304,12 +352,12 @@ class HughApi(SubRouter):
async def get_scene(self, scene_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/scene/{scene_id}")
async def set_scene(self, scene_id: UUID, **kwargs):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("DELETE", "/resource/scene/{scene_id}")
async def delete_scene(self, scene_id: UUID):
...
@ -319,7 +367,7 @@ class HughApi(SubRouter):
async def get_rooms(self):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("POST", "/resource/room")
async def create_room(self, **kwargs):
...
@ -329,17 +377,17 @@ class HughApi(SubRouter):
async def get_room(self, room_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/room/{room_id}")
async def set_room(
self,
room_id: UUID,
metadata: Optional[dict[str, str]] = None,
children: Optional[models._Identifier] = None,
children: Optional[models.Identifier] = None,
):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("DELETE", "/resource/room/{room_id}")
async def delete_room(self, room_id: UUID):
...
@ -349,7 +397,7 @@ class HughApi(SubRouter):
async def get_zones(self):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("POST", "/resource/zone")
async def create_zone(self, **kwargs):
...
@ -359,12 +407,12 @@ class HughApi(SubRouter):
async def get_zone(self, zone_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/zone/{zone_id}")
async def set_zone(self, zone_id: UUID, **kwargs):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("DELETE", "/resource/zone/{zone_id}")
async def delete_zone(self, zone_id: UUID):
...
@ -379,7 +427,7 @@ class HughApi(SubRouter):
async def get_bridge_home(self, bridge_home_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/bridge_home/{bridge_home_id}")
async def set_bridge_home(self, bridge_home_id: UUID, **kwargs):
...
@ -394,7 +442,7 @@ class HughApi(SubRouter):
async def get_grouped_light(self, grouped_light_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/grouped_light/{grouped_light_id}")
async def set_grouped_light(self, grouped_light_id: UUID, **kwargs):
...
@ -409,7 +457,7 @@ class HughApi(SubRouter):
async def get_device(self, device_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/device/{device_id}")
async def set_device(self, device_id: UUID, **kwargs):
...
@ -424,7 +472,7 @@ class HughApi(SubRouter):
async def get_bridge(self, bridge_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/bridges/{bridge_id}")
async def set_bridge(self, bridge_id: UUID, **kwargs):
...
@ -439,7 +487,7 @@ class HughApi(SubRouter):
async def get_device_power(self, device_power_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/device_power/{device_power_id}")
async def set_device_power(self, device_power_id: UUID, **kwargs):
...
@ -454,7 +502,7 @@ class HughApi(SubRouter):
async def get_zigbee_connectivity(self, zigbee_connectivity_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/zigbee_connectivity/{zigbee_connectivity_id}")
async def set_zigbee_connectivity(self, zigbee_connectivity_id: UUID, **kwargs):
...
@ -469,7 +517,7 @@ class HughApi(SubRouter):
async def get_zgb_connectivity(self, zgb_connectivity_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/zgb_connectivity/{zgb_connectivity_id}")
async def set_zgb_connectivity(self, zgb_connectivity_id: UUID, **kwargs):
...
@ -484,7 +532,7 @@ class HughApi(SubRouter):
async def get_motion(self, motion_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/motion/{motion_id}")
async def set_motion(self, motion_id: UUID, **kwargs):
...
@ -499,7 +547,7 @@ class HughApi(SubRouter):
async def get_temperature(self, temperature_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/temperature/{temperature_id}")
async def set_temperature(self, temperature_id: UUID, **kwargs):
...
@ -514,7 +562,7 @@ class HughApi(SubRouter):
async def get_light_level(self, light_level_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/light_level/{light_level_id}")
async def set_light_level(self, light_level_id: UUID, **kwargs):
...
@ -529,7 +577,7 @@ class HughApi(SubRouter):
async def get_button(self, button_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/button/{button_id}")
async def set_button(self, button_id: UUID, **kwargs):
...
@ -549,7 +597,7 @@ class HughApi(SubRouter):
async def get_behavior_instances(self):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("POST", "/resource/behavior_instance")
async def create_behavior_instance(self, **kwargs):
...
@ -559,12 +607,12 @@ class HughApi(SubRouter):
async def get_behavior_instance(self, behavior_instance_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/behavior_instance/{behavior_instance_id}")
async def set_behavior_instance(self, behavior_instance_id: UUID, **kwargs):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("DELETE", "/resource/behavior_instance/{behavior_instance_id}")
async def delete_behavior_instance(self, behavior_instance_id: UUID):
...
@ -574,7 +622,7 @@ class HughApi(SubRouter):
async def get_geofence_clients(self):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("POST", "/resource/geofence_client")
async def create_geofence_client(self, **kwargs):
...
@ -584,12 +632,12 @@ class HughApi(SubRouter):
async def get_geofence_client(self, geofence_client_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/geofence_client/{geofence_client_id}")
async def set_geofence_client(self, geofence_client_id: UUID, **kwargs):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("DELETE", "/resource/geofence_client/{geofence_client_id}")
async def delete_geofence_client(self, geofence_client_id: UUID):
...
@ -604,7 +652,7 @@ class HughApi(SubRouter):
async def get_geolocation(self, geolocation_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/geolocation/{geolocation_id}")
async def set_geolocation(self, geolocation_id: UUID, **kwargs):
...
@ -614,7 +662,7 @@ class HughApi(SubRouter):
async def get_entertainment_configurations(self):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("POST", "/resource/entertainment_configuration")
async def create_entertainment_configuration(self, **kwargs):
...
@ -628,7 +676,7 @@ class HughApi(SubRouter):
):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route(
"PUT", "/resource/entertainment_configuration/{entertainment_configuration_id}"
)
@ -637,7 +685,7 @@ class HughApi(SubRouter):
):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route(
"DELETE",
"/resource/entertainment_configuration/{entertainment_configuration_id}",
@ -657,7 +705,7 @@ class HughApi(SubRouter):
async def get_entertainment(self, entertainment_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/entertainment/{entertainment_id}")
async def set_entertainment(self, entertainment_id: UUID, **kwargs):
...
@ -672,7 +720,7 @@ class HughApi(SubRouter):
async def get_homekit(self, homekit_id: UUID):
...
@ret_cls(models._Identifier)
@ret_cls(models.Identifier)
@route("PUT", "/resource/homekit/{homekit_id}")
async def set_homekit(
self,
@ -692,21 +740,29 @@ class HughApi(SubRouter):
...
class Router(HughApi, root="https://192.168.69.104"):
def __init__(self, hue_api_key: str):
class Router(HughApi):
def __new__(cls, hue_api_key: str, bridge_ip: str = "", max_cache_size: int = 512):
cls = super().__new__(cls, hue_api_key)
return cls
def __init__(
self, hue_api_key: str, bridge_ip: Optional[str] = None, max_cache_size=512
):
super().__init__(hue_api_key)
self._client = AsyncClient(headers=self._headers, verify=False)
self._subscription = None
self._bridge_ip = bridge_ip or ""
self.cache = LRU(maxsize=max_cache_size)
def subscribe(self, *args, **kwargs):
if not self._subscription or self._subscription.done():
self._subscription = get_running_loop().create_task(self._subscribe())
async def _subscribe(self, *args, **kwargs):
stream = await self.listen_events(
headers={"Accept": "text/event-stream"} | self._headers
)
while get_running_loop().is_running():
resp = await stream.gen.__anext__()
_bound = resp.stream._stream._httpcore_stream
@ -721,13 +777,13 @@ class Router(HughApi, root="https://192.168.69.104"):
...
else:
payload = loads(_match.group("data"))
id_ = _match.group("id").decode()
objs = []
id_ = _match.group("id")
for event in payload:
for ob in event["data"]:
objs.append(TYPE_CACHE[ob["type"]](**ob))
self.cache.add(TYPE_CACHE[ob["type"]](**ob))
print(objs)
print(len(self.cache))
except ReadTimeout:
stream = await self.listen_events(

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,16 @@ orjson = "^3.8.2"
black = ">=22.10.0"
rich = ">=12.6.0"
[tool.poetry.group.dev.dependencies]
pycodestyle = "^2.10.0"
pylint = "^2.15.7"
mypy = "^0.991"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
[tool.flake8]
ignore = ["W503", "E501", "E203"]
extras = ["E501", "E203"]

37
test.py
View File

@ -1,6 +1,7 @@
from asyncio import run, sleep
from asyncio import run, get_running_loop, sleep
from phlyght.api import Router
from phlyght.http import Router
from phlyght import models
try:
from rich import print # noqa
@ -9,34 +10,14 @@ except ImportError:
async def main():
router = Router("ur key")
router = Router("Your Bridge Auth key", bridge_ip="https://192.168.1.1")
# this will start listening in the background for all events sent out by the bridge
router.subscribe()
print(await router.get_lights())
print(await router.get_scenes())
print(await router.get_devices())
await router.get_rooms()
await router.get_zones()
await router.get_bridge_homes()
await router.get_grouped_lights()
await router.get_bridges()
await router.get_device_powers()
await router.get_zigbee_connectivities()
await router.get_zgb_connectivities()
print(await router.get_motions())
print(await router.get_temperatures())
await router.get_light_levels()
await router.get_buttons()
await router.get_behavior_scripts()
await router.get_behavior_instances()
await router.get_geofence_clients()
await router.get_geolocations()
await router.get_entertainment_configurations()
await router.get_entertainments()
await router.get_homekits()
await router.get_resources()
await router._subscribe()
while True:
await sleep(5)
# this will query for all lights every 10 seconds
print(await router.get_lights)
await sleep(10.0)
run(main())