maple_clb/clb/packet.py
2022-03-17 14:52:04 +00:00

188 lines
4.4 KiB
Python

from enum import Enum, IntEnum
from io import BytesIO
from struct import pack, unpack
from .opcodes import RecvOps, SendOps
from .tools import to_string
class ByteBuffer(BytesIO):
    """In-memory byte stream with helpers for writing and reading packet fields.

    ``encode_*`` methods write at the current stream position and return
    ``self`` so calls can be chained.  ``decode_*`` methods read from the
    current stream position and return the decoded value.

    Multi-byte integers are unsigned and are packed with an explicit
    little-endian byte order so the produced bytes do not depend on the host
    CPU (this is byte-for-byte what native order produced on little-endian
    hosts).  Strings are encoded/decoded with Python's default codec for
    ``str.encode`` / ``bytes.decode`` (UTF-8).
    """

    def encode(self, _bytes) -> 'ByteBuffer':
        """Write the bytes-like object ``_bytes`` unchanged."""
        self.write(_bytes)
        return self

    def encode_byte(self, value) -> 'ByteBuffer':
        """Write ``value`` as a single byte; ``Enum`` members use their value."""
        if isinstance(value, Enum):
            value = value.value
        self.write(bytearray([value]))
        return self

    def encode_short(self, value) -> 'ByteBuffer':
        """Write ``value`` as an unsigned 16-bit integer."""
        self.write(pack("<H", value))
        return self

    def encode_int(self, value) -> 'ByteBuffer':
        """Write ``value`` as an unsigned 32-bit integer."""
        self.write(pack("<I", value))
        return self

    def encode_long(self, value) -> 'ByteBuffer':
        """Write ``value`` as an unsigned 64-bit integer."""
        self.write(pack("<Q", value))
        return self

    def encode_buffer(self, buffer) -> 'ByteBuffer':
        """Write the bytes-like object ``buffer`` unchanged."""
        self.write(buffer)
        return self

    def skip(self, count) -> 'ByteBuffer':
        """Write ``count`` zero bytes."""
        self.write(bytearray(count))
        return self

    def encode_string(self, string) -> 'ByteBuffer':
        """Write ``string`` prefixed with its encoded length as a short.

        The prefix is the number of *bytes* that follow, not the number of
        characters, so ``decode_string`` reads back exactly what was written
        even when the text contains multi-byte characters.
        """
        encoded = string.encode()
        self.write(pack("<H", len(encoded)))
        self.write(encoded)
        return self

    def encode_fixed_string(self, string, length=13) -> 'ByteBuffer':
        """Write ``string`` into a field of exactly ``length`` bytes.

        Text longer than the field is truncated; shorter text is padded with
        zero bytes.
        """
        if length <= 0:
            return self
        # Cut at the byte limit, then drop a trailing partial multi-byte
        # sequence (if the cut landed inside one) so the field stays decodable.
        encoded = string.encode()[:length].decode(errors="ignore").encode()
        self.write(encoded.ljust(length, b"\x00"))
        return self

    def encode_hex_string(self, string) -> 'ByteBuffer':
        """Write the bytes described by a hex string such as ``"0A 1B-2C"``.

        Dashes are treated like whitespace separators anywhere in the string;
        ``bytes.fromhex`` itself skips ASCII whitespace between byte pairs.
        """
        self.write(bytes.fromhex(string.replace("-", " ")))
        return self

    def decode_byte(self) -> int:
        """Read one byte and return it as an int (IndexError at end of data)."""
        return self.read(1)[0]

    def decode_bool(self) -> bool:
        """Read one byte and return whether it is non-zero."""
        return bool(self.decode_byte())

    def decode_short(self) -> int:
        """Read an unsigned 16-bit integer."""
        return unpack("<H", self.read(2))[0]

    def decode_int(self) -> int:
        """Read an unsigned 32-bit integer."""
        return unpack("<I", self.read(4))[0]

    def decode_long(self) -> int:
        """Read an unsigned 64-bit integer."""
        return unpack("<Q", self.read(8))[0]

    def decode_buffer(self, size) -> bytearray:
        """Read up to ``size`` bytes and return them as a ``bytearray``."""
        return bytearray(self.read(size))

    def decode_fixed_string(self, size=13) -> str:
        """Read a ``size``-byte string field.

        The first ``size - 1`` bytes are decoded as text and the final byte
        of the field is consumed and discarded.  Zero padding inside the
        decoded part is returned as-is (``"\\x00"`` characters) for
        compatibility with existing callers.
        """
        if size <= 0:
            # Nothing to consume; also avoids read(-1), which would read
            # everything that is left in the stream.
            return ""
        text = self.read(size - 1).decode()
        self.read(1)
        return text

    def decode_string(self) -> str:
        """Read a short length prefix followed by that many bytes of text."""
        length = self.decode_short()
        # Decode the whole field at once so multi-byte characters survive.
        return self.read(length).decode()
