fix up example a bit, modify defaults for models

This commit is contained in:
ra 2023-01-04 01:17:06 -08:00
parent 39162d3013
commit 9e23aef973
6 changed files with 694 additions and 644 deletions

View File

@ -1,3 +1,2 @@
[flake8]
ignore = W503
ignoreExtra = E501, E203
ignore = W503, E501

9
config_example.yaml Normal file
View File

@ -0,0 +1,9 @@
api_key:
bridge_ip: 192.168.1.1
aliases:
lights:
00000000-0000-0000-0000-000000000000: entry
11111111-1111-1111-1111-111111111111: bed
22222222-2222-2222-2222-222222222222: kitchen
33333333-3333-3333-3333-333333333333: bathroom
44444444-4444-4444-4444-444444444444: footrest

View File

@ -1,56 +1,54 @@
from asyncio import get_running_loop, sleep, create_task
from uuid import UUID
from asyncio import get_running_loop, sleep
from phlyght import HueEntsV2, Router
from phlyght import models
from phlyght import HueEntsV2, Router, Attributes, _XY
from rich import print
from random import random, randint
from random import random
from uvloop import install
try:
from uvloop import install
install()
install()
except ImportError:
...
class HueRouter(Router):
async def on_light_update(self, light: HueEntsV2.Light):
print(f"A light was updated: {light}")
return True
async def on_button_update(self, button: HueEntsV2.Button):
print(f"A button was pressed: {button}")
return True
async def on_grouped_light_update(self, grouped_light: HueEntsV2.GroupedLight):
print(f"A grouped light was updated: {grouped_light}")
async def on_ready(self):
create_task(self._random_cycle("1795db24-4b81-4763-a6db-1d0d44373599"))
await sleep(0.3)
create_task(self._random_cycle("3c9d5ba7-ea43-45f8-a823-f3b32168776d"))
await sleep(0.3)
create_task(self._random_cycle("93a1533e-bfeb-4103-a150-180943a3ff3b"))
await sleep(0.3)
create_task(self._random_cycle("a9093d1b-8508-4311-8d8a-62b5389dae30"))
await sleep(0.3)
create_task(self._random_cycle("e3b73801-9040-440c-9284-fa16ac3d8713"))
await sleep(0.3)
async def _random_cycle(self, id: str):
async def _shift(self, light: HueEntsV2.Light):
while get_running_loop().is_running():
light.color = Attributes.LightColor(xy=_XY(x=random(), y=random()))
light.dimming = Attributes.Dimming(brightness=100.0, min_dim_level=0.2)
# We can modify the attributes of the lights and send the light object as the parameter to set_light()
await self.set_light(
id,
HueEntsV2.Light(
color=models.LightColor(xy=models._XY(x=random(), y=random())),
on=models.On(on=True),
dimming=models.Dimming(brightness=randint(25, 100)),
effects=models.LightEffect(effect="candle"),
),
light.id,
light,
)
await sleep(0.3)
# Could potentially get more in / at a faster update rate but its getting pretty close to making the bridge unresponsive at this rate
async def on_ready(self): # This will be called once, right after startup
for light in [
self.entry,
self.bed,
self.kitchen,
self.bathroom,
self.footrest,
]:
# These are all aliases defined in the config, accessible as an attribute using the name on the router
self.new_task(self._shift(light))
await sleep(0.5)
return
router = HueRouter(
"your access key",
bridge_ip="https://your gateway address",
max_cache_size=1024,
"Your API Key",
bridge_ip="https://192.168.1.1",
max_cache_size=64,
)
router.run()

View File

