core: allow pass the second parameter to next(buf, isReverse)

This commit is contained in:
Micooz 2017-08-10 14:33:34 +08:00
parent fcdc1adc01
commit ad588542eb
2 changed files with 14 additions and 8 deletions

@ -46,24 +46,32 @@ export class Middleware extends EventEmitter {
const broadcast = this._broadcast;
const _fail = (message) => fail(this.getName(), message);
const next = (buf) => {
// NOTE: next(buf, isReverse) is not available in beforeOut/beforeIn
const next = (buf/* , isReverse = false */) => {
const args = {
buffer: buf,
next: (processed) => this.emit(`next_${direction}`, processed),
next: (processed, isReverse = false) => {
const hasListener = this.emit(`next_${isReverse ? -direction : direction}`, processed);
// oh my nice hack to deal with reverse pipeline if haven't been created
if (!hasListener) {
direct(processed, isReverse);
}
},
broadcast,
direct,
fail: _fail
};
// clientOut, serverOut, clientIn, serverIn
const ret = __IS_CLIENT__ ? this._impl[`client${type}`](args) : this._impl[`server${type}`](args);
if (typeof ret !== 'undefined') {
if (ret instanceof Buffer) {
args.next(ret);
}
};
// beforeOut, beforeIn
const r = this._impl[`before${type}`]({buffer, next, broadcast, direct, fail: _fail});
if (typeof r !== 'undefined') {
if (r instanceof Buffer) {
next(r);
}
}

@ -57,9 +57,7 @@ export class Pipe extends EventEmitter {
const middlewares = this.getMiddlewares(direction);
// methods to be injected
const direct = (buf, isReverse = false) => {
this.emit(isReverse ? `next_${-direction}` : eventName, buf);
};
const direct = (buf, isReverse = false) => this.emit(isReverse ? `next_${-direction}` : eventName, buf);
const fail = (name, message) => this.onBroadcast(direction, {
type: PROCESSING_FAILED,
payload: {
@ -77,7 +75,7 @@ export class Pipe extends EventEmitter {
return next;
});
// the last middleware send data out via direct(buf)
// the last middleware send data out via direct(buf, false)
if (last.listenerCount(eventName) < 1) {
last.on(eventName, direct);
}