core: add mux-relay and refactor multiplexing

This commit is contained in:
Micooz 2018-02-03 21:38:47 +08:00
parent 1ec6cfca94
commit 8d0fa29891
8 changed files with 356 additions and 145 deletions

@ -1,4 +1,4 @@
import {Middleware, cleanup} from '../middleware';
import {Middleware} from '../middleware';
test('Middleware#constructor', () => {
expect(() => new Middleware({'name': 'unknown-preset'})).toThrow();
@ -21,7 +21,3 @@ test('Middleware#getImplement', () => {
const middleware = new Middleware({'name': 'ss-base'});
expect(middleware.getImplement()).toBeDefined();
});
test('cleanup', () => {
expect(() => cleanup()).not.toThrow();
});

@ -4,13 +4,11 @@ import net from 'net';
import tls from 'tls';
import ws from 'ws';
import LRU from 'lru-cache';
import uniqueId from 'lodash.uniqueid';
import * as MiddlewareManager from './middleware';
import {Balancer} from './balancer';
import {Config} from './config';
import {Multiplexer} from './multiplexer';
import {Relay} from './relay';
import {dumpHex, logger} from '../utils';
import {MuxRelay} from './mux-relay';
import {dumpHex, getRandomInt, logger} from '../utils';
import {http, socks, tcp} from '../proxies';
export class Hub {
@ -21,9 +19,9 @@ export class Hub {
_udpServer = null;
_mux = null;
_tcpRelays = new Map(/* id: <Relay> */);
_tcpRelays = new Map(/* id: <relay> */);
_muxRelays = new Map(/* id: <MuxRelay> */);
_udpRelays = null; // LRU cache
@ -35,15 +33,17 @@ export class Hub {
dispose: (key, relay) => relay.destroy(),
maxAge: 1e5
});
this._mux = new Multiplexer();
}
terminate(callback) {
// relays
this._udpRelays.reset();
if (__MUX__) {
this._muxRelays.forEach((relay) => relay.destroy());
this._muxRelays.clear();
}
this._tcpRelays.forEach((relay) => relay.destroy());
this._tcpRelays.clear();
MiddlewareManager.cleanup();
// balancer
if (__IS_CLIENT__) {
Balancer.destroy();
@ -181,8 +181,11 @@ export class Hub {
const key = `${address}:${port}`;
let relay = relays.get(key);
if (relay === undefined) {
const remoteInfo = {host: address, port: port};
relay = new Relay({transport: 'udp', presets: __UDP_PRESETS__, context: server, remoteInfo});
const context = {
socket: server,
remoteInfo: {host: address, port: port}
};
relay = this._createUdpRelay(context);
relay.init({proxyRequest});
relay.on('close', function onRelayClose() {
// relays.del(key);
@ -225,40 +228,108 @@ export class Hub {
});
}
_onConnection(context, proxyRequest = null) {
logger.verbose(`[hub] [${context.remoteAddress}:${context.remotePort}] connected`);
_onConnection(socket, proxyRequest = null) {
logger.verbose(`[hub] [${socket.remoteAddress}:${socket.remotePort}] connected`);
if (__IS_CLIENT__) {
this._switchServer();
}
const remoteInfo = {host: context.remoteAddress, port: context.remotePort};
const relay = this._createRelay(context, remoteInfo);
relay.init({proxyRequest});
relay.id = uniqueId() | 0;
relay.on('close', () => this._tcpRelays.delete(relay.id));
this._tcpRelays.set(relay.id, relay);
const context = {
socket,
proxyRequest,
remoteInfo: {
host: socket.remoteAddress,
port: socket.remotePort
}
};
let muxRelay = null;
if (__MUX__) {
__IS_CLIENT__ ? this._mux.couple({relay, remoteInfo, proxyRequest}) : this._mux.decouple({relay, remoteInfo});
if (__IS_CLIENT__) {
// get or create a mux relay
muxRelay = this.getMuxRelay() || this._createMuxRelay(context);
if (!muxRelay.isOutboundReady()) {
muxRelay.init({proxyRequest});
} else {
proxyRequest.onConnected();
}
// add mux relay instance to context
Object.assign(context, {muxRelay});
} else {
Object.assign(context, {getMuxRelay: this.getMuxRelay.bind(this)});
}
}
const relay = this._createRelay(context);
relay.init({proxyRequest});
relay.on('close', () => this.onRelayClose(relay));
if (__MUX__) {
if (__IS_CLIENT__) {
muxRelay.addSubRelay(relay);
} else {
this._muxRelays.set(relay.id, relay);
}
}
this._tcpRelays.set(relay.id, relay);
}
_createRelay(context, remoteInfo) {
_createRelay(context) {
const props = {
context: context,
remoteInfo: remoteInfo,
transport: __TRANSPORT__,
presets: __PRESETS__
};
if (__MUX__) {
if (__IS_CLIENT__) {
return new Relay({...props, presets: []});
return new Relay({...props, transport: 'mux', presets: []});
} else {
return new Relay({...props, isMux: true});
return new MuxRelay(props);
}
} else {
return new Relay(props);
}
}
_createUdpRelay(context) {
return new Relay({transport: 'udp', context, presets: __UDP_PRESETS__});
}
// client only
_createMuxRelay(context) {
const relay = new MuxRelay({transport: __TRANSPORT__, context, presets: __PRESETS__});
relay.on('close', () => this.onRelayClose(relay));
this._muxRelays.set(relay.id, relay);
logger.info(`[mux-${relay.id}] create mux connection, total: ${this._muxRelays.size}`);
return relay;
}
getMuxRelay() {
const relays = this._muxRelays;
const concurrency = relays.size;
if (concurrency < 1) {
return null;
}
if (__IS_CLIENT__ && concurrency < __MUX_CONCURRENCY__ && getRandomInt(0, 1) === 0) {
return null;
}
return relays.get([...relays.keys()][getRandomInt(0, concurrency - 1)]);
}
onRelayClose(relay) {
if (relay instanceof MuxRelay) {
relay.destroy();
}
if (__MUX__ && __IS_CLIENT__) {
const ctx = relay.getContext();
if (ctx && ctx.muxRelay) {
ctx.muxRelay.destroySubRelay(relay.id);
}
}
this._tcpRelays.delete(relay.id);
this._muxRelays.delete(relay.id);
}
_switchServer() {
const server = Balancer.getFastest();
if (server) {

@ -121,12 +121,3 @@ export class Middleware extends EventEmitter {
}
}
/**
* destroy cached presets when program exit()
*/
export function cleanup() {
for (const preset of staticPresetCache.values()) {
preset.onDestroy();
}
}

188
src/core/mux-relay.js Normal file

@ -0,0 +1,188 @@
import {Relay} from './relay';
import {logger} from '../utils';
import {
TcpInbound, TcpOutbound,
TlsInbound, TlsOutbound,
WsInbound, WsOutbound,
MuxInbound, MuxOutbound,
} from '../transports';
import {
MUX_DATA_FRAME,
MUX_NEW_CONN,
MUX_CLOSE_CONN,
} from '../presets/defs';
export class MuxRelay extends Relay {
static allSubRelays = new Map();
_subRelays = new Map();
// overwrites
getBounds(transport) {
const mapping = {
'tcp': [TcpInbound, TcpOutbound],
'tls': [TlsInbound, TlsOutbound],
'ws': [WsInbound, WsOutbound],
};
const [Inbound, Outbound] = __IS_CLIENT__ ? [MuxInbound, mapping[transport][1]] : [mapping[transport][0], MuxOutbound];
return {Inbound, Outbound};
}
onBroadcast(action) {
switch (action.type) {
case MUX_NEW_CONN:
return this.onNewSubConn(action.payload);
case MUX_DATA_FRAME:
return this.onDataFrame(action.payload);
case MUX_CLOSE_CONN:
return this.onSubConnCloseByProtocol(action.payload);
}
this._inbound && this._inbound.onBroadcast(action);
this._outbound && this._outbound.onBroadcast(action);
}
preparePresets(presets) {
const first = presets[0];
// add "mux" preset to the top if it's a mux relay
if (!first || first.name !== 'mux') {
presets = [{'name': 'mux'}].concat(presets);
}
return presets;
}
destroy() {
super.destroy();
const subRelays = this.getSubRelays();
if (subRelays) {
logger.info(`[mux-${this.id}] connection destroyed, cleanup ${subRelays.size} sub connections`);
// cleanup associate relays
for (const relay of subRelays.values()) {
relay.destroy();
}
subRelays.clear();
this._subRelays = null;
}
}
// events
onNewSubConn({cid, host, port}) {
// const muxRelay = this;
const muxRelay = this._ctx.getMuxRelay();
if (muxRelay) {
const context = {
socket: this._ctx.socket,
remoteInfo: this._ctx.remoteInfo,
cid,
muxRelay, // NOTE: associate the mux relay here
};
const relay = new Relay({transport: 'mux', context});
const proxyRequest = {
host: host,
port: port,
onConnected: () => {
// logger.debug(`[mux-${muxRelay.id}] flush ${relay.__pendingFrames.length} pending frames`);
for (const frame of relay.__pendingFrames) {
relay.decode(frame);
}
relay.__pendingFrames = null;
}
};
function onClose() {
const relay = muxRelay._getSubRelay(cid);
if (relay) {
muxRelay.destroySubRelay(cid);
logger.debug(`[mux-${muxRelay.id}] sub connection(cid=${cid}) closed by self, remains: ${muxRelay.getSubRelays().size}`);
}
// else {
// logger.warn(`[mux-${muxRelay.id}] fail to close sub connection by self, no such sub connection(cid=${cid})`);
// }
}
relay.__pendingFrames = [];
relay.init({proxyRequest});
// NOTE: here we should replace relay.id to cid
relay.id = cid;
relay.on('close', onClose);
// create relations between mux relay and its sub relays,
// when mux relay destroyed, all sub relays should be destroyed as well.
muxRelay.addSubRelay(relay);
logger.debug(`[mux-${muxRelay.id}] create sub connection(cid=${relay.id}), total: ${muxRelay.getSubRelays().size}`);
return relay;
} else {
logger.warn(`[mux-${muxRelay.id}] cannot create new sub connection due to no mux connection are available`);
}
}
onDataFrame({cid, data}) {
const relay = MuxRelay.allSubRelays.get(cid);
if (!relay) {
logger.error(`[mux-${this.id}] fail to dispatch data frame(size=${data.length}), no such sub connection(cid=${cid})`);
return;
}
if (__IS_CLIENT__ || relay.isOutboundReady()) {
relay.decode(data);
} else {
// TODO: find a way to avoid using relay._pendingFrames
// cache data frames to the array
// before sub relay(newly created) established connection to destination
relay.__pendingFrames = [];
relay.__pendingFrames.push(data);
}
}
onSubConnCloseByProtocol({cid}) {
const relay = MuxRelay.allSubRelays.get(cid);
if (relay) {
this._removeSubRelay(cid);
relay.destroy();
logger.debug(`[mux-${this.id}] sub connection(cid=${cid}) closed by protocol`);
}
// else {
// logger.warn(`[mux-${this.id}] fail to close sub connection by protocol, no such sub connection(cid=${cid})`);
// }
}
// methods
addSubRelay(relay) {
this._subRelays.set(relay.id, relay);
MuxRelay.allSubRelays.set(relay.id, relay);
}
getSubRelays() {
return this._subRelays;
}
destroySubRelay(cid) {
const relay = this._getSubRelay(cid);
if (relay) {
this.encode(Buffer.alloc(0), {cid, isClosing: true});
this._removeSubRelay(cid);
relay.destroy();
}
// else {
// logger.warn(`[mux-${this.id}] fail to close sub connection by calling destroySubRelay(), no such sub connection(cid=${cid})`);
// }
}
_removeSubRelay(cid) {
this._subRelays.delete(cid);
MuxRelay.allSubRelays.delete(cid);
}
_getSubRelay(cid) {
if (this._subRelays) {
return this._subRelays.get(cid);
}
}
}

@ -1,4 +1,5 @@
import EventEmitter from 'events';
import uniqueId from 'lodash.uniqueid';
import {Pipe} from './pipe';
import {DNSCache} from './dns-cache';
import {PIPE_ENCODE, PIPE_DECODE} from '../constants';
@ -8,7 +9,8 @@ import {
TcpInbound, TcpOutbound,
UdpInbound, UdpOutbound,
TlsInbound, TlsOutbound,
WsInbound, WsOutbound
WsInbound, WsOutbound,
MuxInbound, MuxOutbound,
} from '../transports';
import {
@ -17,56 +19,26 @@ import {
CONNECTION_CLOSED,
CONNECTION_WILL_CLOSE,
CHANGE_PRESET_SUITE,
MUX_NEW_CONN,
MUX_DATA_FRAME,
MUX_CLOSE_CONN
} from '../presets/defs';
/**
* get Inbound and Outbound classes by transport
* @param transport
* @returns {{Inbound: *, Outbound: *}}
* [client side]
* app <==> (TcpInbound) relay (MuxOutbound)
* <==> (MuxInbound) muxRelay (TcpOutbound, ...) <--->
*
* [server side]
* <---> (TcpInbound, ...) muxRelay (MuxOutbound) <==>
* (MuxInbound) relay (TcpOutbound) <==> app
*/
function getBounds(transport) {
const mapping = {
'tcp': [TcpInbound, TcpOutbound],
'udp': [UdpInbound, UdpOutbound],
'tls': [TlsInbound, TlsOutbound],
'ws': [WsInbound, WsOutbound]
};
let Inbound = null;
let Outbound = null;
if (transport === 'udp') {
[Inbound, Outbound] = [UdpInbound, UdpOutbound];
} else {
[Inbound, Outbound] = __IS_CLIENT__ ? [TcpInbound, mapping[transport][1]] : [mapping[transport][0], TcpOutbound];
}
return {Inbound, Outbound};
}
/**
* set properties to target object
* @param target
* @param props
*/
function setProperties(target, props = {}) {
const propNames = Object.keys(props);
for (const name of propNames) {
Object.defineProperty(target, name, {value: props[name]});
}
}
// .on('close')
// .on('encode')
// .on('decode')
// .on('muxNewConn')
// .on('muxDataFrame')
// .on('muxCloseConn')
export class Relay extends EventEmitter {
_transport = null;
id = null;
_isMux = false;
_ctx = null;
_transport = null;
_remoteInfo = null;
@ -82,28 +54,38 @@ export class Relay extends EventEmitter {
_destroyed = false;
constructor({transport, remoteInfo, context = null, presets = [], isMux = false}) {
constructor({transport, context, presets = []}) {
super();
this.updatePresets = this.updatePresets.bind(this);
this.onBroadcast = this.onBroadcast.bind(this);
this.onEncoded = this.onEncoded.bind(this);
this.onDecoded = this.onDecoded.bind(this);
this.id = uniqueId() | 0;
this._transport = transport;
this._isMux = isMux;
this._remoteInfo = remoteInfo;
this._remoteInfo = context.remoteInfo;
// pipe
this._presets = this.preparePresets(presets);
this._pipe = this.createPipe(this._presets);
this._ctx = {
cid: this.id,
pipe: this._pipe,
dnsCache: new DNSCache({expire: __DNS_EXPIRE__}),
thisRelay: this,
...context,
};
// bounds
const {Inbound, Outbound} = this.getBounds(transport);
const inbound = new Inbound({context: this._ctx});
const outbound = new Outbound({context: this._ctx});
this._inbound = inbound;
this._outbound = outbound;
// outbound
const {Inbound, Outbound} = getBounds(transport);
this._inbound = new Inbound({context, remoteInfo, pipe: this._pipe});
this._outbound = new Outbound({remoteInfo, pipe: this._pipe, dnsCache: new DNSCache({expire: __DNS_EXPIRE__})});
this._outbound.setInbound(this._inbound);
this._outbound.on('close', () => this.onBoundClose(this._outbound, this._inbound));
this._outbound.on('close', () => this.onBoundClose(outbound, inbound));
this._outbound.on('updatePresets', this.updatePresets);
// inbound
this._inbound.setOutbound(this._outbound);
this._inbound.on('close', () => this.onBoundClose(this._inbound, this._outbound));
this._inbound.on('close', () => this.onBoundClose(inbound, outbound));
this._inbound.on('updatePresets', this.updatePresets);
}
@ -121,6 +103,28 @@ export class Relay extends EventEmitter {
}
}
/**
* get Inbound and Outbound classes by transport
* @param transport
* @returns {{Inbound: *, Outbound: *}}
*/
getBounds(transport) {
const mapping = {
'tcp': [TcpInbound, TcpOutbound],
'udp': [UdpInbound, UdpOutbound],
'tls': [TlsInbound, TlsOutbound],
'ws': [WsInbound, WsOutbound],
'mux': [MuxInbound, MuxOutbound],
};
let Inbound = null, Outbound = null;
if (transport === 'udp') {
[Inbound, Outbound] = [UdpInbound, UdpOutbound];
} else {
[Inbound, Outbound] = __IS_CLIENT__ ? [TcpInbound, mapping[transport][1]] : [mapping[transport][0], TcpOutbound];
}
return {Inbound, Outbound};
}
onBoundClose(thisBound, anotherBound) {
if (anotherBound.__closed) {
if (!this._pipe.destroyed) {
@ -146,12 +150,8 @@ export class Relay extends EventEmitter {
return this._inbound;
}
getLoad(type = PIPE_ENCODE) {
if (type === PIPE_ENCODE) {
return this._outbound.bufferSize;
} else {
return this._inbound.bufferSize;
}
getContext() {
return this._ctx;
}
// hooks of pipe
@ -159,26 +159,11 @@ export class Relay extends EventEmitter {
onBroadcast(action) {
const type = action.type;
if (__MUX__ && this._transport !== 'udp') {
switch (type) {
case CONNECT_TO_REMOTE:
if (__IS_CLIENT__ && !this._isMux) {
const remote = `${this._remoteInfo.host}:${this._remoteInfo.port}`;
const target = `${action.payload.host}:${action.payload.port}`;
logger.info(`[relay] [${remote}] request(over mux): ${target}`);
return;
}
if (__IS_SERVER__ && this._isMux) {
return;
}
break;
case MUX_NEW_CONN:
return this.emit('muxNewConn', action.payload);
case MUX_DATA_FRAME:
return this.emit('muxDataFrame', action.payload);
case MUX_CLOSE_CONN:
return this.emit('muxCloseConn', action.payload);
default:
break;
if (__IS_CLIENT__ && type === CONNECT_TO_REMOTE) {
const remote = `${this._remoteInfo.host}:${this._remoteInfo.port}`;
const target = `${action.payload.host}:${action.payload.port}`;
logger.info(`[relay] [${remote}] request over mux(id=${this._ctx.muxRelay.id}): ${target}`);
return;
}
}
if (type === CHANGE_PRESET_SUITE) {
@ -215,26 +200,18 @@ export class Relay extends EventEmitter {
}
onEncoded(buffer) {
if (this.hasListener('encode')) {
this.emit('encode', buffer);
if (__IS_CLIENT__) {
this._outbound.write(buffer);
} else {
if (__IS_CLIENT__) {
this._outbound.write(buffer);
} else {
this._inbound.write(buffer);
}
this._inbound.write(buffer);
}
}
onDecoded(buffer) {
if (this.hasListener('decode')) {
this.emit('decode', buffer);
if (__IS_CLIENT__) {
this._inbound.write(buffer);
} else {
if (__IS_CLIENT__) {
this._inbound.write(buffer);
} else {
this._outbound.write(buffer);
}
this._outbound.write(buffer);
}
}
@ -260,28 +237,15 @@ export class Relay extends EventEmitter {
return this._outbound && this._outbound.writable;
}
setPropsForInbound(props) {
setProperties(this._inbound, props);
}
setPropsForOutbound(props) {
setProperties(this._outbound, props);
}
/**
* preprocess preset list
* @param presets
* @returns {[]}
*/
preparePresets(presets) {
const first = presets[0];
const last = presets[presets.length - 1];
// add mux preset to the top if it's a mux relay
if (this._isMux && (!first || first.name !== 'mux')) {
presets = [{'name': 'mux'}].concat(presets);
}
// add at least one "tracker" preset to the list
if (!this._isMux && (!last || last.name !== 'tracker')) {
if (!last || last.name !== 'tracker') {
presets = presets.concat([{'name': 'tracker'}]);
}
return presets;
@ -315,6 +279,7 @@ export class Relay extends EventEmitter {
this._pipe && this._pipe.destroy();
this._inbound && this._inbound.close();
this._outbound && this._outbound.close();
this._ctx = null;
this._pipe = null;
this._inbound = null;
this._outbound = null;

@ -160,7 +160,7 @@ export default class SsAeadCipherPreset extends IPreset {
salt = crypto.randomBytes(saltSize);
this._cipherKey = HKDF(HKDF_HASH_ALGORITHM, salt, evpKey, info, keySize);
}
const chunks = getRandomChunks(buffer, MIN_CHUNK_SPLIT_LEN, MAX_CHUNK_SPLIT_LEN - 1).map((chunk) => {
const chunks = getRandomChunks(buffer, MIN_CHUNK_SPLIT_LEN, MAX_CHUNK_SPLIT_LEN).map((chunk) => {
const dataLen = numberToBuffer(chunk.length);
const [encLen, lenTag] = this.encrypt(dataLen);
const [encData, dataTag] = this.encrypt(chunk);

@ -177,7 +177,7 @@ export default class SsrAuthAes128Preset extends IPreset {
createChunks(buffer) {
const userKey = this._userKey;
return getRandomChunks(buffer, 0x1fff - 0xff - 8 - 1, 0x2000 - 0xff - 8 - 1).map((payload) => {
return getRandomChunks(buffer, 0x1fff - 0xff - 8 - 3, 0x2000 - 0xff - 8 - 3).map((payload) => {
const [first, len] = crypto.randomBytes(2);
let random_bytes = null;
if (first < 128) {

@ -16,7 +16,7 @@ export class MuxInbound extends Inbound {
get bufferSize() {
if (__IS_CLIENT__) {
let totalBufferSize = 0;
const totalBufferSize = 0;
// const subRelays = this.ctx.thisRelay.getSubRelays();
// if (subRelays) {
// for (const relay of subRelays.values()) {
@ -115,7 +115,7 @@ export class MuxOutbound extends Outbound {
return 0;
}
} else {
let totalBufferSize = 0;
const totalBufferSize = 0;
// const subRelays = this.ctx.thisRelay.getSubRelays();
// if (subRelays) {
// for (const relay of subRelays.values()) {