core,test: support piping udp packets

This commit is contained in:
Micooz 2017-10-26 16:45:07 +08:00
parent 0b8ae8fe97
commit 0598950d91
No known key found for this signature in database
GPG Key ID: 002FB5DD584D6CB1
4 changed files with 25 additions and 15 deletions

@ -90,9 +90,10 @@ export class Middleware extends EventEmitter {
* @param direction
* @param buffer
* @param direct
* @param isUdp
*/
write(direction, {buffer, direct}) {
const type = (direction === PIPE_ENCODE) ? 'Out' : 'In';
write({direction, buffer, direct, isUdp}) {
const type = (direction === PIPE_ENCODE ? 'Out' : 'In') + (isUdp ? 'Udp' : '');
// prepare args
const broadcast = this.onPresetBroadcast;
@ -105,7 +106,7 @@ export class Middleware extends EventEmitter {
}
};
// clientOut, serverOut, clientIn, serverIn
// clientXXX, serverXXX
const nextLifeCycleHook = (buf/* , isReverse = false */) => {
const args = {buffer: buf, next, broadcast, direct, fail};
const ret = __IS_CLIENT__ ? this._impl[`client${type}`](args) : this._impl[`server${type}`](args);
@ -114,8 +115,8 @@ export class Middleware extends EventEmitter {
}
};
// beforeOut, beforeIn
// NOTE: next(buf, isReverse) is not available in beforeOut/beforeIn
// beforeXXX
// NOTE: next(buf, isReverse) is not available in beforeXXX
const args = {buffer, next: nextLifeCycleHook, broadcast, direct, fail};
const ret = this._impl[`before${type}`](args);
if (ret instanceof Buffer) {

@ -12,6 +12,8 @@ export class Pipe extends EventEmitter {
_downstream_middlewares = [];
_isPipingUdp = false;
_cacheBuffer = null;
_destroyed = false;
@ -20,10 +22,11 @@ export class Pipe extends EventEmitter {
return this._destroyed;
}
constructor(presets) {
constructor({presets, isUdp = false}) {
super();
this.broadcast = this.broadcast.bind(this);
this.createMiddlewares(presets);
this._isPipingUdp = isUdp;
}
broadcast(name, action) {
@ -80,7 +83,7 @@ export class Pipe extends EventEmitter {
this._feed(direction, buffer);
}
} catch (err) {
logger.error('[pipe] error occurred while piping', err);
logger.error('[pipe] error occurred while piping:', err);
}
}
@ -98,21 +101,22 @@ export class Pipe extends EventEmitter {
_feed(direction, buffer) {
const middlewares = this.getMiddlewares(direction);
// methods to be injected
// args to be injected
const isUdp = this._isPipingUdp;
const direct = (buf, isReverse = false) => this.emit(isReverse ? `post_${-direction}` : `post_${direction}`, buf);
// create event chain among middlewares
const event = `next_${direction}`;
const first = middlewares[0];
if (!first.hasListener(event)) {
const last = middlewares.reduce((prev, next) => {
prev.on(event, (buf) => next.write(direction, {buffer: buf, direct}));
prev.on(event, (buf) => next.write({direction, buffer: buf, direct, isUdp}));
return next;
});
// the last middleware send data out via direct(buf, false)
last.on(event, direct);
}
// begin pipe
first.write(direction, {buffer, direct});
first.write({direction, buffer, direct, isUdp});
}
}

@ -26,16 +26,19 @@ export class Relay extends EventEmitter {
_outbound = null;
_transport = null;
_pipe = null;
_presets = [];
constructor({context, Inbound, Outbound, proxyRequest = null}) {
constructor({transport, context, Inbound, Outbound, proxyRequest = null}) {
super();
this.setPresets = this.setPresets.bind(this);
this.onBroadcast = this.onBroadcast.bind(this);
this.postPipeForward = this.postPipeForward.bind(this);
this.postPipeBackward = this.postPipeBackward.bind(this);
this._transport = transport;
// pipe
this._presets = preparePresets();
this._pipe = this.createPipe(this._presets);
@ -104,7 +107,7 @@ export class Relay extends EventEmitter {
* create pipes for both data forward and backward
*/
createPipe(presets) {
const pipe = new Pipe(presets);
const pipe = new Pipe({presets, isUdp: this._transport === 'udp'});
pipe.on('broadcast', this.onBroadcast.bind(this)); // if no action were caught by presets
pipe.on(`post_${PIPE_ENCODE}`, this.postPipeForward);
pipe.on(`post_${PIPE_DECODE}`, this.postPipeBackward);
@ -141,7 +144,7 @@ export function createRelay(transport, context, proxyRequest = null) {
} else {
[Inbound, Outbound] = __IS_CLIENT__ ? [TcpInbound, mapping[transport][1]] : [mapping[transport][0], TcpOutbound];
}
const props = {context, Inbound, Outbound, proxyRequest};
const props = {transport, context, Inbound, Outbound, proxyRequest};
const relay = new Relay(props);
relay.id = uniqueId(`${transport}_`);
return relay;

@ -36,7 +36,8 @@ export class PresetRunner extends EventEmitter {
this.middleware.on('post_-1', resolve);
this.middleware.on('fail', reject);
this.middleware.on('broadcast', (name, action) => this.emit('broadcast', action));
this.middleware.write(__IS_CLIENT__ ? PIPE_ENCODE : PIPE_DECODE, {
this.middleware.write({
direction: __IS_CLIENT__ ? PIPE_ENCODE : PIPE_DECODE,
buffer: data,
direct: resolve
});
@ -52,7 +53,8 @@ export class PresetRunner extends EventEmitter {
this.middleware.on('post_-1', resolve);
this.middleware.on('fail', reject);
this.middleware.on('broadcast', (name, action) => this.emit('broadcast', action));
this.middleware.write(__IS_CLIENT__ ? PIPE_DECODE : PIPE_ENCODE, {
this.middleware.write({
direction: __IS_CLIENT__ ? PIPE_DECODE : PIPE_ENCODE,
buffer: data,
direct: resolve
});