@ -1,81 +1,13 @@
from .http import Router
from .models import (
Archetype,
HueEntsV2,
RoomType,
Entity,
HueEntsV1,
Action,
Actions,
Alert,
Button,
Color,
ColorMirekSchema,
ColorPoint,
ColorPointColor,
ColorTemp,
EntChannel,
LightColor,
PaletteColor,
ScenePaletteColorTemp,
Dependee,
Dimming,
DimmingDelta,
Dynamics,
SceneDynamics,
ProductData,
Effects,
EntLocation,
LightEffect,
SceneEffects,
TimedEffects,
RotaryEvent,
Gamut,
Gradient,
Identifier,
LightLevelValue,
ServiceLocation,
Motion,
SegmentManager,
)
from .models import Archetype, HueEntsV2, Attributes, RoomType, Entity, HueEntsV1, _XY
__all__ = (
"Router",
"Entity",
"Archetype",
"RoomType",
"Attributes",
"HueEntsV2",
"HueEntsV1",
"Action",
"Actions",
"Alert",
"Button",
"Color",
"ColorMirekSchema",
"ColorPoint",
"ColorPointColor",
"ColorTemp",
"EntChannel",
"LightColor",
"PaletteColor",
"ScenePaletteColorTemp",
"Dependee",
"Dimming",
"DimmingDelta",
"Dynamics",
"SceneDynamics",
"ProductData",
"Effects",
"EntLocation",
"LightEffect",
"SceneEffects",
"TimedEffects",
"RotaryEvent",
"Gamut",
"Gradient",
"Identifier",
"LightLevelValue",
"ServiceLocation",
"Motion",
"SegmentManager",
"_XY",
)

View File

