croxycontin/croxy.py

437 lines
12 KiB
Python
Executable File

#!/usr/bin/python3
"""Croxy: IRC encrypting proxy.
See README.md at: https://github.com/grahamking/croxy
---
Copyright 2013 Graham King
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
For full license details see <http://www.gnu.org/licenses/>.
"""
import sys
import getpass
import socketserver
import socket
import threading
import base64
import binascii
import ssl
import os
import typer
DEFAULT_SALT = b"CROXYSALT IS A LOW SODIUM SALT" # For pbkdf2 only
PBKDF2_ITERATIONS = 5000
def main(host:str="ircd.chat", port:int=6697, password:str=None, listen_port:int=6667):
if not password:
password = getpass.getpass("Today's password: ")
print("Now point your IRC client at: localhost:{}".format(listen_port))
local = ClientServer(('localhost', listen_port),
ClientHandler,
host,
port,
password)
try:
local.serve_forever()
except KeyboardInterrupt:
print("Bye")
return 0
class ClientServer(socketserver.TCPServer):
allow_reuse_address = True
def __init__(self, addr, handler_class, host, port, password):
super().__init__(addr, handler_class)
self.host = host
self.port = port
self.password = password
class ClientHandler(socketserver.StreamRequestHandler):
"""Handles connection from user's IRC client"""
def __init__(self, request, addr, server):
"""server: Instance of ClientServer."""
self.server = server
self.host = server.host
self.port = server.port
self.password = server.password
self.local_f = None
super().__init__(request, addr, server)
def handle(self):
"""Called by socketserver.TCPServer once for each request,
We only expect a single request (IRC client) at a time.
"""
self.local_f = self.wfile
remote_f = self.connect_remote()
if not remote_f:
# Error connecting to IRC. Abort.
self.server.server_close()
return
while 1:
line_bytes = self.rfile.readline()
line = decode(line_bytes)
if not line:
print("EOF")
break
handle_client_line(line, self.password, remote_f)
def connect_remote(self):
"""Connect to IRC server"""
remote = ServerWorker(
self.host,
self.port,
self.password,
self.local_f)
if not remote.remote_conn:
# Connect failed
return None
remote.start()
return remote.remote_f
def handle_client_line(line, password, remote_f):
"""Handle a single line from the client.
line: str
remote_f: file
"""
if line.startswith("PRIVMSG"):
prefix, body = parse_out(line)
ciphertext = croxy_encrypt(body, password)
line = (prefix + ":" + ciphertext + "\r\n")
print("> ", line.strip())
if remote_f:
as_bytes = line.encode('utf8')
remote_f.write(as_bytes)
remote_f.flush()
class ServerWorker(threading.Thread):
"""Connect to the real IRC server."""
def __init__(self, host, port, password, local_f):
"""
host: IRC server to connect to.
port: Port IRC server is listening on.
password: Password for symmetric encryption.
local_f: File-like object connected to IRC _client_.
"""
super().__init__()
print("Connecting to: {}:{}".format(host, port))
self.host = host
self.port = port
self.password = password
self.local_f = local_f
sock = socket.create_connection((host, port))
try:
ssock = ssl.wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLSv1)
msg = ("TLS socket connection established. {openssl}\n"
"Cipher: {cipher}\n"
"Server certificate (not checked):\n{cert}\n")
print(msg.format(
openssl=ssl.OPENSSL_VERSION,
cipher=ssock.cipher(),
cert=ssock.getpeercert()
))
self.remote_conn = ssock
except ssl.SSLError as exc:
print("SSLError: {}".format(exc))
print("Could not establish TLS/SSL connection. Are you sure "
"port {} supports TLSv1?".format(port))
self.remote_conn = None # Stops the program
return
# Set socket file to binary, handle encoding ourselves
self.remote_f = self.remote_conn.makefile(mode='rwb')
def run(self):
"""Thread main method."""
while 1:
line = decode(self.remote_f.readline())
if not line:
print("SERVER EOF")
break
try:
handle_server_line(line, self.password, self.local_f)
except CloseException:
print("CLIENT EOF")
break
self.remote_conn.close()
def handle_server_line(line, password, local_f):
"""Handle a single IRC line from the server.
"""
print("< ", line.strip())
prefix, command, args = parse_in(line)
if command == "PRIVMSG":
start = ":" + prefix + " " + command + " " + args[0] + " :"
body = args[1] # This is the message
try:
body = croxy_decrypt(body, password)
body = body.replace("\r", "<CR>")
body = body.replace("\n", "<LF>")
except NotEncrypted:
body = "(UNENCRYPTED) " + body
line = start + body + "\r\n"
try:
local_f.write(line.encode('utf8'))
local_f.flush()
except ValueError:
raise CloseException()
def decode(line):
"""Takes bytes and returns unicode. Tries utf8 and iso-8859-1."""
try:
return str(line, 'utf8')
except UnicodeDecodeError:
return str(line, 'iso-8859-1')
except TypeError:
# Already unicode
return line
def mpad(msg, size):
"""Pad a byte string to multiple of size bytes. """
amount = size - len(msg) % size
return msg + b'\0' * amount
def parse_out(line):
"""Parses an outoing IRC message into prefix and body.
e.g: "PRIVMSG #test :testing" ->
prefix="PRIVMSG #test ", body="testing"
Outgoing messages are simpler in format than incoming messages.
"""
parts = line.strip().split(":")
prefix = parts[0]
body = ":".join(parts[1:])
return prefix, body
def parse_in(line):
"""Parse an incoming IRC message."""
prefix = ''
trailing = []
if not line:
print("Bad IRC message: ", line)
return None
if line[0] == ':':
prefix, line = line[1:].split(' ', 1)
if line.find(' :') != -1:
line, trailing = line.split(' :', 1)
args = line.split()
args.append(trailing)
else:
args = line.split()
command = args.pop(0)
return prefix, command, args
class NotEncrypted(Exception):
"""Is not an encrypted message"""
class CloseException(Exception):
"""Client or server closed connection"""
## CRYPTO LIBRARY WRAPPERS
# This is our only entry points to the next section.
def croxy_encrypt(msg, key, discard=False):
"""AES-256 encrypt the msg (str) with key (str).
Returns base64 encoded (str).
"""
msg = msg.encode('utf8')
msg = mpad(msg, 32)
derived = croxy_pbkdf2(key)
iv = os.urandom(16)
flags = 0b0000000
if discard:
flags = flags | FLAG_DISCARD
from Crypto.Cipher import AES
cipher = AES.new(derived, AES.MODE_CBC, iv)
sec = cipher.encrypt(msg)
return str(base64.b64encode(iv + sec), 'ascii')
def croxy_decrypt(msg, key):
"""AES-256 decrypt the msg (str) with key (str).
Return a str (unicode).
"""
if isinstance(msg, str):
try:
msg = msg.encode('ascii')
except UnicodeEncodeError:
# If it's not ascii, then it's not base64, so not encrypted
raise NotEncrypted()
if len(msg) < 64:
raise NotEncrypted()
try:
sec = base64.b64decode(msg)
except binascii.Error:
raise NotEncrypted()
derived = croxy_pbkdf2(key)
iv = sec[:16]
sec = sec[16:]
from Crypto.Cipher import AES
cipher = AES.new(derived, AES.MODE_CBC, iv)
try:
clear = str(cipher.decrypt(sec), "utf8")
except (ValueError, AssertionError):
raise NotEncrypted()
return clear.strip('\0')
def croxy_pbkdf2(key, iterations=PBKDF2_ITERATIONS, salt=DEFAULT_SALT):
"""32-bit PBKDF2 derived from 'key'"""
bkey = key.encode("utf8")
dklen = 32
# Use our own from Django (inline below)
derived = pbkdf2(bkey, salt, iterations, dklen=dklen)
return derived
# pbkdf2 and support function (_fast_hmac, _bin_to_long, _long_to_bin)
# are from Django (git revision bc02a96).
#
# https://github.com/django/django/blob/master/django/utils/crypto.py
#
# The only changes are:
# - two force_bytes lines commented out (we pass bytes)
# - xrange -> range (python3 upgrade)
#
import hashlib
import operator
import struct
from functools import reduce
def pbkdf2(password, salt, iterations, dklen=0, digest=None):
"""
Implements PBKDF2 as defined in RFC 2898, section 5.2
HMAC+SHA256 is used as the default pseudo random function.
Right now 10,000 iterations is the recommended default which takes
100ms on a 2.2Ghz Core 2 Duo. This is probably the bare minimum
for security given 1000 iterations was recommended in 2001. This
code is very well optimized for CPython and is only four times
slower than openssl's implementation.
"""
assert iterations > 0
if not digest:
digest = hashlib.sha256
#password = force_bytes(password)
#salt = force_bytes(salt)
hlen = digest().digest_size
if not dklen:
dklen = hlen
if dklen > (2 ** 32 - 1) * hlen:
raise OverflowError('dklen too big')
l = -(-dklen // hlen)
r = dklen - (l - 1) * hlen
hex_format_string = "%%0%ix" % (hlen * 2)
def F(i):
def U():
u = salt + struct.pack(b'>I', i)
for j in range(int(iterations)):
u = _fast_hmac(password, u, digest).digest()
yield _bin_to_long(u)
return _long_to_bin(reduce(operator.xor, U()), hex_format_string)
T = [F(x) for x in range(1, l + 1)]
return b''.join(T[:-1]) + T[-1][:r]
_trans_5c = bytearray([(x ^ 0x5C) for x in range(256)])
_trans_36 = bytearray([(x ^ 0x36) for x in range(256)])
def _bin_to_long(x):
"""
Convert a binary string into a long integer
This is a clever optimization for fast xor vector math
"""
return int(binascii.hexlify(x), 16)
def _long_to_bin(x, hex_format_string):
"""
Convert a long integer into a binary string.
hex_format_string is like "%020x" for padding 10 characters.
"""
return binascii.unhexlify((hex_format_string % x).encode('ascii'))
def _fast_hmac(key, msg, digest):
"""
A trimmed down version of Python's HMAC implementation.
This function operates on bytes.
"""
dig1, dig2 = digest(), digest()
if len(key) > dig1.block_size:
key = digest(key).digest()
key += b'\x00' * (dig1.block_size - len(key))
dig1.update(key.translate(_trans_36))
dig1.update(msg)
dig2.update(key.translate(_trans_5c))
dig2.update(dig1.digest())
return dig2
if __name__ == "__main__":
sys.exit(typer.run(main))