core: extract onPipeNotified() from relay.hs to action-handler.js

This commit is contained in:
Micooz 2017-09-05 14:35:19 +08:00
parent 17d1e02abb
commit d938eda05b
No known key found for this signature in database
GPG Key ID: 002FB5DD584D6CB1
3 changed files with 128 additions and 97 deletions

118
src/core/action-handler.js Normal file

@ -0,0 +1,118 @@
// ======================================================
// + NOTICE:
// + This file is a partial of relay.js, please take care
// + of function calls and member access via "this".
// ======================================================
import {logger, getRandomInt} from '../utils';
import {
CONNECT_TO_REMOTE,
PRESET_FAILED,
PRESET_CLOSE_CONNECTION,
PRESET_PAUSE_RECV,
PRESET_PAUSE_SEND,
PRESET_RESUME_RECV,
PRESET_RESUME_SEND
} from '../presets/defs';
const mapping = {
[CONNECT_TO_REMOTE]: onConnectToRemote,
[PRESET_FAILED]: onPresetFailed,
[PRESET_CLOSE_CONNECTION]: onPresetCloseConnection,
[PRESET_PAUSE_RECV]: onPresetPauseRecv,
[PRESET_RESUME_RECV]: onPresetResumeRecv,
[PRESET_PAUSE_SEND]: onPresetPauseSend,
[PRESET_RESUME_SEND]: onPresetResumeSend
};
async function onConnectToRemote(action) {
const {host, port, onConnected} = action.payload;
if (__IS_SERVER__) {
await this.connect({host, port});
}
if (__IS_CLIENT__) {
logger.info(`[relay] [${this.remote}] request: ${host}:${port}`);
await this.connect({host: __SERVER_HOST__, port: __SERVER_PORT__});
}
this._isConnectedToRemote = true;
if (typeof onConnected === 'function') {
onConnected();
}
}
async function onPresetFailed(action) {
const {name, message} = action.payload;
logger.error(`[relay] [${this.remote}] preset "${name}" fail to process: ${message}`);
// close connection directly on client side
if (__IS_CLIENT__) {
logger.warn(`[rely] [${this.remote}] connection closed`);
this.destroy();
}
// for server side, redirect traffic if "redirect" is set, otherwise, close connection after a random timeout
if (__IS_SERVER__) {
if (__REDIRECT__) {
const {orgData} = action.payload;
const [host, port] = __REDIRECT__.split(':');
logger.warn(`[rely] [${this.remote}] connection is redirecting to: ${host}:${port}`);
// replace presets to tracker only
this.setPresets((/* prevPresets */) => [{name: 'tracker'}]);
// connect to "redirect" remote
const fsocket = await this.connect({host, port: +port});
if (fsocket && fsocket.writable) {
fsocket.write(orgData);
}
} else {
this._bsocket && this._bsocket.pause();
this._fsocket && this._fsocket.pause();
const timeout = getRandomInt(10, 40);
logger.warn(`[rely] [${this.remote}] connection will be closed in ${timeout}s...`);
setTimeout(this.destroy, timeout * 1e3);
}
}
}
function onPresetCloseConnection() {
logger.info(`[relay] [${this.remote}] preset request to close connection`);
this.destroy();
}
// traffic control
function onPresetPauseRecv() {
__IS_SERVER__ ?
(this._bsocket && this._bsocket.pause()) :
(this._fsocket && this._fsocket.pause());
}
function onPresetResumeRecv() {
__IS_SERVER__ ?
(this._bsocket && this._bsocket.resume()) :
(this._fsocket && this._fsocket.resume());
}
function onPresetPauseSend() {
__IS_SERVER__ ?
(this._fsocket && this._fsocket.pause()) :
(this._bsocket && this._bsocket.pause());
}
function onPresetResumeSend() {
__IS_SERVER__ ?
(this._fsocket && this._fsocket.resume()) :
(this._bsocket && this._bsocket.resume());
}
export default function ActionHandler(action) {
const handler = mapping[action.type];
if (typeof handler === 'function') {
handler.call(this, action);
} else {
logger.warn(`unhandled action type: ${action.type}`);
}
}

