core: refactor pipe.js

This commit is contained in:
Micooz 2017-09-02 21:31:27 +08:00
parent 93ab9d0390
commit a3cab733bd
2 changed files with 48 additions and 43 deletions

@ -1,8 +1,5 @@
import EventEmitter from 'events';
import {
MIDDLEWARE_DIRECTION_UPWARD,
MIDDLEWARE_DIRECTION_DOWNWARD
} from './middleware';
import {MIDDLEWARE_DIRECTION_UPWARD} from './middleware';
import {PRESET_INIT, PRESET_FAILED} from '../presets/defs';
export class Pipe extends EventEmitter {
@ -11,15 +8,14 @@ export class Pipe extends EventEmitter {
_downstream_middlewares = [];
_onNotified = () => 0;
_cacheBuffer = null; // buffer
constructor(props = {}) {
constructor() {
super();
this._onNotified = typeof props.onNotified === 'undefined' ? () => 0 : props.onNotified;
this.onBroadcast = this.onBroadcast.bind(this);
this.broadcast = this.broadcast.bind(this);
}
onBroadcast(action) {
broadcast(action) {
const middlewares = this.getMiddlewares();
const results = [];
for (const middleware of middlewares) {
@ -27,31 +23,40 @@ export class Pipe extends EventEmitter {
}
// if no middleware handled this action, bubble up to where pipe created.
if (results.every((result) => !!result === false)) {
this._onNotified(action);
this.emit('broadcast', action);
}
}
setMiddlewares(direction, middlewares) {
/**
* setup two-way middlewares
* @param middlewares
*/
setMiddlewares(middlewares) {
// set listeners
for (const middleware of middlewares) {
middleware.setMaxListeners(2);
middleware.subscribe(this.onBroadcast);
middleware.setMaxListeners(4);
middleware.on('broadcast', this.broadcast);
middleware.on('fail', (name, message) => void this.broadcast({
type: PRESET_FAILED,
payload: {
name,
message,
orgData: this._cacheBuffer
}
}));
}
if (direction === MIDDLEWARE_DIRECTION_UPWARD) {
this._upstream_middlewares = middlewares;
this._downstream_middlewares = [].concat(middlewares).reverse();
} else {
this._downstream_middlewares = middlewares;
this._upstream_middlewares = [].concat(middlewares).reverse();
}
// make initial broadcast to all presets
this.onBroadcast(direction, {type: PRESET_INIT, payload: {broadcast: this.onBroadcast}});
this._upstream_middlewares = middlewares;
this._downstream_middlewares = [].concat(middlewares).reverse();
// initial broadcast
this.broadcast({type: PRESET_INIT, payload: {broadcast: this.broadcast}});
}
getMiddlewares(direction = MIDDLEWARE_DIRECTION_UPWARD) {
return {
[MIDDLEWARE_DIRECTION_UPWARD]: this._upstream_middlewares,
[MIDDLEWARE_DIRECTION_DOWNWARD]: this._downstream_middlewares
}[direction];
if (direction === MIDDLEWARE_DIRECTION_UPWARD) {
return this._upstream_middlewares;
} else {
return this._downstream_middlewares;
}
}
feed(direction, buffer) {
@ -60,30 +65,28 @@ export class Pipe extends EventEmitter {
// methods to be injected
const direct = (buf, isReverse = false) => this.emit(isReverse ? `next_${-direction}` : eventName, buf);
const fail = (name, message) => this.onBroadcast({
type: PRESET_FAILED,
payload: {
name,
message,
orgData: buffer
}
});
// create event chain among middlewares
const last = middlewares.reduce((prev, next) => {
if (prev.listenerCount(eventName) < 1) {
prev.on(eventName, (buf) => next.write(direction, {buffer: buf, direct, fail}));
if (!prev.hasListener(eventName)) {
prev.on(eventName, (buf) => next.write(direction, {buffer: buf, direct}));
}
return next;
});
// the last middleware send data out via direct(buf, false)
if (last.listenerCount(eventName) < 1) {
if (!last.hasListener(eventName)) {
last.on(eventName, direct);
}
// begin pipe
middlewares[0].write(direction, {buffer, direct, fail});
middlewares[0].write(direction, {buffer, direct});
// TODO(fix): here only cached the first incoming data for carrying "orgData" in PRESET_FAILED.
// NOTE: cache full data may be a bad practice.
if (this._cacheBuffer === null) {
this._cacheBuffer = buffer;
}
}
destroy() {
@ -93,6 +96,8 @@ export class Pipe extends EventEmitter {
}
this._upstream_middlewares = null;
this._downstream_middlewares = null;
this._staging = null;
this.removeAllListeners();
}
}

@ -84,7 +84,7 @@ export class Relay extends EventEmitter {
}
this._presets = presets;
this._pipe = this.createPipe(presets);
this._pipe.onBroadcast({
this._pipe.broadcast({
type: CONNECTION_CREATED,
payload: {
host: this._remoteHost,
@ -187,7 +187,7 @@ export class Relay extends EventEmitter {
if (this._bsocket) {
this._bsocket.destroy();
this._bsocket = null;
this._pipe.onBroadcast({
this._pipe.broadcast({
type: CONNECTION_CLOSED,
payload: {
host: this._remoteHost,
@ -273,7 +273,6 @@ export class Relay extends EventEmitter {
*/
setPresets(callback) {
this._presets = callback(this._presets);
this._pipe.removeAllListeners();
this._pipe.destroy();
this._pipe = this.createPipe(this._presets);
}
@ -283,10 +282,11 @@ export class Relay extends EventEmitter {
*/
createPipe(presets) {
const middlewares = presets.map((preset) => createMiddleware(preset.name, preset.params || {}));
const pipe = new Pipe({onNotified: this.onPipeNotified});
pipe.setMiddlewares(MIDDLEWARE_DIRECTION_UPWARD, middlewares);
const pipe = new Pipe();
pipe.on('broadcast', this.onPipeNotified);
pipe.on(`next_${MIDDLEWARE_DIRECTION_UPWARD}`, this.sendForward);
pipe.on(`next_${MIDDLEWARE_DIRECTION_DOWNWARD}`, this.sendBackward);
pipe.setMiddlewares(middlewares);
return pipe;
}