diff --git a/src/constants.js b/src/constants.js index 8562276..21b71bf 100644 --- a/src/constants.js +++ b/src/constants.js @@ -6,9 +6,6 @@ export const PIPE_DECODE = -1; export const PRESET_FAILED = 'PRESET_FAILED'; export const CONNECT_TO_REMOTE = 'CONNECT_TO_REMOTE'; -export const MUX_NEW_CONN = 'MUX_NEW_CONN'; -export const MUX_DATA_FRAME = 'MUX_DATA_FRAME'; -export const MUX_CLOSE_CONN = 'MUX_CLOSE_CONN'; // https://url.spec.whatwg.org/#url-miscellaneous export const PROTOCOL_DEFAULT_PORTS = { diff --git a/src/core/mux-relay.js b/src/core/mux-relay.js index e25e1a8..b01cf73 100644 --- a/src/core/mux-relay.js +++ b/src/core/mux-relay.js @@ -3,17 +3,8 @@ import uniqueId from 'lodash.uniqueid'; import { Pipe } from './pipe'; import { Tracker } from './tracker'; import { hash, logger, getRandomInt, dumpHex } from '../utils'; -import { getPresetClassByName } from '../presets'; -import { IPresetAddressing } from '../presets/defs'; - -import { - MUX_CLOSE_CONN, - MUX_DATA_FRAME, - MUX_NEW_CONN, - PIPE_DECODE, - PIPE_ENCODE, - APP_ID, PRESET_FAILED, -} from '../constants'; +import { getPresetClassByName, IPresetAddressing } from '../presets'; +import { PIPE_DECODE, PIPE_ENCODE, APP_ID, PRESET_FAILED } from '../constants'; import { TcpInbound, TcpOutbound, @@ -163,23 +154,14 @@ export class MuxRelay extends EventEmitter { // events onBroadcast = ({ action, source }) => { - if (action.type === MUX_NEW_CONN) { - return this.onNewSubConn(action.payload); - } - else if (action.type === MUX_DATA_FRAME) { - return this.onDataFrame(action.payload); - } - else if (action.type === MUX_CLOSE_CONN) { - return this.onSubConnCloseByProtocol(action.payload); - } - else if (action.type === PRESET_FAILED) { + if (action.type === PRESET_FAILED) { const { name, message } = action.payload; const remote = `${source.host}:${source.port}`; logger.error(`[mux-relay] [${this._transport}] [${remote}] preset "${name}" fail to process: ${message}`); } }; - async onNewSubConn({ cid, host, port }) { + onNewSubConn = async ({ cid, host, port }) => { const { _config: config, _transport: transport } = this; const { Outbound } = this._getBounds(transport); @@ -237,9 +219,9 @@ export class MuxRelay extends EventEmitter { outbound.__tracker.trace(PIPE_DECODE, buffer.length); setImmediate(() => this.emit('_read', buffer.length)); } - } + }; - onDataFrame({ cid, data }) { + onDataFrame = ({ cid, data }) => { if (this._config.is_client) { const inbound = this._inbounds.get(cid); if (inbound && inbound.writable) { @@ -264,16 +246,16 @@ export class MuxRelay extends EventEmitter { } } } - } + }; - onSubConnCloseByProtocol({ cid }) { + onSubConnCloseByProtocol = ({ cid }) => { const bounds = this._config.is_client ? this._inbounds : this._outbounds; const bound = bounds.get(cid); if (bound) { bound.close(); bounds.delete(cid); } - } + }; // methods @@ -297,12 +279,12 @@ export class MuxRelay extends EventEmitter { // client only _createMuxOutbound({ source }) { - const { _transport: transport, _config: config, _presets: presets } = this; + const { _transport: transport, _config: config } = this; const { Outbound } = this._getBounds(transport); const __id = uniqueId('mux_outbound_'); // pipe - const pipe = new Pipe({ config, presets, isUdp: transport === 'udp' }); + const pipe = new Pipe(this._getPipeProps()); pipe.on('broadcast', (action) => this.onBroadcast({ action, source })); pipe.on(`post_${PIPE_ENCODE}`, (buffer) => outbound.write(buffer)); this._pipes.set(__id, pipe); @@ -325,12 +307,12 @@ export class MuxRelay extends EventEmitter { // server only _createMuxInbound({ source, conn }) { - const { _transport: transport, _config: config, _presets: presets } = this; + const { _transport: transport, _config: config } = this; const { Inbound } = this._getBounds(transport); const __id = uniqueId('mux_inbound_'); // pipe - const pipe = new Pipe({ config, presets, isUdp: transport === 'udp' }); + const pipe = new Pipe(this._getPipeProps()); pipe.on('broadcast', (action) => this.onBroadcast({ action, source })); pipe.on(`post_${PIPE_ENCODE}`, (buffer) => inbound.write(buffer)); this._pipes.set(__id, pipe); @@ -365,6 +347,22 @@ export class MuxRelay extends EventEmitter { return [...bounds.values()][getRandomInt(0, concurrency - 1)]; } + _getPipeProps() { + const { _transport: transport, _config: config, _presets: presets } = this; + return { + config, + presets, + isUdp: transport === 'udp', + injector: (preset) => { + if (preset.name === 'mux') { + preset.muxNewConn = this.onNewSubConn; + preset.muxDataFrame = this.onDataFrame; + preset.muxCloseConn = this.onSubConnCloseByProtocol; + } + }, + }; + } + _preparePresets(presets) { // remove unnecessary presets presets = presets.filter( diff --git a/src/core/pipe.js b/src/core/pipe.js index 33f3a93..3ef8632 100644 --- a/src/core/pipe.js +++ b/src/core/pipe.js @@ -1,17 +1,7 @@ import EventEmitter from 'events'; import { logger } from '../utils'; - -import { - PIPE_ENCODE, - CONNECT_TO_REMOTE, - MUX_CLOSE_CONN, - MUX_DATA_FRAME, - MUX_NEW_CONN, - PRESET_FAILED -} from '../constants'; - -import { getPresetClassByName } from '../presets'; -import { IPresetAddressing } from '../presets/defs'; +import { PIPE_ENCODE, CONNECT_TO_REMOTE, PRESET_FAILED } from '../constants'; +import { getPresetClassByName, IPresetAddressing } from '../presets'; // .on('broadcast') // .on(`pre_${type}`) @@ -22,6 +12,8 @@ export class Pipe extends EventEmitter { _isPipingUdp = false; + _injector = () => void {}; + _encode_presets = []; _decode_presets = []; @@ -34,10 +26,11 @@ export class Pipe extends EventEmitter { return this._destroyed; } - constructor({ config, presets, isUdp = false }) { + constructor({ config, presets, isUdp = false, injector }) { super(); this._config = config; this._isPipingUdp = isUdp; + this._injector = injector; // presets const _presets = presets.map(this._createPreset.bind(this)); this._encode_presets = _presets; @@ -149,10 +142,8 @@ export class Pipe extends EventEmitter { }; } // inject methods for mux - if (preset.name === 'mux') { - preset.muxNewConn = (payload) => this.broadcast({ type: MUX_NEW_CONN, payload }); - preset.muxDataFrame = (payload) => this.broadcast({ type: MUX_DATA_FRAME, payload }); - preset.muxCloseConn = (payload) => this.broadcast({ type: MUX_CLOSE_CONN, payload }); + if (typeof this._injector === 'function') { + this._injector(preset); } // ::onInit() preset.onInit(params); diff --git a/src/core/relay.js b/src/core/relay.js index a7b2d52..721e075 100644 --- a/src/core/relay.js +++ b/src/core/relay.js @@ -87,7 +87,7 @@ export class Relay extends EventEmitter { }); } } catch (err) { - logger.error(`[mux-relay] [${remote}] onConnected callback error: ${err.message}`); + logger.error(`[relay] [${remote}] onConnected callback error: ${err.message}`); this.emit('_error', err); } }