6
0
mirror of https://github.com/avast/ioc synced 2024-06-27 01:08:32 +00:00
ioc-collection/SmarterCoffee/extras/bin/easy_smarter.py
2020-09-25 09:54:52 +02:00

146 lines
3.6 KiB
Python

#!/usr/bin/python2.7
import socket,sys,time,logging
class easy_smarter():
def __init__(self, ip='192.168.4.1', port = 2081):
self.ip = ip
self.port = port
self.sock = None
def connect(self):
# create an ipv4 (AF_INET) socket object using the tcp protocol (SOCK_STREAM)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.settimeout(3)
# connect the client
self.sock.connect((self.ip, self.port))
self.sock.settimeout(.5) # 5 seconds. Set this after connecting.
return True
except Exception,e:
logging.warning("Could not connect %s" %(str(e)))
return False
def read_data(self):
resp = bytearray()
timeout = 10
while True:
try:
response = self.sock.recv(4096)
resp.extend(bytearray(response))
if resp[len(resp)-1]==0x7E:
return resp
except socket.timeout as exc:
logging.warning('timed out waiting for data')
timeout -=1
if timeout<=0:
return bytearray()
def send_command(self, indata, waitforresponse = True):
self.sock.send(bytearray(indata))
if waitforresponse:
return self.read_data()
else:
return bytearray()
def upload_firmware(self, filename, display = sys.stdout.write):
fw = None
total =0
try:
# try to open firmware file
fw = open(filename,"rb")
fw.seek(0,2)
total = fw.tell()/256
if (fw.tell()%256)>0:
total+=1
fw.seek(0)
except Exception, e:
logging.error("Could not load firmware from file %s ", str(e))
return False
display ("> Sending lead-in packet\n")
self.send_command([0x6E, 0x7E], True)
crc = 0
block =0
result=0
display ("> Starting update of %d blocks 256 bytes each\n" %(total))
# read chunks of 256 bytes and construct a packet
while True:
retry = 5
chunk = bytearray(fw.read(256))
if not chunk:
break
#chunks are always of size of 256
chunk.extend( bytearray([0]*(256-len(chunk))) )
# now for each chunk compute 'CRC'
for i in range(256):
crc = (crc + chunk[i]) % 0xFFFFFFFF
# retry loop
while True:
size = len(chunk)
packet = bytearray()
packet.extend(bytearray([0x6f, block+1, (size>>8) & 0xff, (size) & 0xff, 0x7d]))
packet.extend(chunk)
packet.extend(bytearray([0x7e,0x7e,0x7e]))
res = self.send_command(packet)
# respond packet should always end like this
result = 0
if len(res)==3 and res[2]==0x7E:
major = res[0]
if major==3:
result = res[1]
else:
result = res[0]<<4 & res[1]
if result!=1:
retry-=1
if retry>0:
time.sleep(1)
continue
display("\nError writing page no. %d\n" % (block+1))
break
#
if block%4==0 and result==1:
display(".")
if result==1:
break
#sys.stdout.write("sending page %d of %d " %(block+1, total))
if result!=1:
break
block +=1
print "\n"
if result==1:
time.sleep(1)
# sending CRC and closing block
res = self.send_command([0x70, (crc>>24) & 0xFF, (crc>>16) & 0xFF,(crc>>8) & 0xFF, crc & 0xFF, 0x7E], True)
if len(res)==3 and res[2]==0x7E:
major = res[0]
if major==3:
result = res[1]
else:
result = res[0]<<4 & res[1]
if result==1:
display("\nUpdate complete!\n")
else:
display ("\nError finishing firmware %d.\n" % (result))
else:
display ("\nError writing firmware %d.\n" %(result))
return result
def disconnect(self):
self.sock.close()