diff --git a/src/core/pipe.js b/src/core/pipe.js index 3909b1e..58ce1b3 100644 --- a/src/core/pipe.js +++ b/src/core/pipe.js @@ -1,8 +1,5 @@ import EventEmitter from 'events'; -import { - MIDDLEWARE_DIRECTION_UPWARD, - MIDDLEWARE_DIRECTION_DOWNWARD -} from './middleware'; +import {MIDDLEWARE_DIRECTION_UPWARD} from './middleware'; import {PRESET_INIT, PRESET_FAILED} from '../presets/defs'; export class Pipe extends EventEmitter { @@ -11,15 +8,14 @@ export class Pipe extends EventEmitter { _downstream_middlewares = []; - _onNotified = () => 0; + _cacheBuffer = null; // buffer - constructor(props = {}) { + constructor() { super(); - this._onNotified = typeof props.onNotified === 'undefined' ? () => 0 : props.onNotified; - this.onBroadcast = this.onBroadcast.bind(this); + this.broadcast = this.broadcast.bind(this); } - onBroadcast(action) { + broadcast(action) { const middlewares = this.getMiddlewares(); const results = []; for (const middleware of middlewares) { @@ -27,31 +23,40 @@ export class Pipe extends EventEmitter { } // if no middleware handled this action, bubble up to where pipe created. if (results.every((result) => !!result === false)) { - this._onNotified(action); + this.emit('broadcast', action); } } - setMiddlewares(direction, middlewares) { + /** + * setup two-way middlewares + * @param middlewares + */ + setMiddlewares(middlewares) { + // set listeners for (const middleware of middlewares) { - middleware.setMaxListeners(2); - middleware.subscribe(this.onBroadcast); + middleware.setMaxListeners(4); + middleware.on('broadcast', this.broadcast); + middleware.on('fail', (name, message) => void this.broadcast({ + type: PRESET_FAILED, + payload: { + name, + message, + orgData: this._cacheBuffer + } + })); } - if (direction === MIDDLEWARE_DIRECTION_UPWARD) { - this._upstream_middlewares = middlewares; - this._downstream_middlewares = [].concat(middlewares).reverse(); - } else { - this._downstream_middlewares = middlewares; - this._upstream_middlewares = [].concat(middlewares).reverse(); - } - // make initial broadcast to all presets - this.onBroadcast(direction, {type: PRESET_INIT, payload: {broadcast: this.onBroadcast}}); + this._upstream_middlewares = middlewares; + this._downstream_middlewares = [].concat(middlewares).reverse(); + // initial broadcast + this.broadcast({type: PRESET_INIT, payload: {broadcast: this.broadcast}}); } getMiddlewares(direction = MIDDLEWARE_DIRECTION_UPWARD) { - return { - [MIDDLEWARE_DIRECTION_UPWARD]: this._upstream_middlewares, - [MIDDLEWARE_DIRECTION_DOWNWARD]: this._downstream_middlewares - }[direction]; + if (direction === MIDDLEWARE_DIRECTION_UPWARD) { + return this._upstream_middlewares; + } else { + return this._downstream_middlewares; + } } feed(direction, buffer) { @@ -60,30 +65,28 @@ export class Pipe extends EventEmitter { // methods to be injected const direct = (buf, isReverse = false) => this.emit(isReverse ? `next_${-direction}` : eventName, buf); - const fail = (name, message) => this.onBroadcast({ - type: PRESET_FAILED, - payload: { - name, - message, - orgData: buffer - } - }); // create event chain among middlewares const last = middlewares.reduce((prev, next) => { - if (prev.listenerCount(eventName) < 1) { - prev.on(eventName, (buf) => next.write(direction, {buffer: buf, direct, fail})); + if (!prev.hasListener(eventName)) { + prev.on(eventName, (buf) => next.write(direction, {buffer: buf, direct})); } return next; }); // the last middleware send data out via direct(buf, false) - if (last.listenerCount(eventName) < 1) { + if (!last.hasListener(eventName)) { last.on(eventName, direct); } // begin pipe - middlewares[0].write(direction, {buffer, direct, fail}); + middlewares[0].write(direction, {buffer, direct}); + + // TODO(fix): here only cached the first incoming data for carrying "orgData" in PRESET_FAILED. + // NOTE: cache full data may be a bad practice. + if (this._cacheBuffer === null) { + this._cacheBuffer = buffer; + } } destroy() { @@ -93,6 +96,8 @@ export class Pipe extends EventEmitter { } this._upstream_middlewares = null; this._downstream_middlewares = null; + this._staging = null; + this.removeAllListeners(); } } diff --git a/src/core/relay.js b/src/core/relay.js index 15243dd..3199272 100644 --- a/src/core/relay.js +++ b/src/core/relay.js @@ -84,7 +84,7 @@ export class Relay extends EventEmitter { } this._presets = presets; this._pipe = this.createPipe(presets); - this._pipe.onBroadcast({ + this._pipe.broadcast({ type: CONNECTION_CREATED, payload: { host: this._remoteHost, @@ -187,7 +187,7 @@ export class Relay extends EventEmitter { if (this._bsocket) { this._bsocket.destroy(); this._bsocket = null; - this._pipe.onBroadcast({ + this._pipe.broadcast({ type: CONNECTION_CLOSED, payload: { host: this._remoteHost, @@ -273,7 +273,6 @@ export class Relay extends EventEmitter { */ setPresets(callback) { this._presets = callback(this._presets); - this._pipe.removeAllListeners(); this._pipe.destroy(); this._pipe = this.createPipe(this._presets); } @@ -283,10 +282,11 @@ export class Relay extends EventEmitter { */ createPipe(presets) { const middlewares = presets.map((preset) => createMiddleware(preset.name, preset.params || {})); - const pipe = new Pipe({onNotified: this.onPipeNotified}); - pipe.setMiddlewares(MIDDLEWARE_DIRECTION_UPWARD, middlewares); + const pipe = new Pipe(); + pipe.on('broadcast', this.onPipeNotified); pipe.on(`next_${MIDDLEWARE_DIRECTION_UPWARD}`, this.sendForward); pipe.on(`next_${MIDDLEWARE_DIRECTION_DOWNWARD}`, this.sendBackward); + pipe.setMiddlewares(middlewares); return pipe; }