2022-03-09 20:48:56 +00:00
|
|
|
from enum import Enum, IntEnum
|
|
|
|
from io import BytesIO
|
|
|
|
from struct import pack, unpack
|
|
|
|
|
|
|
|
from .opcodes import RecvOps, SendOps
|
|
|
|
from .tools import to_string
|
|
|
|
|
|
|
|
|
|
|
|
class ByteBuffer(BytesIO):
|
|
|
|
"""Base class for packet write and read operations"""
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode(self, _bytes) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
self.write(_bytes)
|
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_byte(self, value) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
if isinstance(value, Enum):
|
|
|
|
value = value.value
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
self.write(bytearray([value]))
|
2022-03-09 20:48:56 +00:00
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_short(self, value) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
self.write(pack("H", value))
|
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_int(self, value) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
self.write(pack("I", value))
|
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_long(self, value) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
self.write(pack("Q", value))
|
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_buffer(self, buffer) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
self.write(buffer)
|
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def skip(self, count) -> 'ByteBuffer':
|
|
|
|
self.write(bytearray(count))
|
2022-03-09 20:48:56 +00:00
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_string(self, string) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
self.write(pack("H", len(string)))
|
|
|
|
|
|
|
|
for ch in string:
|
|
|
|
self.write(ch.encode())
|
|
|
|
|
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_fixed_string(self, string, length=13) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
for i in range(length):
|
|
|
|
if i < len(string):
|
|
|
|
self.write(string[i].encode())
|
|
|
|
continue
|
|
|
|
|
|
|
|
self.encode_byte(0)
|
|
|
|
|
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def encode_hex_string(self, string) -> 'ByteBuffer':
|
2022-03-09 20:48:56 +00:00
|
|
|
string = string.strip(" -")
|
2022-03-13 18:41:05 +00:00
|
|
|
self.write(bytearray(bytes.fromhex(string)))
|
2022-03-09 20:48:56 +00:00
|
|
|
return self
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def decode_byte(self) -> int:
|
2022-03-09 20:48:56 +00:00
|
|
|
return self.read(1)[0]
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def decode_bool(self) -> bool:
|
2022-03-09 20:48:56 +00:00
|
|
|
return bool(self.decode_byte())
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def decode_short(self) -> int:
|
2022-03-09 20:48:56 +00:00
|
|
|
return unpack("H", self.read(2))[0]
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def decode_int(self) -> int:
|
2022-03-09 20:48:56 +00:00
|
|
|
return unpack("I", self.read(4))[0]
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def decode_long(self) -> int:
|
2022-03-09 20:48:56 +00:00
|
|
|
return unpack("Q", self.read(8))[0]
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def decode_buffer(self, size) -> bytearray:
|
|
|
|
return bytearray(self.read(size))
|
2022-03-09 20:48:56 +00:00
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
def decode_fixed_string(self, size=13) -> str:
|
2022-03-13 05:43:37 +00:00
|
|
|
string = ""
|
|
|
|
for i in range(size):
|
|
|
|
if i == size - 1:
|
|
|
|
self.read(1)
|
2022-03-13 18:41:05 +00:00
|
|
|
break
|
2022-03-13 05:43:37 +00:00
|
|
|
|
|
|
|
string += self.read(1).decode()
|
|
|
|
|
2022-03-13 18:41:05 +00:00
|
|
|
return string
|
|
|
|
|
|
|
|
def decode_string(self) -> str:
|
2022-03-09 20:48:56 +00:00
|
|
|
length = self.decode_short()
|
|
|
|
string = ""
|
|
|
|
|
|
|
|
for _ in range(length):
|
|
|
|
string += self.read(1).decode()
|
|
|
|
|
|
|
|
return string
|
|
|
|
|
|
|
|
|
|
|
|
class Packet(ByteBuffer):
|
|
|
|
"""Packet class use in all send / recv opertions
|
|
|
|
|
|
|
|
Parameters
|
|
|
|
----------
|
|
|
|
data: bytes
|
|
|
|
The initial data to load into the packet
|
|
|
|
op_code: :class:`OpCodes`
|
|
|
|
OpCode used to encode the first short onto the packet
|
|
|
|
op_codes: :class:`OpCodes`
|
|
|
|
Which enum to try to get the op_code from
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
_op_codes: type[IntEnum]
|
|
|
|
|
|
|
|
def __init__(self, data=None, op_code=None, raw=False):
|
|
|
|
self.op_code: IntEnum | int
|
|
|
|
|
|
|
|
if not data:
|
|
|
|
data = bytearray()
|
|
|
|
|
|
|
|
if isinstance(data, IntEnum):
|
|
|
|
if op_code and isinstance(op_code, bytearray):
|
|
|
|
op_code, data = data, op_code
|
|
|
|
else:
|
|
|
|
op_code, data = data, bytearray()
|
|
|
|
|
|
|
|
super().__init__(data)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def name(self):
|
|
|
|
if isinstance(self.op_code, IntEnum):
|
|
|
|
return self.op_code.name
|
|
|
|
|
|
|
|
return self.op_code
|
|
|
|
|
|
|
|
def to_array(self):
|
|
|
|
return self.getvalue()
|
|
|
|
|
|
|
|
def to_string(self):
|
|
|
|
return to_string(self.getvalue())
|
|
|
|
|
|
|
|
def __len__(self):
|
|
|
|
return len(self.getvalue())
|
|
|
|
|
|
|
|
@property
|
|
|
|
def length(self):
|
|
|
|
return len(self.getvalue())
|
|
|
|
|
|
|
|
|
|
|
|
class iPacket(Packet):
|
|
|
|
_op_codes = RecvOps
|
|
|
|
|
|
|
|
def __init__(self, data=None):
|
|
|
|
super().__init__(data=data)
|
|
|
|
self.op_code = self._op_codes(self.decode_short())
|
|
|
|
|
|
|
|
|
|
|
|
class oPacket(Packet):
|
|
|
|
_op_codes = SendOps
|
|
|
|
|
|
|
|
def __init__(self, op_code=None):
|
|
|
|
super().__init__(op_code=op_code)
|
|
|
|
self.op_code = op_code.value if hasattr(op_code, "value") else op_code
|
|
|
|
if self.op_code:
|
|
|
|
self.encode_short(self.op_code)
|
|
|
|
|
|
|
|
|
|
|
|
class PacketHandler:
|
|
|
|
|
|
|
|
def __init__(self, name, callback, op_code=None, **kwargs):
|
|
|
|
self.name = name
|
|
|
|
self.callback = callback
|
|
|
|
self.op_code = op_code
|
|
|
|
|
|
|
|
|
|
|
|
def packet_handler(op_code=None):
|
|
|
|
|
|
|
|
def wrap(func):
|
|
|
|
return PacketHandler(func.__name__, func, op_code=op_code)
|
|
|
|
|
|
|
|
return wrap
|