@ -62,6 +62,8 @@ export class Pipe extends EventEmitter {
}
feed(direction, buffer) {
this._cacheBuffer = buffer;
const eventName = `next_${direction}`;
const middlewares = this.getMiddlewares(direction);
@ -83,12 +85,6 @@ export class Pipe extends EventEmitter {
// begin pipe
middlewares[0].write(direction, {buffer, direct});
// TODO(fix): here only cached the first incoming data for carrying "orgData" in PRESET_FAILED.
// NOTE: cache full data may be a bad practice.
if (this._cacheBuffer === null) {
this._cacheBuffer = buffer;
}
}
destroy() {
@ -98,7 +94,7 @@ export class Pipe extends EventEmitter {
}
this._upstream_middlewares = null;
this._downstream_middlewares = null;
this._staging = null;
this._cacheBuffer = null;
this.removeAllListeners();
}

@ -1,28 +1,12 @@
import EventEmitter from 'events';
import net from 'net';
import tls from 'tls';
import {logger, isValidHostname, isValidPort} from '../utils';
import ActionHandler from './action-handler';
import {Pipe} from './pipe';
import {DNSCache} from './dns-cache';
import {
MIDDLEWARE_DIRECTION_UPWARD,
MIDDLEWARE_DIRECTION_DOWNWARD,
createMiddleware
} from './middleware';
import {
CONNECTION_CREATED,
CONNECTION_CLOSED,
CONNECT_TO_REMOTE,
PRESET_FAILED,
PRESET_CLOSE_CONNECTION,
PRESET_PAUSE_RECV,
PRESET_PAUSE_SEND,
PRESET_RESUME_RECV,
PRESET_RESUME_SEND
} from '../presets/defs';
import {BEHAVIOUR_EVENT_ON_PRESET_FAILED} from '../behaviours';
import {MIDDLEWARE_DIRECTION_UPWARD, MIDDLEWARE_DIRECTION_DOWNWARD, createMiddleware} from './middleware';
import {logger, isValidHostname, isValidPort} from '../utils';
import {CONNECTION_CREATED, CONNECTION_CLOSED} from '../presets/defs';
const MAX_BUFFERED_SIZE = 512 * 1024; // 512KB
@ -37,7 +21,7 @@ export class Relay extends EventEmitter {
_dnsCache = null;
_isConnectedToDst = false;
_isConnectedToRemote = false;
_remoteHost = '';
@ -60,7 +44,6 @@ export class Relay extends EventEmitter {
this.onBackwardSocketClose = this.onBackwardSocketClose.bind(this);
this.onForwardSocketTimeout = this.onForwardSocketTimeout.bind(this);
this.onForwardSocketClose = this.onForwardSocketClose.bind(this);
this.onPipeNotified = this.onPipeNotified.bind(this);
this.sendForward = this.sendForward.bind(this);
this.sendBackward = this.sendBackward.bind(this);
this.connect = this.connect.bind(this);
@ -121,7 +104,7 @@ export class Relay extends EventEmitter {
// bsocket
onForward(buffer) {
if (this.fsocketWritable || !this._isConnectedToDst) {
if (this.fsocketWritable || !this._isConnectedToRemote) {
const direction = __IS_CLIENT__ ? MIDDLEWARE_DIRECTION_UPWARD : MIDDLEWARE_DIRECTION_DOWNWARD;
this._pipe.feed(direction, buffer);
}
@ -288,79 +271,13 @@ export class Relay extends EventEmitter {
createPipe(presets) {
const middlewares = presets.map((preset) => createMiddleware(preset.name, preset.params || {}));
const pipe = new Pipe();
pipe.on('broadcast', this.onPipeNotified);
pipe.on('broadcast', ActionHandler.bind(this)); // if no action were caught by presets
pipe.on(`next_${MIDDLEWARE_DIRECTION_UPWARD}`, this.sendForward);
pipe.on(`next_${MIDDLEWARE_DIRECTION_DOWNWARD}`, this.sendBackward);
pipe.setMiddlewares(middlewares);
return pipe;
}
/**
* if no action were caught by middlewares
* @param action
* @returns {*}
*/
async onPipeNotified(action) {
switch (action.type) {
case CONNECT_TO_REMOTE: {
const {host, port, onConnected} = action.payload;
if (__IS_SERVER__) {
await this.connect({host, port});
}
if (__IS_CLIENT__) {
logger.info(`[relay] [${this.remote}] request: ${host}:${port}`);
await this.connect({host: __SERVER_HOST__, port: __SERVER_PORT__});
}
this._isConnectedToDst = true;
if (typeof onConnected === 'function') {
onConnected();
}
break;
}
case PRESET_FAILED: {
const props = {
remoteHost: this._remoteHost,
remotePort: this._remotePort,
onClose: this.destroy,
connect: this.connect,
setPresets: this.setPresets,
action: action
};
const {name, message} = action.payload;
logger.error(`[relay] [${this.remote}] preset "${name}" fail to process: ${message}`);
await __BEHAVIOURS__[BEHAVIOUR_EVENT_ON_PRESET_FAILED].run(props);
break;
}
case PRESET_CLOSE_CONNECTION: {
logger.info(`[relay] [${this.remote}] preset request to close connection`);
this.destroy();
break;
}
case PRESET_PAUSE_RECV:
__IS_SERVER__ ?
(this._bsocket && this._bsocket.pause()) :
(this._fsocket && this._fsocket.pause());
break;
case PRESET_PAUSE_SEND:
__IS_SERVER__ ?
(this._fsocket && this._fsocket.pause()) :
(this._bsocket && this._bsocket.pause());
break;
case PRESET_RESUME_RECV:
__IS_SERVER__ ?
(this._bsocket && this._bsocket.resume()) :
(this._fsocket && this._fsocket.resume());
break;
case PRESET_RESUME_SEND:
__IS_SERVER__ ?
(this._fsocket && this._fsocket.resume()) :
(this._bsocket && this._bsocket.resume());
break;
default:
break;
}
}
/**
* close both sides
*/