core: refine relay creation

This commit is contained in:
Micooz 2017-12-13 10:01:45 +08:00
parent 78a67c0709
commit 5c973a661c
No known key found for this signature in database
GPG Key ID: B100C9A159B1EBA0
2 changed files with 44 additions and 43 deletions

@ -5,12 +5,12 @@ import tls from 'tls';
import ws from 'ws';
import LRU from 'lru-cache';
import uniqueId from 'lodash.uniqueid';
import * as MiddlewareManager from './middleware';
import {Balancer} from './balancer';
import {Config} from './config';
import * as MiddlewareManager from './middleware';
import {createRelay} from './relay';
import {logger, dumpHex} from '../utils';
import {tcp, http, socks} from '../proxies';
import {Relay} from './relay';
import {dumpHex, logger} from '../utils';
import {http, socks, tcp} from '../proxies';
export class Hub {
@ -20,7 +20,7 @@ export class Hub {
_udpServer = null;
_relays = new Map(/* id: <relay> */);
_tcpRelays = new Map(/* id: <relay> */);
_udpRelays = null; // LRU cache
@ -39,8 +39,8 @@ export class Hub {
terminate(callback) {
// relays
this._udpRelays.reset();
this._relays.forEach((relay) => relay.destroy());
this._relays.clear();
this._tcpRelays.forEach((relay) => relay.destroy());
this._tcpRelays.clear();
MiddlewareManager.cleanup();
// balancer
if (__IS_CLIENT__) {
@ -174,7 +174,7 @@ export class Hub {
if (relay === undefined) {
server.remoteAddress = address;
server.remotePort = port;
relay = createRelay('udp', server, proxyRequest);
relay = new Relay({transport: 'udp', context: server, proxyRequest});
relay.on('close', function onRelayClose() {
// relays.del(key);
});
@ -218,21 +218,19 @@ export class Hub {
if (__IS_CLIENT__) {
this._switchServer();
}
const cid = uniqueId(`${__TRANSPORT__}_`);
logger.verbose(`[hub] [${context.remoteAddress}:${context.remotePort}] connected`);
const relay = createRelay(__TRANSPORT__, context, proxyRequest);
const cid = uniqueId() | 0;
const relay = new Relay({transport: __TRANSPORT__, context, proxyRequest});
relay.id = cid;
relay.on('close', () => {
this._relays.delete(cid);
});
this._relays.set(cid, relay);
relay.on('close', () => this._tcpRelays.delete(cid));
this._tcpRelays.set(cid, relay);
}
_switchServer() {
const server = Balancer.getFastest();
if (server) {
Config.initServer(server);
logger.info(`[balancer] use server: ${__SERVER_HOST__}:${__SERVER_PORT__}`);
logger.info(`[balancer-${this._wkId}] use server: ${__SERVER_HOST__}:${__SERVER_PORT__}`);
}
}

@ -19,6 +19,28 @@ function preparePresets(presets) {
return presets;
}
/**
* get Inbound and Outbound classes by transport
* @param transport
* @returns {{Inbound: *, Outbound: *}}
*/
function getBounds(transport) {
const mapping = {
'tcp': [TcpInbound, TcpOutbound],
'udp': [UdpInbound, UdpOutbound],
'tls': [TlsInbound, TlsOutbound],
'ws': [WsInbound, WsOutbound]
};
let Inbound = null;
let Outbound = null;
if (transport === 'udp') {
[Inbound, Outbound] = [UdpInbound, UdpOutbound];
} else {
[Inbound, Outbound] = __IS_CLIENT__ ? [TcpInbound, mapping[transport][1]] : [mapping[transport][0], TcpOutbound];
}
return {Inbound, Outbound};
}
// .on('close')
export class Relay extends EventEmitter {
@ -36,12 +58,12 @@ export class Relay extends EventEmitter {
_presets = [];
constructor({transport, context, Inbound, Outbound, proxyRequest = null}) {
constructor({transport, context, proxyRequest = null}) {
super();
this.updatePresets = this.updatePresets.bind(this);
this.onBroadcast = this.onBroadcast.bind(this);
this.postPipeEncode = this.postPipeEncode.bind(this);
this.postPipeDecode = this.postPipeDecode.bind(this);
this.onPipeEncoded = this.onPipeEncoded.bind(this);
this.onPipeDecoded = this.onPipeDecoded.bind(this);
this._transport = transport;
this._context = context;
this._proxyRequest = proxyRequest;
@ -49,6 +71,7 @@ export class Relay extends EventEmitter {
this._presets = preparePresets(__PRESETS__);
this._pipe = this.createPipe(this._presets);
// outbound
const {Inbound, Outbound} = getBounds(transport);
this._inbound = new Inbound({context: context, pipe: this._pipe});
this._outbound = new Outbound({inbound: this._inbound, pipe: this._pipe});
this._outbound.updatePresets = this.updatePresets;
@ -96,8 +119,7 @@ export class Relay extends EventEmitter {
// 1. update preset list
this.updatePresets(preparePresets([
...suite.presets,
{'name': 'auto-conf'},
// TODO(discussion): need any other protections here, or improve auto-conf itself instead?
{'name': 'auto-conf'}
]));
// 2. initialize newly created presets
const transport = this._transport;
@ -121,7 +143,7 @@ export class Relay extends EventEmitter {
this._pipe.feed(type, data);
}
postPipeEncode(buffer) {
onPipeEncoded(buffer) {
if (__IS_CLIENT__) {
this._outbound.write(buffer);
} else {
@ -129,7 +151,7 @@ export class Relay extends EventEmitter {
}
}
postPipeDecode(buffer) {
onPipeDecoded(buffer) {
if (__IS_CLIENT__) {
this._inbound.write(buffer);
} else {
@ -154,8 +176,8 @@ export class Relay extends EventEmitter {
createPipe(presets) {
const pipe = new Pipe({presets, isUdp: this._transport === 'udp'});
pipe.on('broadcast', this.onBroadcast.bind(this)); // if no action were caught by presets
pipe.on(`post_${PIPE_ENCODE}`, this.postPipeEncode);
pipe.on(`post_${PIPE_DECODE}`, this.postPipeDecode);
pipe.on(`post_${PIPE_ENCODE}`, this.onPipeEncoded);
pipe.on(`post_${PIPE_DECODE}`, this.onPipeDecoded);
return pipe;
}
@ -175,22 +197,3 @@ export class Relay extends EventEmitter {
}
}
const mapping = {
'tcp': [TcpInbound, TcpOutbound],
'udp': [UdpInbound, UdpOutbound],
'tls': [TlsInbound, TlsOutbound],
'ws': [WsInbound, WsOutbound]
};
export function createRelay(transport, context, proxyRequest = null) {
let Inbound = null;
let Outbound = null;
if (transport === 'udp') {
[Inbound, Outbound] = [UdpInbound, UdpOutbound];
} else {
[Inbound, Outbound] = __IS_CLIENT__ ? [TcpInbound, mapping[transport][1]] : [mapping[transport][0], TcpOutbound];
}
const props = {transport, context, Inbound, Outbound, proxyRequest};
return new Relay(props);
}