src: decrease error rate of multiplexing

This commit is contained in:
Micooz 2018-03-01 13:07:02 +08:00
parent be35425d6d
commit 15be66a6ee
4 changed files with 21 additions and 57 deletions

@ -249,7 +249,7 @@ export class Hub {
if (this._config.mux) {
if (this._config.is_client) {
relay.id = cid; // NOTE: this cid will be used in mux preset
muxRelay.addSubRelay(relay);
muxRelay.addSubRelay(cid, relay);
} else {
// on server side, this relay is a muxRelay
this._muxRelays.set(relay.id, relay);

@ -53,8 +53,7 @@ export class MuxRelay extends Relay {
}
destroy() {
super.destroy();
const subRelays = this.getSubRelays();
const subRelays = this._subRelays;
if (subRelays) {
logger.info(`[mux-${this.id}] connection destroyed, cleanup ${subRelays.size} sub connections`);
// cleanup associate relays
@ -64,6 +63,7 @@ export class MuxRelay extends Relay {
subRelays.clear();
this._subRelays = null;
}
super.destroy();
}
// events
@ -105,9 +105,9 @@ export class MuxRelay extends Relay {
// create relations between mux relay and its sub relays,
// when mux relay destroyed, all sub relays should be destroyed as well.
muxRelay.addSubRelay(relay);
muxRelay.addSubRelay(cid, relay);
logger.info(`[mux-${muxRelay.id}] create sub connection(cid=${relay.id}), total: ${muxRelay.getSubRelays().size}`);
logger.info(`[mux-${muxRelay.id}] create sub connection(cid=${cid}), total: ${muxRelay._subRelays.size}`);
} else {
logger.warn(`[mux-${muxRelay.id}] cannot create new sub connection due to no mux connection are available`);
}
@ -133,9 +133,9 @@ export class MuxRelay extends Relay {
onSubConnCloseByProtocol({ cid }) {
const relay = this._subRelays.get(cid);
if (relay) {
this._removeSubRelay(cid);
this._subRelays.delete(cid);
relay.destroy();
logger.debug(`[mux-${this.id}] sub connection(cid=${cid}) closed by protocol`);
logger.info(`[mux-${this.id}] sub connection(cid=${cid}) closed by protocol`);
}
// else {
// logger.warn(`[mux-${this.id}] fail to close sub connection by protocol, no such sub connection(cid=${cid})`);
@ -143,47 +143,23 @@ export class MuxRelay extends Relay {
}
onSubConnCloseBySelf({ cid }) {
const relay = this._getSubRelay(cid);
const relay = this._subRelays.get(cid);
if (relay) {
this.destroySubRelay(cid);
logger.debug(`[mux-${this.id}] sub connection(cid=${cid}) closed by self, remains: ${this.getSubRelays().size}`);
this.encode(Buffer.alloc(0), { cid, isClosing: true });
this._subRelays.delete(cid);
relay.destroy();
logger.info(`[mux-${this.id}] sub connection(cid=${cid}) closed by self, remains: ${this._subRelays.size}`);
}
// else {
// logger.warn(`[mux-${muxRelay.id}] fail to close sub connection by self, no such sub connection(cid=${cid})`);
// logger.warn(`[mux-${this.id}] fail to close sub connection by self, no such sub connection(cid=${cid})`);
// }
}
// methods
addSubRelay(relay) {
relay.on('close', this.onSubConnCloseBySelf.bind(this, { cid: relay.id }));
this._subRelays.set(relay.id, relay);
}
getSubRelays() {
return this._subRelays;
}
destroySubRelay(cid) {
const relay = this._getSubRelay(cid);
if (relay) {
this.encode(Buffer.alloc(0), { cid, isClosing: true });
this._removeSubRelay(cid);
relay.destroy();
}
// else {
// logger.warn(`[mux-${this.id}] fail to close sub connection by calling destroySubRelay(), no such sub connection(cid=${cid})`);
// }
}
_removeSubRelay(cid) {
this._subRelays.delete(cid);
}
_getSubRelay(cid) {
if (this._subRelays) {
return this._subRelays.get(cid);
}
addSubRelay(cid, relay) {
relay.on('close', this.onSubConnCloseBySelf.bind(this, { cid }));
this._subRelays.set(cid, relay);
}
_getRandomMuxRelay() {

@ -1,6 +1,6 @@
import net from 'net';
import ip from 'ip';
import { logger, numberToBuffer } from '../utils';
import { logger, numberToBuffer, dumpHex } from '../utils';
// Socks4 Request Message
// +----+-----+----------+--------+----------+--------+
@ -286,7 +286,7 @@ export function createServer({ bindAddress, bindPort }) {
removeSocksListeners();
return;
}
logger.error(`[socks] [${appAddress}] invalid socks handshake message: ${buffer.slice(0, 60).toString('hex')}`);
logger.error(`[socks] [${appAddress}] invalid socks handshake message: ${dumpHex(buffer)}`);
socket.destroy();
}
else if (stage === STAGE_SOCKS5_REQUEST_MESSAGE) {
@ -334,7 +334,7 @@ export function createServer({ bindAddress, bindPort }) {
}
}
} else {
logger.error(`[socks] [${appAddress}] invalid socks5 request message: ${buffer.slice(0, 60).toString('hex')}`);
logger.error(`[socks] [${appAddress}] invalid socks5 request message: ${dumpHex(buffer)}`);
socket.destroy();
}
}

@ -76,20 +76,14 @@ export class MuxInbound extends Inbound {
}
}
end() {
// TODO: handle half close correctly in mux protocol
this.close();
}
close() {
const doClose = () => {
if (this._config.is_server) {
const { muxRelay, cid } = this.ctx;
const { muxRelay } = this.ctx;
const inbound = muxRelay.getInbound();
if (inbound) {
inbound.removeListener('drain', this.onDrain);
}
muxRelay.destroySubRelay(cid);
}
if (!this._destroyed) {
this._destroyed = true;
@ -159,20 +153,14 @@ export class MuxOutbound extends Outbound {
}
}
end() {
// TODO: handle half close correctly in mux protocol
this.close();
}
close() {
const doClose = () => {
if (this._config.is_client) {
const { muxRelay, cid } = this.ctx;
const { muxRelay } = this.ctx;
const outbound = muxRelay.getOutbound();
if (outbound) {
outbound.removeListener('drain', this.onDrain);
}
muxRelay.destroySubRelay(cid);
}
if (!this._destroyed) {
this._destroyed = true;