maple_clb/packet.py
2022-03-09 13:48:56 -07:00

177 lines
3.9 KiB
Python

from enum import Enum, IntEnum
from io import BytesIO
from struct import pack, unpack
from .opcodes import RecvOps, SendOps
from .tools import to_string
class ByteBuffer(BytesIO):
    """Base class for packet write and read operations.

    Every ``encode_*`` method (and ``skip``) writes at the current stream
    position and returns ``self`` so calls can be chained.  Every
    ``decode_*`` method reads from the current stream position.

    Multi-byte integers use explicit little-endian, standard-size struct
    formats so the produced bytes do not depend on the host CPU; this is
    the same output the previous native-order formats gave on
    little-endian machines.
    """

    def encode(self, _bytes):
        """Write ``_bytes`` verbatim."""
        self.write(_bytes)
        return self

    def encode_byte(self, value):
        """Write one unsigned byte; ``Enum`` members are unwrapped first."""
        if isinstance(value, Enum):
            value = value.value
        self.write(bytes([value]))
        return self

    def encode_short(self, value):
        """Write ``value`` as an unsigned 16-bit integer."""
        self.write(pack("<H", value))
        return self

    def encode_int(self, value):
        """Write ``value`` as an unsigned 32-bit integer."""
        self.write(pack("<I", value))
        return self

    def encode_long(self, value):
        """Write ``value`` as an unsigned 64-bit integer."""
        self.write(pack("<Q", value))
        return self

    def encode_buffer(self, buffer):
        """Write the bytes-like ``buffer`` verbatim."""
        self.write(buffer)
        return self

    def skip(self, count):
        """Write ``count`` zero bytes."""
        self.write(bytes(count))
        return self

    def encode_string(self, string):
        """Write ``string`` as a 16-bit length prefix followed by its bytes.

        The prefix is the number of encoded bytes (not characters) so that
        it always agrees with the payload, including for non-ASCII text.
        """
        payload = string.encode()
        self.write(pack("<H", len(payload)))
        self.write(payload)
        return self

    def encode_fixed_string(self, string, length=13):
        """Write ``string`` into exactly ``length`` bytes.

        Longer input is truncated and shorter input is padded with zero
        bytes.
        """
        payload = string.encode()[:length]
        # Truncation may have cut a multi-byte character in half; drop the
        # dangling fragment so the field still holds valid text.
        payload = payload.decode(errors="ignore").encode()
        self.write(payload.ljust(length, b"\x00"))
        return self

    def encode_hex_string(self, string):
        """Write the bytes described by a hex dump such as ``"0A-0B 0C"``.

        Dashes and ASCII whitespace between byte pairs are ignored.
        """
        # bytes.fromhex already skips whitespace between bytes, so turning
        # dashes into spaces handles separators anywhere in the string.
        self.write(bytes.fromhex(string.replace("-", " ")))
        return self

    def decode_byte(self):
        """Read one unsigned byte (raises ``IndexError`` at end of data)."""
        return self.read(1)[0]

    def decode_bool(self):
        """Read one byte and return its truthiness."""
        return bool(self.decode_byte())

    def decode_short(self):
        """Read an unsigned 16-bit integer."""
        return unpack("<H", self.read(2))[0]

    def decode_int(self):
        """Read an unsigned 32-bit integer."""
        return unpack("<I", self.read(4))[0]

    def decode_long(self):
        """Read an unsigned 64-bit integer."""
        return unpack("<Q", self.read(8))[0]

    def decode_buffer(self, size):
        """Read and return up to ``size`` raw bytes."""
        return self.read(size)

    def decode_string(self):
        """Read a string written by :meth:`encode_string`."""
        length = self.decode_short()
        # Decode the payload in one go: per-byte decoding breaks on
        # multi-byte characters.
        return self.read(length).decode()
class Packet(ByteBuffer):
    """Packet class used in all send / recv operations.

    Parameters
    ----------
    data: bytes
        The initial data to load into the packet.  An op code may be
        passed here instead, optionally followed by the payload, i.e.
        ``Packet(op_code, payload)``.
    op_code: :class:`IntEnum` or int
        Op code stored on the packet as ``op_code``.
    raw: bool
        Accepted for backward compatibility; not used by this class.
    """

    # Enum class that subclasses assign to resolve their op codes.
    _op_codes: type[IntEnum]

    def __init__(self, data=None, op_code=None, raw=False):
        if isinstance(data, IntEnum):
            # Called as Packet(op_code[, payload]): put the arguments back
            # in their proper slots.  This check runs before the emptiness
            # check so an op code whose value is 0 is not mistaken for
            # "no data".
            if isinstance(op_code, (bytes, bytearray)):
                op_code, data = data, op_code
            else:
                op_code, data = data, bytearray()
        elif not data:
            data = bytearray()

        super().__init__(data)
        # Always set so that ``name`` works on a plain Packet; subclasses
        # may overwrite it after calling this initializer.
        self.op_code: IntEnum | int | None = op_code

    @property
    def name(self):
        """Enum member name of the op code, or the raw op code otherwise."""
        if isinstance(self.op_code, IntEnum):
            return self.op_code.name
        return self.op_code

    def to_array(self):
        """Return the full packet contents as ``bytes``."""
        return self.getvalue()

    def to_string(self):
        """Return the packet contents formatted by ``tools.to_string``."""
        return to_string(self.getvalue())

    def __len__(self):
        return len(self.getvalue())

    @property
    def length(self):
        """Total number of bytes held by the packet."""
        return len(self.getvalue())
class iPacket(Packet):
    """Incoming packet: the leading short is read and mapped to ``RecvOps``."""

    _op_codes = RecvOps

    def __init__(self, data=None):
        super().__init__(data=data)
        # Consume the two-byte header, then look it up in the op code enum.
        header = self.decode_short()
        self.op_code = self._op_codes(header)
class oPacket(Packet):
    """Outgoing packet that starts with its op code encoded as a short.

    Parameters
    ----------
    op_code: enum member or int
        Op code to write as the packet header.  Objects exposing a
        ``value`` attribute are unwrapped to that value.  When ``None``,
        no header is written.
    """

    _op_codes = SendOps

    def __init__(self, op_code=None):
        super().__init__(op_code=op_code)
        self.op_code = getattr(op_code, "value", op_code)
        # Compare against None rather than truthiness: an op code of 0 is
        # still an op code and must be written to the packet.
        if self.op_code is not None:
            self.encode_short(self.op_code)
class PacketHandler:
    """Record tying a callback to its name and (optional) op code."""

    def __init__(self, name, callback, op_code=None, **kwargs):
        # Extra keyword arguments are accepted but not stored.
        self.name, self.callback, self.op_code = name, callback, op_code
def packet_handler(op_code=None):
    """Decorator factory that turns a function into a ``PacketHandler``.

    The handler is named after the decorated function and carries the
    given ``op_code``.
    """

    def decorate(func):
        handler_name = func.__name__
        return PacketHandler(handler_name, func, op_code=op_code)

    return decorate