diff --git a/src/core/mux-relay.js b/src/core/mux-relay.js index ab6b754..f5714bf 100644 --- a/src/core/mux-relay.js +++ b/src/core/mux-relay.js @@ -15,6 +15,34 @@ import { WssInbound, WssOutbound, } from '../transports'; +class MuxInbound extends EventEmitter { + + constructor(inbound) { + super(); + this._inbound = inbound; + this._inbound.on('drain', () => this.emit('drain')); + } + + get bufferSize() { + return this._inbound.bufferSize; + } + +} + +class MuxOutbound extends EventEmitter { + + constructor(outbound) { + super(); + this._outbound = outbound; + this._outbound.on('drain', () => this.emit('drain')); + } + + get bufferSize() { + return this._outbound.bufferSize; + } + +} + // .on('_connect') // .on('_read') // .on('_write') @@ -82,6 +110,7 @@ export class MuxRelay extends EventEmitter { const { Inbound } = this._getBounds(transport); const inbound = new Inbound({ config, source, conn }); + inbound.setOutbound(new MuxOutbound(outbound)); inbound.on('_error', (err) => this.emit('_error', err)); inbound.on('data', (buffer) => { if (!isFirstFrameOut) { @@ -189,6 +218,7 @@ export class MuxRelay extends EventEmitter { // outbound const outbound = new Outbound({ config, source }); + outbound.setInbound(new MuxInbound(inbound)); outbound.on('_error', (err) => this.emit('_error', err)); outbound.on('data', (buffer) => { pipe.feed(PIPE_ENCODE, buffer, { cid }); @@ -228,7 +258,7 @@ export class MuxRelay extends EventEmitter { inbound.__tracker.trace(PIPE_DECODE, data.length); setImmediate(() => this.emit('_read', data.length)); } else { - logger.error(`[mux-relay] couldn't delivery data frame to cid=${cid}, dump=${dumpHex(data)}`); + logger.debug(`[mux-relay] couldn't delivery data frame to cid=${cid}, dump=${dumpHex(data)}`); } } else { const outbound = this._outbounds.get(cid); diff --git a/src/transports/h2.js b/src/transports/h2.js index fd4f0a8..8f65067 100644 --- a/src/transports/h2.js +++ b/src/transports/h2.js @@ -14,6 +14,7 @@ export class Http2Inbound extends Inbound { super(props); this._stream = this._conn; this._stream.on('data', this.onReceive); + this._stream.on('drain', this.onDrain); this._stream.on('error', this.onError); this._stream.on('close', this.onClose); this._stream.on('timeout', this.onTimeout); @@ -47,6 +48,10 @@ export class Http2Inbound extends Inbound { this.emit('data', buffer); }; + onDrain = () => { + this.emit('drain'); + }; + onTimeout = () => { logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`); this.onClose(); @@ -107,6 +112,10 @@ export class Http2Outbound extends Outbound { this.emit('data', buffer); }; + onDrain = () => { + this.emit('drain'); + }; + onTimeout = () => { logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`); this.onClose(); @@ -153,6 +162,7 @@ export class Http2Outbound extends Outbound { }); this._stream.on('error', this.onError); this._stream.on('data', this.onReceive); + this._stream.on('drain', this.onDrain); this._stream.on('timeout', this.onTimeout); this._stream.on('close', this.onClose); this._stream.setTimeout(this._config.timeout); diff --git a/src/transports/tcp.js b/src/transports/tcp.js index 17cfa29..4d8be42 100644 --- a/src/transports/tcp.js +++ b/src/transports/tcp.js @@ -1,6 +1,6 @@ import net from 'net'; import { Inbound, Outbound } from './defs'; -import { DNSCache, SpeedTester, logger } from '../utils'; +import { DNSCache, logger } from '../utils'; import { ACL_PAUSE_RECV, ACL_PAUSE_SEND, @@ -14,8 +14,6 @@ export class TcpInbound extends Inbound { _socket = null; - _speeder = new SpeedTester(); - _destroyed = false; constructor(props) { @@ -57,25 +55,17 @@ export class TcpInbound extends Inbound { onReceive = (buffer) => { this.emit('data', buffer); - this._speeder.feed(buffer.length); // throttle receiving data to reduce memory grow: // https://github.com/blinksocks/blinksocks/issues/60 // https://nodejs.org/dist/latest/docs/api/net.html#net_socket_buffersize const outbound = this.getOutbound(); if (outbound && outbound.bufferSize > MAX_BUFFERED_SIZE) { - const delta = outbound.bufferSize - (MAX_BUFFERED_SIZE >> 1); - // this speed is almost equivalent to outbound::write() speed - const speed = this._speeder.getSpeed(); - if (speed < 1) { - return; - } - const sec = (delta / speed).toFixed(3); - logger.debug(`[${this.name}] [${this.remote}] recv paused for ${sec}s due to inbound.bufferSize=${outbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`); + logger.debug(`[${this.name}] [${this.remote}] recv paused due to inbound.bufferSize=${outbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`); this.pause(); - setTimeout(() => { + outbound.once('drain', () => { logger.debug(`[${this.name}] [${this.remote}] resume to recv`); this.resume(); - }, sec * 1e3); + }); } }; @@ -90,13 +80,15 @@ export class TcpInbound extends Inbound { onHalfClose = () => { const outbound = this.getOutbound(); - outbound && outbound.end && outbound.end(); + if (outbound && outbound.end) { + outbound.end(); + } }; onClose = () => { this.close(); const outbound = this.getOutbound(); - if (outbound) { + if (outbound && outbound.close) { outbound.close(); this.setOutbound(null); } @@ -155,8 +147,6 @@ export class TcpOutbound extends Outbound { _socket = null; - _speeder = new SpeedTester(); - _destroyed = false; get name() { @@ -184,25 +174,17 @@ export class TcpOutbound extends Outbound { onReceive = (buffer) => { this.emit('data', buffer); - this._speeder.feed(buffer.length); // throttle receiving data to reduce memory grow: // https://github.com/blinksocks/blinksocks/issues/60 // https://nodejs.org/dist/latest/docs/api/net.html#net_socket_buffersize const inbound = this.getInbound(); if (inbound && inbound.bufferSize > MAX_BUFFERED_SIZE) { - const delta = inbound.bufferSize - (MAX_BUFFERED_SIZE >> 1); - // this speed is almost equivalent to inbound::write() speed - const speed = this._speeder.getSpeed(); - if (speed < 1) { - return; - } - const sec = (delta / speed).toFixed(3); - logger.debug(`[${this.name}] [${this.remote}] recv paused for ${sec}s due to inbound.bufferSize=${inbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`); + logger.debug(`[${this.name}] [${this.remote}] recv paused due to inbound.bufferSize=${inbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`); this.pause(); - setTimeout(() => { + inbound.once('drain', () => { logger.debug(`[${this.name}] [${this.remote}] resume to recv`); this.resume(); - }, sec * 1e3); + }); } }; @@ -217,13 +199,15 @@ export class TcpOutbound extends Outbound { onHalfClose = () => { const inbound = this.getInbound(); - inbound && inbound.end && inbound.end(); + if (inbound && inbound.end) { + inbound.end(); + } }; onClose = () => { this.close(); const inbound = this.getInbound(); - if (inbound) { + if (inbound && inbound.close) { inbound.close(); this.setInbound(null); } diff --git a/src/transports/ws.js b/src/transports/ws.js index f927a08..5eef32d 100644 --- a/src/transports/ws.js +++ b/src/transports/ws.js @@ -18,6 +18,7 @@ export class WsInbound extends Inbound { constructor(props) { super(props); this._socket = this._conn; + this._socket._socket.on('drain', this.onDrain); this._socket.on('message', this.onReceive); this._socket.on('error', this.onError); this._socket.on('close', this.onClose); @@ -45,6 +46,10 @@ export class WsInbound extends Inbound { this.emit('data', buffer); }; + onDrain = () => { + this.emit('drain'); + }; + onError = (err) => { logger.warn(`[${this.name}] [${this.remote}] ${err.message}`); this.emit('_error', err); @@ -100,6 +105,10 @@ export class WsOutbound extends Outbound { this.emit('data', buffer); }; + onDrain = () => { + this.emit('drain'); + }; + onError = (err) => { logger.warn(`[${this.name}] [${this.remote}] ${err.message}`); this.emit('_error', err); @@ -136,7 +145,10 @@ export class WsOutbound extends Outbound { handshakeTimeout: 1e4, // 10s perMessageDeflate: false, })); - this._socket.on('open', resolve); + this._socket.on('open', () => { + this._socket._socket.on('drain', this.onDrain); + resolve(); + }); this._socket.on('message', this.onReceive); this._socket.on('error', this.onError); this._socket.on('close', this.onClose);