class Packet(ByteBuffer):
    """Packet class used in all send / recv operations.

    Parameters
    ----------
    data: bytes
        The initial data to load into the packet.  An ``IntEnum`` member may
        be passed here instead; it is then taken as the op code, and a
        non-empty ``bytearray`` passed as ``op_code`` is taken as the data.
    op_code: :class:`IntEnum`
        Op code associated with the packet; stored on ``self.op_code``.
    raw: bool
        Accepted for backward compatibility; not used by this class.
    """

    # Enum used to look up op codes; concrete subclasses assign it.
    _op_codes: type[IntEnum]

    def __init__(self, data=None, op_code=None, raw=False):
        self.op_code: IntEnum | int

        # Check for the enum form first: a member whose value is 0 is falsy
        # and would otherwise be mistaken for "no data" and thrown away.
        if isinstance(data, IntEnum):
            if op_code and isinstance(op_code, bytearray):
                op_code, data = data, op_code
            else:
                op_code, data = data, bytearray()
        elif not data:
            data = bytearray()

        super().__init__(data)
        # Always define the attribute so ``name`` works on a plain Packet;
        # subclasses may overwrite it after calling this initializer.
        self.op_code = op_code

    @property
    def name(self):
        """Enum member name of the op code, or the raw op code otherwise."""
        if isinstance(self.op_code, IntEnum):
            return self.op_code.name
        return self.op_code

    def to_array(self):
        """Return the full packet contents as ``bytes``."""
        return self.getvalue()

    def to_string(self):
        """Return the packet contents formatted by the ``to_string`` helper."""
        return to_string(self.getvalue())

    def __len__(self):
        """Total number of bytes held by the packet."""
        return len(self.getvalue())

    @property
    def length(self):
        """Total number of bytes held by the packet."""
        return len(self.getvalue())
class iPacket(Packet):
    """Incoming packet.

    The leading short of ``data`` is consumed on construction and converted
    to a ``RecvOps`` member, which is stored on ``self.op_code``.
    """

    _op_codes = RecvOps

    def __init__(self, data=None):
        super().__init__(data=data)
        header = self.decode_short()
        # Enum lookup by value; raises ValueError for a value not in the enum.
        self.op_code = self._op_codes(header)
class oPacket(Packet):
    """Outgoing packet.

    When an op code is given, its integer value is stored on
    ``self.op_code`` and written as the first short of the packet.  With no
    op code (``None``) the packet starts empty.
    """

    _op_codes = SendOps

    def __init__(self, op_code=None):
        super().__init__(op_code=op_code)
        # Enum members are reduced to their integer value; ints pass through.
        self.op_code = op_code.value if hasattr(op_code, "value") else op_code
        # Test against None rather than truthiness so an op code of 0 still
        # gets its header written.
        if self.op_code is not None:
            self.encode_short(self.op_code)
class PacketHandler:
    """Record tying a callback to a name and an optional op code."""

    def __init__(self, name, callback, op_code=None, **kwargs):
        # Any extra keyword arguments are accepted and ignored.
        self.name, self.callback, self.op_code = name, callback, op_code
def packet_handler(op_code=None):
    """Decorator factory that turns a function into a ``PacketHandler``.

    The decorated function is replaced by a ``PacketHandler`` named after
    the function, holding the function as its callback and ``op_code`` as
    its op code.
    """

    def decorate(func):
        handler_name = func.__name__
        return PacketHandler(handler_name, func, op_code=op_code)

    return decorate