core: decouple mux stuff from pipe

This commit is contained in:
Micooz 2018-07-07 10:23:19 +08:00
parent 1e61c6dab0
commit 1d852ee514
4 changed files with 38 additions and 52 deletions

@ -6,9 +6,6 @@ export const PIPE_DECODE = -1;
export const PRESET_FAILED = 'PRESET_FAILED';
export const CONNECT_TO_REMOTE = 'CONNECT_TO_REMOTE';
export const MUX_NEW_CONN = 'MUX_NEW_CONN';
export const MUX_DATA_FRAME = 'MUX_DATA_FRAME';
export const MUX_CLOSE_CONN = 'MUX_CLOSE_CONN';
// https://url.spec.whatwg.org/#url-miscellaneous
export const PROTOCOL_DEFAULT_PORTS = {

@ -3,17 +3,8 @@ import uniqueId from 'lodash.uniqueid';
import { Pipe } from './pipe';
import { Tracker } from './tracker';
import { hash, logger, getRandomInt, dumpHex } from '../utils';
import { getPresetClassByName } from '../presets';
import { IPresetAddressing } from '../presets/defs';
import {
MUX_CLOSE_CONN,
MUX_DATA_FRAME,
MUX_NEW_CONN,
PIPE_DECODE,
PIPE_ENCODE,
APP_ID, PRESET_FAILED,
} from '../constants';
import { getPresetClassByName, IPresetAddressing } from '../presets';
import { PIPE_DECODE, PIPE_ENCODE, APP_ID, PRESET_FAILED } from '../constants';
import {
TcpInbound, TcpOutbound,
@ -163,23 +154,14 @@ export class MuxRelay extends EventEmitter {
// events
onBroadcast = ({ action, source }) => {
if (action.type === MUX_NEW_CONN) {
return this.onNewSubConn(action.payload);
}
else if (action.type === MUX_DATA_FRAME) {
return this.onDataFrame(action.payload);
}
else if (action.type === MUX_CLOSE_CONN) {
return this.onSubConnCloseByProtocol(action.payload);
}
else if (action.type === PRESET_FAILED) {
if (action.type === PRESET_FAILED) {
const { name, message } = action.payload;
const remote = `${source.host}:${source.port}`;
logger.error(`[mux-relay] [${this._transport}] [${remote}] preset "${name}" fail to process: ${message}`);
}
};
async onNewSubConn({ cid, host, port }) {
onNewSubConn = async ({ cid, host, port }) => {
const { _config: config, _transport: transport } = this;
const { Outbound } = this._getBounds(transport);
@ -237,9 +219,9 @@ export class MuxRelay extends EventEmitter {
outbound.__tracker.trace(PIPE_DECODE, buffer.length);
setImmediate(() => this.emit('_read', buffer.length));
}
}
};
onDataFrame({ cid, data }) {
onDataFrame = ({ cid, data }) => {
if (this._config.is_client) {
const inbound = this._inbounds.get(cid);
if (inbound && inbound.writable) {
@ -264,16 +246,16 @@ export class MuxRelay extends EventEmitter {
}
}
}
}
};
onSubConnCloseByProtocol({ cid }) {
onSubConnCloseByProtocol = ({ cid }) => {
const bounds = this._config.is_client ? this._inbounds : this._outbounds;
const bound = bounds.get(cid);
if (bound) {
bound.close();
bounds.delete(cid);
}
}
};
// methods
@ -297,12 +279,12 @@ export class MuxRelay extends EventEmitter {
// client only
_createMuxOutbound({ source }) {
const { _transport: transport, _config: config, _presets: presets } = this;
const { _transport: transport, _config: config } = this;
const { Outbound } = this._getBounds(transport);
const __id = uniqueId('mux_outbound_');
// pipe
const pipe = new Pipe({ config, presets, isUdp: transport === 'udp' });
const pipe = new Pipe(this._getPipeProps());
pipe.on('broadcast', (action) => this.onBroadcast({ action, source }));
pipe.on(`post_${PIPE_ENCODE}`, (buffer) => outbound.write(buffer));
this._pipes.set(__id, pipe);
@ -325,12 +307,12 @@ export class MuxRelay extends EventEmitter {
// server only
_createMuxInbound({ source, conn }) {
const { _transport: transport, _config: config, _presets: presets } = this;
const { _transport: transport, _config: config } = this;
const { Inbound } = this._getBounds(transport);
const __id = uniqueId('mux_inbound_');
// pipe
const pipe = new Pipe({ config, presets, isUdp: transport === 'udp' });
const pipe = new Pipe(this._getPipeProps());
pipe.on('broadcast', (action) => this.onBroadcast({ action, source }));
pipe.on(`post_${PIPE_ENCODE}`, (buffer) => inbound.write(buffer));
this._pipes.set(__id, pipe);
@ -365,6 +347,22 @@ export class MuxRelay extends EventEmitter {
return [...bounds.values()][getRandomInt(0, concurrency - 1)];
}
_getPipeProps() {
const { _transport: transport, _config: config, _presets: presets } = this;
return {
config,
presets,
isUdp: transport === 'udp',
injector: (preset) => {
if (preset.name === 'mux') {
preset.muxNewConn = this.onNewSubConn;
preset.muxDataFrame = this.onDataFrame;
preset.muxCloseConn = this.onSubConnCloseByProtocol;
}
},
};
}
_preparePresets(presets) {
// remove unnecessary presets
presets = presets.filter(

@ -1,17 +1,7 @@
import EventEmitter from 'events';
import { logger } from '../utils';
import {
PIPE_ENCODE,
CONNECT_TO_REMOTE,
MUX_CLOSE_CONN,
MUX_DATA_FRAME,
MUX_NEW_CONN,
PRESET_FAILED
} from '../constants';
import { getPresetClassByName } from '../presets';
import { IPresetAddressing } from '../presets/defs';
import { PIPE_ENCODE, CONNECT_TO_REMOTE, PRESET_FAILED } from '../constants';
import { getPresetClassByName, IPresetAddressing } from '../presets';
// .on('broadcast')
// .on(`pre_${type}`)
@ -22,6 +12,8 @@ export class Pipe extends EventEmitter {
_isPipingUdp = false;
_injector = () => void {};
_encode_presets = [];
_decode_presets = [];
@ -34,10 +26,11 @@ export class Pipe extends EventEmitter {
return this._destroyed;
}
constructor({ config, presets, isUdp = false }) {
constructor({ config, presets, isUdp = false, injector }) {
super();
this._config = config;
this._isPipingUdp = isUdp;
this._injector = injector;
// presets
const _presets = presets.map(this._createPreset.bind(this));
this._encode_presets = _presets;
@ -149,10 +142,8 @@ export class Pipe extends EventEmitter {
};
}
// inject methods for mux
if (preset.name === 'mux') {
preset.muxNewConn = (payload) => this.broadcast({ type: MUX_NEW_CONN, payload });
preset.muxDataFrame = (payload) => this.broadcast({ type: MUX_DATA_FRAME, payload });
preset.muxCloseConn = (payload) => this.broadcast({ type: MUX_CLOSE_CONN, payload });
if (typeof this._injector === 'function') {
this._injector(preset);
}
// ::onInit()
preset.onInit(params);

@ -87,7 +87,7 @@ export class Relay extends EventEmitter {
});
}
} catch (err) {
logger.error(`[mux-relay] [${remote}] onConnected callback error: ${err.message}`);
logger.error(`[relay] [${remote}] onConnected callback error: ${err.message}`);
this.emit('_error', err);
}
}