src: refactor PRESET_FAILED handling

This commit is contained in:
Micooz 2018-06-16 22:23:13 +08:00
parent 20f634713c
commit c0312b4837
4 changed files with 52 additions and 82 deletions

@ -1,8 +1,8 @@
import EventEmitter from 'events';
import { ACL } from './acl';
import { ACL, ACL_CLOSE_CONNECTION } from './acl';
import { Pipe } from './pipe';
import { Tracker } from './tracker';
import { logger } from '../utils';
import { getRandomInt, logger } from '../utils';
import {
TcpInbound, TcpOutbound,
@ -194,6 +194,15 @@ export class Relay extends EventEmitter {
if (this._acl && this._acl.checkFailTimes(this._config.acl_tries)) {
return;
}
this.onPresetFailed(action);
return;
}
if (action.type === ACL_CLOSE_CONNECTION) {
const transport = this._transport;
const remote = `${this._sourceAddress.host}:${this._sourceAddress.port}`;
logger.warn(`[relay] [${transport}] [${remote}] acl request to close this connection`);
this.destroy();
return;
}
this._inbound && this._inbound.onBroadcast(action);
this._outbound && this._outbound.onBroadcast(action);
@ -235,6 +244,44 @@ export class Relay extends EventEmitter {
}
};
async onPresetFailed(action) {
const { name, message, orgData } = action.payload;
const transport = this._transport;
const remote = `${this._sourceAddress.host}:${this._sourceAddress.port}`;
logger.error(`[relay] [${transport}] [${remote}] preset "${name}" fail to process: ${message}`);
this.emit('_error', new Error(message));
// close connection directly on client side
if (this._config.is_client) {
logger.warn(`[relay] [${transport}] [${remote}] connection closed`);
this.destroy();
}
// for server side, redirect traffic if "redirect" is set, otherwise, close connection after a random timeout
if (this._config.is_server && !this._config.mux) {
if (this._config.redirect) {
const [host, port] = this._config.redirect.split(':');
logger.warn(`[relay] [${transport}] [${remote}] connection is redirecting to: ${host}:${port}`);
// clear preset list
this._pipe.updatePresets([]);
// connect to "redirect" remote
await this._outbound.connect({ host, port: +port });
if (this._outbound.writable) {
this._outbound.write(orgData);
}
} else {
this._outbound.pause && this._outbound.pause();
const timeout = getRandomInt(5, 30);
logger.warn(`[relay] [${transport}] [${remote}] connection will be closed in ${timeout}s...`);
setTimeout(this.destroy.bind(this), timeout * 1e3);
}
}
}
// methods
encode(buffer, extraArgs) {

@ -1,6 +1,5 @@
import { Inbound, Outbound } from './defs';
import { logger } from '../utils';
import { CONNECT_TO_REMOTE, CONNECTED_TO_REMOTE, PRESET_FAILED } from '../constants';
import { CONNECT_TO_REMOTE, CONNECTED_TO_REMOTE } from '../constants';
export class MuxInbound extends Inbound {
@ -55,21 +54,11 @@ export class MuxInbound extends Inbound {
socket.resume();
}
break;
case PRESET_FAILED:
this.onPresetFailed(action);
break;
default:
break;
}
}
async onPresetFailed(action) {
const { name, message } = action.payload;
logger.error(`[${this.name}] [${this.remote}] preset "${name}" fail to process: ${message}`);
this.emit('_error', new Error(message));
// TODO: maybe have more things to do rather than keep silent
}
onDrain() {
this.emit('drain');
}

@ -1,6 +1,6 @@
import net from 'net';
import { Inbound, Outbound } from './defs';
import { DNSCache, logger, getRandomInt } from '../utils';
import { DNSCache, logger } from '../utils';
import {
MAX_BUFFERED_SIZE,
@ -8,11 +8,9 @@ import {
PIPE_DECODE,
CONNECT_TO_REMOTE,
CONNECTED_TO_REMOTE,
PRESET_FAILED,
} from '../constants';
import {
ACL_CLOSE_CONNECTION,
ACL_PAUSE_RECV,
ACL_PAUSE_SEND,
ACL_RESUME_RECV,
@ -150,13 +148,6 @@ export class TcpInbound extends Inbound {
case CONNECTED_TO_REMOTE:
this.resume();
break;
case PRESET_FAILED:
this.onPresetFailed(action);
break;
case ACL_CLOSE_CONNECTION:
logger.info(`[${this.name}] [${this.remote}] acl request to close connection`);
this.close();
break;
case ACL_PAUSE_RECV:
this.pause();
break;
@ -168,42 +159,6 @@ export class TcpInbound extends Inbound {
}
}
async onPresetFailed(action) {
const { name, message } = action.payload;
logger.error(`[${this.name}] [${this.remote}] preset "${name}" fail to process: ${message}`);
this.emit('_error', new Error(message));
// close connection directly on client side
if (this._config.is_client) {
logger.warn(`[${this.name}] [${this.remote}] connection closed`);
this.onClose();
}
// for server side, redirect traffic if "redirect" is set, otherwise, close connection after a random timeout
if (this._config.is_server && !this._config.mux) {
if (this._config.redirect) {
const { orgData } = action.payload;
const [host, port] = this._config.redirect.split(':');
logger.warn(`[${this.name}] [${this.remote}] connection is redirecting to: ${host}:${port}`);
// clear preset list
this.ctx.pipe.updatePresets([]);
// connect to "redirect" remote
await this._outbound.connect({ host, port: +port });
if (this._outbound.writable) {
this._outbound.write(orgData);
}
} else {
this.pause();
const timeout = getRandomInt(10, 40);
logger.warn(`[${this.name}] [${this.remote}] connection will be closed in ${timeout}s...`);
setTimeout(this.onClose, timeout * 1e3);
}
}
}
}
export class TcpOutbound extends Outbound {

@ -1,6 +1,6 @@
import dgram from 'dgram';
import { Inbound, Outbound } from './defs';
import { PIPE_ENCODE, PIPE_DECODE, CONNECT_TO_REMOTE, PRESET_FAILED } from '../constants';
import { PIPE_ENCODE, PIPE_DECODE, CONNECT_TO_REMOTE } from '../constants';
import { logger } from '../utils';
export class UdpInbound extends Inbound {
@ -12,7 +12,6 @@ export class UdpInbound extends Inbound {
constructor(props) {
super(props);
this.onReceive = this.onReceive.bind(this);
this.onPresetFailed = this.onPresetFailed.bind(this);
this._socket = this.ctx.socket;
}
@ -22,26 +21,6 @@ export class UdpInbound extends Inbound {
this.ctx.pipe.feed(type, buffer);
}
onBroadcast(action) {
switch (action.type) {
case PRESET_FAILED:
this.onPresetFailed(action);
break;
default:
break;
}
}
onPresetFailed(action) {
const { name, message } = action.payload;
logger.error(`[udp:inbound] [${this.remote}] preset "${name}" fail to process: ${message}`);
if (this._outbound) {
this._outbound.close();
this._outbound = null;
}
this.close();
}
write(buffer) {
const { address, port } = this._rinfo;
const onSendError = (err) => {