from enum import Enum, IntEnum from io import BytesIO from struct import pack, unpack from .opcodes import RecvOps, SendOps from .tools import to_string class ByteBuffer(BytesIO): """Base class for packet write and read operations""" def encode(self, _bytes): self.write(_bytes) return self def encode_byte(self, value): if isinstance(value, Enum): value = value.value self.write(bytes([value])) return self def encode_short(self, value): self.write(pack("H", value)) return self def encode_int(self, value): self.write(pack("I", value)) return self def encode_long(self, value): self.write(pack("Q", value)) return self def encode_buffer(self, buffer): self.write(buffer) return self def skip(self, count): self.write(bytes(count)) return self def encode_string(self, string): self.write(pack("H", len(string))) for ch in string: self.write(ch.encode()) return self def encode_fixed_string(self, string, length=13): for i in range(length): if i < len(string): self.write(string[i].encode()) continue self.encode_byte(0) return self def encode_hex_string(self, string): string = string.strip(" -") self.write(bytes.fromhex(string)) return self def decode_byte(self): return self.read(1)[0] def decode_bool(self): return bool(self.decode_byte()) def decode_short(self): return unpack("H", self.read(2))[0] def decode_int(self): return unpack("I", self.read(4))[0] def decode_long(self): return unpack("Q", self.read(8))[0] def decode_buffer(self, size): return self.read(size) def decode_fixed_string(self, size=13): string = "" for i in range(size): if i == size - 1: self.read(1) return string string += self.read(1).decode() def decode_string(self): length = self.decode_short() string = "" for _ in range(length): string += self.read(1).decode() return string class Packet(ByteBuffer): """Packet class use in all send / recv opertions Parameters ---------- data: bytes The initial data to load into the packet op_code: :class:`OpCodes` OpCode used to encode the first short onto the packet op_codes: :class:`OpCodes` Which enum to try to get the op_code from """ _op_codes: type[IntEnum] def __init__(self, data=None, op_code=None, raw=False): self.op_code: IntEnum | int if not data: data = bytearray() if isinstance(data, IntEnum): if op_code and isinstance(op_code, bytearray): op_code, data = data, op_code else: op_code, data = data, bytearray() super().__init__(data) @property def name(self): if isinstance(self.op_code, IntEnum): return self.op_code.name return self.op_code def to_array(self): return self.getvalue() def to_string(self): return to_string(self.getvalue()) def __len__(self): return len(self.getvalue()) @property def length(self): return len(self.getvalue()) class iPacket(Packet): _op_codes = RecvOps def __init__(self, data=None): super().__init__(data=data) self.op_code = self._op_codes(self.decode_short()) class oPacket(Packet): _op_codes = SendOps def __init__(self, op_code=None): super().__init__(op_code=op_code) self.op_code = op_code.value if hasattr(op_code, "value") else op_code if self.op_code: self.encode_short(self.op_code) class PacketHandler: def __init__(self, name, callback, op_code=None, **kwargs): self.name = name self.callback = callback self.op_code = op_code def packet_handler(op_code=None): def wrap(func): return PacketHandler(func.__name__, func, op_code=op_code) return wrap