From 5c973a661c1e0cc4dd43ff504ee4440e1c6f043c Mon Sep 17 00:00:00 2001 From: Micooz Date: Wed, 13 Dec 2017 10:01:45 +0800 Subject: [PATCH] core: refine relay creation --- src/core/hub.js | 28 +++++++++++----------- src/core/relay.js | 59 +++++++++++++++++++++++++---------------------- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/src/core/hub.js b/src/core/hub.js index 0584c04..9c5b89d 100644 --- a/src/core/hub.js +++ b/src/core/hub.js @@ -5,12 +5,12 @@ import tls from 'tls'; import ws from 'ws'; import LRU from 'lru-cache'; import uniqueId from 'lodash.uniqueid'; +import * as MiddlewareManager from './middleware'; import {Balancer} from './balancer'; import {Config} from './config'; -import * as MiddlewareManager from './middleware'; -import {createRelay} from './relay'; -import {logger, dumpHex} from '../utils'; -import {tcp, http, socks} from '../proxies'; +import {Relay} from './relay'; +import {dumpHex, logger} from '../utils'; +import {http, socks, tcp} from '../proxies'; export class Hub { @@ -20,7 +20,7 @@ export class Hub { _udpServer = null; - _relays = new Map(/* id: */); + _tcpRelays = new Map(/* id: */); _udpRelays = null; // LRU cache @@ -39,8 +39,8 @@ export class Hub { terminate(callback) { // relays this._udpRelays.reset(); - this._relays.forEach((relay) => relay.destroy()); - this._relays.clear(); + this._tcpRelays.forEach((relay) => relay.destroy()); + this._tcpRelays.clear(); MiddlewareManager.cleanup(); // balancer if (__IS_CLIENT__) { @@ -174,7 +174,7 @@ export class Hub { if (relay === undefined) { server.remoteAddress = address; server.remotePort = port; - relay = createRelay('udp', server, proxyRequest); + relay = new Relay({transport: 'udp', context: server, proxyRequest}); relay.on('close', function onRelayClose() { // relays.del(key); }); @@ -218,21 +218,19 @@ export class Hub { if (__IS_CLIENT__) { this._switchServer(); } - const cid = uniqueId(`${__TRANSPORT__}_`); logger.verbose(`[hub] [${context.remoteAddress}:${context.remotePort}] connected`); - const relay = createRelay(__TRANSPORT__, context, proxyRequest); + const cid = uniqueId() | 0; + const relay = new Relay({transport: __TRANSPORT__, context, proxyRequest}); relay.id = cid; - relay.on('close', () => { - this._relays.delete(cid); - }); - this._relays.set(cid, relay); + relay.on('close', () => this._tcpRelays.delete(cid)); + this._tcpRelays.set(cid, relay); } _switchServer() { const server = Balancer.getFastest(); if (server) { Config.initServer(server); - logger.info(`[balancer] use server: ${__SERVER_HOST__}:${__SERVER_PORT__}`); + logger.info(`[balancer-${this._wkId}] use server: ${__SERVER_HOST__}:${__SERVER_PORT__}`); } } diff --git a/src/core/relay.js b/src/core/relay.js index c51976c..bcb239e 100644 --- a/src/core/relay.js +++ b/src/core/relay.js @@ -19,6 +19,28 @@ function preparePresets(presets) { return presets; } +/** + * get Inbound and Outbound classes by transport + * @param transport + * @returns {{Inbound: *, Outbound: *}} + */ +function getBounds(transport) { + const mapping = { + 'tcp': [TcpInbound, TcpOutbound], + 'udp': [UdpInbound, UdpOutbound], + 'tls': [TlsInbound, TlsOutbound], + 'ws': [WsInbound, WsOutbound] + }; + let Inbound = null; + let Outbound = null; + if (transport === 'udp') { + [Inbound, Outbound] = [UdpInbound, UdpOutbound]; + } else { + [Inbound, Outbound] = __IS_CLIENT__ ? [TcpInbound, mapping[transport][1]] : [mapping[transport][0], TcpOutbound]; + } + return {Inbound, Outbound}; +} + // .on('close') export class Relay extends EventEmitter { @@ -36,12 +58,12 @@ export class Relay extends EventEmitter { _presets = []; - constructor({transport, context, Inbound, Outbound, proxyRequest = null}) { + constructor({transport, context, proxyRequest = null}) { super(); this.updatePresets = this.updatePresets.bind(this); this.onBroadcast = this.onBroadcast.bind(this); - this.postPipeEncode = this.postPipeEncode.bind(this); - this.postPipeDecode = this.postPipeDecode.bind(this); + this.onPipeEncoded = this.onPipeEncoded.bind(this); + this.onPipeDecoded = this.onPipeDecoded.bind(this); this._transport = transport; this._context = context; this._proxyRequest = proxyRequest; @@ -49,6 +71,7 @@ export class Relay extends EventEmitter { this._presets = preparePresets(__PRESETS__); this._pipe = this.createPipe(this._presets); // outbound + const {Inbound, Outbound} = getBounds(transport); this._inbound = new Inbound({context: context, pipe: this._pipe}); this._outbound = new Outbound({inbound: this._inbound, pipe: this._pipe}); this._outbound.updatePresets = this.updatePresets; @@ -96,8 +119,7 @@ export class Relay extends EventEmitter { // 1. update preset list this.updatePresets(preparePresets([ ...suite.presets, - {'name': 'auto-conf'}, - // TODO(discussion): need any other protections here, or improve auto-conf itself instead? + {'name': 'auto-conf'} ])); // 2. initialize newly created presets const transport = this._transport; @@ -121,7 +143,7 @@ export class Relay extends EventEmitter { this._pipe.feed(type, data); } - postPipeEncode(buffer) { + onPipeEncoded(buffer) { if (__IS_CLIENT__) { this._outbound.write(buffer); } else { @@ -129,7 +151,7 @@ export class Relay extends EventEmitter { } } - postPipeDecode(buffer) { + onPipeDecoded(buffer) { if (__IS_CLIENT__) { this._inbound.write(buffer); } else { @@ -154,8 +176,8 @@ export class Relay extends EventEmitter { createPipe(presets) { const pipe = new Pipe({presets, isUdp: this._transport === 'udp'}); pipe.on('broadcast', this.onBroadcast.bind(this)); // if no action were caught by presets - pipe.on(`post_${PIPE_ENCODE}`, this.postPipeEncode); - pipe.on(`post_${PIPE_DECODE}`, this.postPipeDecode); + pipe.on(`post_${PIPE_ENCODE}`, this.onPipeEncoded); + pipe.on(`post_${PIPE_DECODE}`, this.onPipeDecoded); return pipe; } @@ -175,22 +197,3 @@ export class Relay extends EventEmitter { } } - -const mapping = { - 'tcp': [TcpInbound, TcpOutbound], - 'udp': [UdpInbound, UdpOutbound], - 'tls': [TlsInbound, TlsOutbound], - 'ws': [WsInbound, WsOutbound] -}; - -export function createRelay(transport, context, proxyRequest = null) { - let Inbound = null; - let Outbound = null; - if (transport === 'udp') { - [Inbound, Outbound] = [UdpInbound, UdpOutbound]; - } else { - [Inbound, Outbound] = __IS_CLIENT__ ? [TcpInbound, mapping[transport][1]] : [mapping[transport][0], TcpOutbound]; - } - const props = {transport, context, Inbound, Outbound, proxyRequest}; - return new Relay(props); -}