@ -1,36 +1,45 @@
from abc import abstractmethod
from asyncio import get_running_loop, new_event_loop
from asyncio import gather, get_running_loop, new_event_loop, sleep
from inspect import signature, Parameter
from re import compile
from typing import Any, Literal, Optional
from typing import Any, Literal, Optional, TypeVar, Generic
from time import time
from httpx import AsyncClient, ConnectError, ConnectTimeout, _content
from httpx._urls import URL as HttpxURL
from httpx._urls import URL as _URL
from httpx._exceptions import ReadTimeout as HTTPxReadTimeout
from httpcore._exceptions import ReadTimeout
from ujson import loads, JSONDecodeError
from pydantic import BaseModel, Field
from yarl import URL as YarlURL
from pydantic.dataclasses import dataclass
from pydantic.generics import GenericModel
from yaml import Loader, load
from . import models
from .models import HueEntsV2, Entity, UUID
try:
from ujson import dumps
from ujson import dumps, loads, JSONDecodeError
except ImportError:
try:
from orjson import dumps
from orjson import dumps, loads, JSONDecodeError
except ImportError:
from json import dumps
from json import dumps, loads, JSONDecodeError
setattr(_content, "json_dumps", dumps)
try:
from yarl import URL as UR
except ImportError:
...
try:
from rich import print # noqa
except ImportError:
...
_T = TypeVar("_T")
__all__ = ("Router", "route", "RouterMeta", "SubRouter", "HueAPIv2")
ENDPOINT_METHOD = compile(r"^(?=((?:get|set|create|delete)\w+))\1")
@ -42,7 +51,7 @@ MSG_RE_BYTES = compile(
rb"(?=((?P<hello>^: hi\n\n$)|^id:\s(?P<id>[0-9]+:\d*?)\ndata:(?P<data>[^$]+)\n\n))\1"
)
MSG_RE_TEXT = compile(
r"(?=((?P<hello>^: hi\n*?$)|^(?:id:\s(?P<id>[0-9]+:[0-9]?)\n*?)|data:[\n\s]*?(?P<data>[^$]+)[\n\s]*?))\1"
r"(?=((?P<hello>^: hi\n\n$)|^id:\s(?P<id>[0-9]+:\d*?)\ndata:(?P<data>[^$]+)\n\n))\1"
)
TYPE_CACHE = {}
@ -52,6 +61,11 @@ for k, v in HueEntsV2.__dict__.items():
TYPE_CACHE[getattr(v, "type")] = v
@dataclass()
class PhlyghtConfig:
...
def get_url_args(url):
kwds = {}
match = STR_FMT_RE.finditer(url)
@ -65,9 +79,12 @@ def get_url_args(url):
return kwds
class Event(BaseModel):
class Event(GenericModel, Generic[_T]):
class Config:
__root__ = _T
id: str
object: Entity
object: _T
type: str
@ -76,26 +93,30 @@ class LRUItem(BaseModel):
value: Any = object()
def __id__(self):
return id(self.value.id)
return id(self.value)
def __hash__(self):
return hash(self.value.id)
return hash(self.value)
class LRU(set):
def __init__(self, *items, maxsize=128):
self.maxsize = maxsize
def __init__(self, maxsize, /, *items):
super().__init__()
self.maxsize = maxsize
for item in items[:maxsize]:
self.add(LRUItem(value=item))
def add(self, item):
if len(self) + 1 > self.maxsize:
self ^= set(
sorted(self, key=lambda x: x.access_time)[
new = self ^ set(
sorted(self, key=lambda x: x.access_time)[::-1][
: len(self) + 1 - self.maxsize
]
)
old = self - new
for fut in old:
fut.value.cancel()
self -= old
super().add(LRUItem(value=item))
@ -108,22 +129,29 @@ class LRU(set):
def extend(self, *items):
len_new = len(self) + len(items)
if len_new > self.maxsize:
self ^= set(
sorted(self, key=lambda x: x.access_time)[: len_new - self.maxsize]
new = self ^ set(
sorted(self, key=lambda x: x.access_time)[::-1][
: len_new - self.maxsize
]
)
old = self - new
for fut in old:
fut.value.cancel()
self -= old
self |= set([LRUItem(value=item) for item in items])
class URL(HttpxURL):
class URL(_URL):
def __truediv__(self, other):
# Why am i doing this? good question.
try:
return URL(str(YarlURL(f"{self}") / other.lstrip("/")))
return URL(str(UR(f"{self}") / other.lstrip("/")))
except NameError:
return URL(f"{self}{other.lstrip('/')}")
def __floordiv__(self, other):
try:
return URL(str(YarlURL(f"{self}") / other.lstrip("/")))
return URL(str(UR(f"{self}") / other.lstrip("/")))
except NameError:
return URL(f"{self}{other.lstrip('/')}")
@ -180,6 +208,7 @@ def route(method, endpoint) -> Any:
ents = set(filter(lambda x: isinstance(x, Entity), args))
args = tuple(args - ents)
ent: BaseModel
for ent in ents:
json |= loads(
ent.json(exclude_unset=True, exclude_none=True, skip_defaults=True)
@ -190,7 +219,7 @@ def route(method, endpoint) -> Any:
continue
if param.kind == Parameter.POSITIONAL_ONLY:
data[param_name] = param.annotation(args[0])
data[param_name] = param.annotation(str(args[0]))
if len(args) > 1:
args = args[1:]
@ -246,6 +275,7 @@ def route(method, endpoint) -> Any:
new_endpoint = URL(_url_base) / endpoint.format(**url_args)
else:
new_endpoint = URL(_url_base) / endpoint
if headers and headers.get("Accept", "") == "text/event-stream":
return self._client.stream(
method,
@ -256,7 +286,7 @@ def route(method, endpoint) -> Any:
headers=headers,
)
else:
return await self._client.request(
resp = await self._client.request(
method,
new_endpoint,
content=content,
@ -265,6 +295,7 @@ def route(method, endpoint) -> Any:
headers=headers,
json=json,
)
return resp
return sub_wrap
@ -349,9 +380,12 @@ class SubRouter(metaclass=RouterMeta):
if kwargs.get("root"):
cls._api_path = f'{kwargs.get("root")}{cls.BASE_URI}'
def __getattribute__(self, key) -> Any:
return object.__getattribute__(self, key)
class HueAPIv1(SubRouter):
BASEHttpxURL = ""
BASE_URL = ""
class HueAPIv2(SubRouter):
@ -367,7 +401,7 @@ class HueAPIv2(SubRouter):
async def get_light(self, light_id: str, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/light/{light_id}")
async def set_light(
self,
@ -392,7 +426,7 @@ class HueAPIv2(SubRouter):
async def get_scenes(self):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("POST", "/resource/scene")
async def create_scene(self, **kwargs):
...
@ -402,12 +436,12 @@ class HueAPIv2(SubRouter):
async def get_scene(self, scene_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/scene/{scene_id}")
async def set_scene(self, scene_id: UUID, /, **kwargs):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("DELETE", "/resource/scene/{scene_id}")
async def delete_scene(self, scene_id: UUID, /):
...
@ -417,7 +451,7 @@ class HueAPIv2(SubRouter):
async def get_rooms(self):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("POST", "/resource/room")
async def create_room(self, **kwargs):
...
@ -427,18 +461,18 @@ class HueAPIv2(SubRouter):
async def get_room(self, room_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/room/{room_id}")
async def set_room(
self,
room_id: UUID,
/,
metadata: Optional[dict[str, str]] = None,
children: Optional[models.Identifier] = None,
children: Optional[models.Attributes.Identifier] = None,
):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("DELETE", "/resource/room/{room_id}")
async def delete_room(self, room_id: UUID):
...
@ -448,7 +482,7 @@ class HueAPIv2(SubRouter):
async def get_zones(self):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("POST", "/resource/zone")
async def create_zone(self, **kwargs):
...
@ -458,12 +492,12 @@ class HueAPIv2(SubRouter):
async def get_zone(self, zone_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/zone/{zone_id}")
async def set_zone(self, zone_id: UUID, /, **kwargs):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("DELETE", "/resource/zone/{zone_id}")
async def delete_zone(self, zone_id: UUID, /):
...
@ -478,7 +512,7 @@ class HueAPIv2(SubRouter):
async def get_bridge_home(self, bridge_home_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/bridge_home/{bridge_home_id}")
async def set_bridge_home(self, bridge_home_id: UUID, /, **kwargs):
...
@ -493,7 +527,7 @@ class HueAPIv2(SubRouter):
async def get_grouped_light(self, grouped_light_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/grouped_light/{grouped_light_id}")
async def set_grouped_light(self, grouped_light_id: UUID, /, **kwargs):
...
@ -508,7 +542,7 @@ class HueAPIv2(SubRouter):
async def get_device(self, device_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/device/{device_id}")
async def set_device(self, device_id: UUID, /, **kwargs):
...
@ -523,7 +557,7 @@ class HueAPIv2(SubRouter):
async def get_bridge(self, bridge_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/bridges/{bridge_id}")
async def set_bridge(self, bridge_id: UUID, /, **kwargs):
...
@ -538,7 +572,7 @@ class HueAPIv2(SubRouter):
async def get_device_power(self, device_power_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/device_power/{device_power_id}")
async def set_device_power(self, device_power_id: UUID, /, **kwargs):
...
@ -553,7 +587,7 @@ class HueAPIv2(SubRouter):
async def get_zigbee_connectivity(self, zigbee_connectivity_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/zigbee_connectivity/{zigbee_connectivity_id}")
async def set_zigbee_connectivity(self, zigbee_connectivity_id: UUID, /, **kwargs):
...
@ -568,7 +602,7 @@ class HueAPIv2(SubRouter):
async def get_zgb_connectivity(self, zgb_connectivity_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/zgb_connectivity/{zgb_connectivity_id}")
async def set_zgb_connectivity(self, zgb_connectivity_id: UUID, /, **kwargs):
...
@ -583,7 +617,7 @@ class HueAPIv2(SubRouter):
async def get_motion(self, motion_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/motion/{motion_id}")
async def set_motion(self, motion_id: UUID, /, **kwargs):
...
@ -598,7 +632,7 @@ class HueAPIv2(SubRouter):
async def get_temperature(self, temperature_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/temperature/{temperature_id}")
async def set_temperature(self, temperature_id: UUID, /, **kwargs):
...
@ -613,7 +647,7 @@ class HueAPIv2(SubRouter):
async def get_light_level(self, light_level_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/light_level/{light_level_id}")
async def set_light_level(self, light_level_id: UUID, /, **kwargs):
...
@ -628,7 +662,7 @@ class HueAPIv2(SubRouter):
async def get_button(self, button_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/button/{button_id}")
async def set_button(self, button_id: UUID, /, **kwargs):
...
@ -648,7 +682,7 @@ class HueAPIv2(SubRouter):
async def get_behavior_instances(self):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("POST", "/resource/behavior_instance")
async def create_behavior_instance(self, **kwargs):
...
@ -658,12 +692,12 @@ class HueAPIv2(SubRouter):
async def get_behavior_instance(self, behavior_instance_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/behavior_instance/{behavior_instance_id}")
async def set_behavior_instance(self, behavior_instance_id: UUID, /, **kwargs):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("DELETE", "/resource/behavior_instance/{behavior_instance_id}")
async def delete_behavior_instance(self, behavior_instance_id: UUID, /):
...
@ -673,7 +707,7 @@ class HueAPIv2(SubRouter):
async def get_geofence_clients(self):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("POST", "/resource/geofence_client")
async def create_geofence_client(self, **kwargs):
...
@ -683,12 +717,12 @@ class HueAPIv2(SubRouter):
async def get_geofence_client(self, geofence_client_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/geofence_client/{geofence_client_id}")
async def set_geofence_client(self, geofence_client_id: UUID, /, **kwargs):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("DELETE", "/resource/geofence_client/{geofence_client_id}")
async def delete_geofence_client(self, geofence_client_id: UUID, /):
...
@ -703,7 +737,7 @@ class HueAPIv2(SubRouter):
async def get_geolocation(self, geolocation_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/geolocation/{geolocation_id}")
async def set_geolocation(self, geolocation_id: UUID, /, **kwargs):
...
@ -713,7 +747,7 @@ class HueAPIv2(SubRouter):
async def get_entertainment_configurations(self):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("POST", "/resource/entertainment_configuration")
async def create_entertainment_configuration(self, **kwargs):
...
@ -729,7 +763,7 @@ class HueAPIv2(SubRouter):
):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route(
"PUT", "/resource/entertainment_configuration/{entertainment_configuration_id}"
)
@ -738,7 +772,7 @@ class HueAPIv2(SubRouter):
):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route(
"DELETE",
"/resource/entertainment_configuration/{entertainment_configuration_id}",
@ -760,7 +794,7 @@ class HueAPIv2(SubRouter):
async def get_entertainment(self, entertainment_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/entertainment/{entertainment_id}")
async def set_entertainment(self, entertainment_id: UUID, /, **kwargs):
...
@ -775,7 +809,7 @@ class HueAPIv2(SubRouter):
async def get_homekit(self, homekit_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/homekit/{homekit_id}")
async def set_homekit(
self,
@ -800,7 +834,7 @@ class HueAPIv2(SubRouter):
async def get_rotaries(self):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("PUT", "/resource/relative_rotary/{relative_rotary_id}")
async def set_rotary(self, relative_rotary_id: UUID, /, **kwargs):
...
@ -810,12 +844,12 @@ class HueAPIv2(SubRouter):
async def get_rotary(self, relative_rotary_id: UUID, /):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("POST", "/resource/relative_rotary")
async def create_rotary(self, **kwargs):
...
@ret_cls(models.Identifier)
@ret_cls(models.Attributes.Identifier)
@route("DELETE", "/resource/relative_rotary/{relative_rotary_id}")
async def delete_rotary(self, relative_rotary_id: UUID, /):
...
@ -930,76 +964,121 @@ class HueAPIv2(SubRouter):
class Router(HueAPIv2):
def __new__(cls, hue_api_key: str, bridge_ip: str = "", max_cache_size: int = 512):
def __new__(cls, hue_api_key: str, bridge_ip: str = "", max_cache_size: int = 10):
cls = super().__new__(cls, hue_api_key)
return cls
def __init__(
self, hue_api_key: str, bridge_ip: Optional[str] = None, max_cache_size=512
self,
hue_api_key: Optional[str] = None,
bridge_ip: Optional[str] = None,
max_cache_size=10,
):
super().__init__(hue_api_key)
with open("config.yaml", "r+") as f:
self.config = load(f, Loader=Loader)
super().__init__(hue_api_key or self.config.api_key)
self.cache = LRU(max_cache_size)
self._client = AsyncClient(headers=self._headers, verify=False)
self._subscription = None
self._bridge_ip = bridge_ip or ""
self._last_id = None
self.cache = LRU(maxsize=max_cache_size)
self._bridge_ip = bridge_ip or self.config.bridge_ip
self._tasks = []
self.lights = {}
self.bridges = {}
self.entertainment_configurations = {}
self.entertainments = {}
self.buttons = {}
self.rooms = {}
self.scenes = {}
self.zones = {}
self.rotaries = {}
self.geofences = {}
self.behavior_instances = {}
self.bridge_homes = {}
self.zgb_connectivities = {}
self.zigbee_connectivities = {}
def subscribe(self, *args, **kwargs):
if not self._subscription or self._subscription.done():
self._subscription = get_running_loop().create_task(self._subscribe())
self._subscription = self.new_task(self._subscribe(*args, **kwargs))
def run(self):
loop = new_event_loop()
try:
loop.run_until_complete(self._subscribe())
loop.run_until_complete(self._startup())
loop.run_forever()
except KeyboardInterrupt:
for t in self._tasks:
t.cancel()
for t in self.cache:
t.value.cancel()
print("Exiting..")
loop.stop()
loop.close()
def _parse_payload(self, payload: str):
_match = MSG_RE_TEXT.search(payload)
def new_task(self, coro):
self._tasks.append(t := get_running_loop().create_task(coro))
return t
async def _startup(self):
try:
loop = get_running_loop()
for k, v in self.config["aliases"].items():
fn = getattr(self, f"get_{k}")
objs = await fn()
for obj in objs:
alias = v.get(str(obj.id))
if alias:
ob = obj.__class__(id=obj.id)
getattr(self, k)[alias] = ob
setattr(self, alias, ob)
self.new_task(self._subscribe())
while loop.is_running():
await sleep(60)
except KeyboardInterrupt:
await gather(*self._tasks)
def _parse_payload(self, payload: bytes):
_match = MSG_RE_BYTES.search(payload)
if not _match:
return None
_events, _data, _id = [], "", ""
_parsed_events = []
if _data := _match.group("data"):
_events, _data, _id = [], b"", b""
if (_data := _match.group("data")) and (_id := _match.group("id")):
_events.extend(loads(_data))
elif _id := _match.group("id"):
self._last_id = _id
return
for _event in _events:
for _ent in _event["data"]:
self.cache.add(
event := Event(
id=_id,
object=TYPE_CACHE[_ent["type"]](**_ent),
type=_event["type"],
)
_event_id = _id.decode()
_event_type = _event["type"]
_object = TYPE_CACHE[_ent["type"]](**_ent)
event = Event(
id=_event_id,
object=_object,
type=_event_type,
)
_parsed_events.append(event)
if hasattr(self, f"on_{event.object.type}_{event.type}"):
get_running_loop().create_task(
getattr(self, f"on_{event.object.type}_{event.type}")(event)
self.cache.add(
get_running_loop().create_task(
getattr(self, f"on_{event.object.type}_{event.type}")(
_object
)
)
)
async def _subscribe(self, *args, **kwargs):
loop = get_running_loop()
if hasattr(self, "on_ready"):
loop.create_task(getattr(self, "on_ready")())
while loop.is_running():
stream = await self.listen_events(
headers={**self._headers, **{"Accept": "text/event-stream"}}
)
self.new_task(self.on_ready())
while get_running_loop().is_running():
try:
stream = await self.listen_events(
headers={**self._headers, **{"Accept": "text/event-stream"}}
)
async with stream as _iter:
async for msg in _iter.aiter_lines():
async for msg in _iter.aiter_bytes():
self._parse_payload(msg)
except (ReadTimeout, HTTPxReadTimeout, ConnectTimeout, ConnectError):
...
await sleep(1)

File diff suppressed because it is too large Load Diff