src: reduce memory usage for multiplexing

This commit is contained in:
Micooz 2018-07-14 17:13:20 +08:00
parent ae0e0ebcbe
commit 7be5bbe780
4 changed files with 69 additions and 33 deletions

@ -15,6 +15,34 @@ import {
WssInbound, WssOutbound,
} from '../transports';
class MuxInbound extends EventEmitter {
constructor(inbound) {
super();
this._inbound = inbound;
this._inbound.on('drain', () => this.emit('drain'));
}
get bufferSize() {
return this._inbound.bufferSize;
}
}
class MuxOutbound extends EventEmitter {
constructor(outbound) {
super();
this._outbound = outbound;
this._outbound.on('drain', () => this.emit('drain'));
}
get bufferSize() {
return this._outbound.bufferSize;
}
}
// .on('_connect')
// .on('_read')
// .on('_write')
@ -82,6 +110,7 @@ export class MuxRelay extends EventEmitter {
const { Inbound } = this._getBounds(transport);
const inbound = new Inbound({ config, source, conn });
inbound.setOutbound(new MuxOutbound(outbound));
inbound.on('_error', (err) => this.emit('_error', err));
inbound.on('data', (buffer) => {
if (!isFirstFrameOut) {
@ -189,6 +218,7 @@ export class MuxRelay extends EventEmitter {
// outbound
const outbound = new Outbound({ config, source });
outbound.setInbound(new MuxInbound(inbound));
outbound.on('_error', (err) => this.emit('_error', err));
outbound.on('data', (buffer) => {
pipe.feed(PIPE_ENCODE, buffer, { cid });
@ -228,7 +258,7 @@ export class MuxRelay extends EventEmitter {
inbound.__tracker.trace(PIPE_DECODE, data.length);
setImmediate(() => this.emit('_read', data.length));
} else {
logger.error(`[mux-relay] couldn't delivery data frame to cid=${cid}, dump=${dumpHex(data)}`);
logger.debug(`[mux-relay] couldn't delivery data frame to cid=${cid}, dump=${dumpHex(data)}`);
}
} else {
const outbound = this._outbounds.get(cid);

@ -14,6 +14,7 @@ export class Http2Inbound extends Inbound {
super(props);
this._stream = this._conn;
this._stream.on('data', this.onReceive);
this._stream.on('drain', this.onDrain);
this._stream.on('error', this.onError);
this._stream.on('close', this.onClose);
this._stream.on('timeout', this.onTimeout);
@ -47,6 +48,10 @@ export class Http2Inbound extends Inbound {
this.emit('data', buffer);
};
onDrain = () => {
this.emit('drain');
};
onTimeout = () => {
logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`);
this.onClose();
@ -107,6 +112,10 @@ export class Http2Outbound extends Outbound {
this.emit('data', buffer);
};
onDrain = () => {
this.emit('drain');
};
onTimeout = () => {
logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`);
this.onClose();
@ -153,6 +162,7 @@ export class Http2Outbound extends Outbound {
});
this._stream.on('error', this.onError);
this._stream.on('data', this.onReceive);
this._stream.on('drain', this.onDrain);
this._stream.on('timeout', this.onTimeout);
this._stream.on('close', this.onClose);
this._stream.setTimeout(this._config.timeout);

@ -1,6 +1,6 @@
import net from 'net';
import { Inbound, Outbound } from './defs';
import { DNSCache, SpeedTester, logger } from '../utils';
import { DNSCache, logger } from '../utils';
import {
ACL_PAUSE_RECV,
ACL_PAUSE_SEND,
@ -14,8 +14,6 @@ export class TcpInbound extends Inbound {
_socket = null;
_speeder = new SpeedTester();
_destroyed = false;
constructor(props) {
@ -57,25 +55,17 @@ export class TcpInbound extends Inbound {
onReceive = (buffer) => {
this.emit('data', buffer);
this._speeder.feed(buffer.length);
// throttle receiving data to reduce memory grow:
// https://github.com/blinksocks/blinksocks/issues/60
// https://nodejs.org/dist/latest/docs/api/net.html#net_socket_buffersize
const outbound = this.getOutbound();
if (outbound && outbound.bufferSize > MAX_BUFFERED_SIZE) {
const delta = outbound.bufferSize - (MAX_BUFFERED_SIZE >> 1);
// this speed is almost equivalent to outbound::write() speed
const speed = this._speeder.getSpeed();
if (speed < 1) {
return;
}
const sec = (delta / speed).toFixed(3);
logger.debug(`[${this.name}] [${this.remote}] recv paused for ${sec}s due to inbound.bufferSize=${outbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`);
logger.debug(`[${this.name}] [${this.remote}] recv paused due to inbound.bufferSize=${outbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`);
this.pause();
setTimeout(() => {
outbound.once('drain', () => {
logger.debug(`[${this.name}] [${this.remote}] resume to recv`);
this.resume();
}, sec * 1e3);
});
}
};
@ -90,13 +80,15 @@ export class TcpInbound extends Inbound {
onHalfClose = () => {
const outbound = this.getOutbound();
outbound && outbound.end && outbound.end();
if (outbound && outbound.end) {
outbound.end();
}
};
onClose = () => {
this.close();
const outbound = this.getOutbound();
if (outbound) {
if (outbound && outbound.close) {
outbound.close();
this.setOutbound(null);
}
@ -155,8 +147,6 @@ export class TcpOutbound extends Outbound {
_socket = null;
_speeder = new SpeedTester();
_destroyed = false;
get name() {
@ -184,25 +174,17 @@ export class TcpOutbound extends Outbound {
onReceive = (buffer) => {
this.emit('data', buffer);
this._speeder.feed(buffer.length);
// throttle receiving data to reduce memory grow:
// https://github.com/blinksocks/blinksocks/issues/60
// https://nodejs.org/dist/latest/docs/api/net.html#net_socket_buffersize
const inbound = this.getInbound();
if (inbound && inbound.bufferSize > MAX_BUFFERED_SIZE) {
const delta = inbound.bufferSize - (MAX_BUFFERED_SIZE >> 1);
// this speed is almost equivalent to inbound::write() speed
const speed = this._speeder.getSpeed();
if (speed < 1) {
return;
}
const sec = (delta / speed).toFixed(3);
logger.debug(`[${this.name}] [${this.remote}] recv paused for ${sec}s due to inbound.bufferSize=${inbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`);
logger.debug(`[${this.name}] [${this.remote}] recv paused due to inbound.bufferSize=${inbound.bufferSize} >= ${MAX_BUFFERED_SIZE}`);
this.pause();
setTimeout(() => {
inbound.once('drain', () => {
logger.debug(`[${this.name}] [${this.remote}] resume to recv`);
this.resume();
}, sec * 1e3);
});
}
};
@ -217,13 +199,15 @@ export class TcpOutbound extends Outbound {
onHalfClose = () => {
const inbound = this.getInbound();
inbound && inbound.end && inbound.end();
if (inbound && inbound.end) {
inbound.end();
}
};
onClose = () => {
this.close();
const inbound = this.getInbound();
if (inbound) {
if (inbound && inbound.close) {
inbound.close();
this.setInbound(null);
}

@ -18,6 +18,7 @@ export class WsInbound extends Inbound {
constructor(props) {
super(props);
this._socket = this._conn;
this._socket._socket.on('drain', this.onDrain);
this._socket.on('message', this.onReceive);
this._socket.on('error', this.onError);
this._socket.on('close', this.onClose);
@ -45,6 +46,10 @@ export class WsInbound extends Inbound {
this.emit('data', buffer);
};
onDrain = () => {
this.emit('drain');
};
onError = (err) => {
logger.warn(`[${this.name}] [${this.remote}] ${err.message}`);
this.emit('_error', err);
@ -100,6 +105,10 @@ export class WsOutbound extends Outbound {
this.emit('data', buffer);
};
onDrain = () => {
this.emit('drain');
};
onError = (err) => {
logger.warn(`[${this.name}] [${this.remote}] ${err.message}`);
this.emit('_error', err);
@ -136,7 +145,10 @@ export class WsOutbound extends Outbound {
handshakeTimeout: 1e4, // 10s
perMessageDeflate: false,
}));
this._socket.on('open', resolve);
this._socket.on('open', () => {
this._socket._socket.on('drain', this.onDrain);
resolve();
});
this._socket.on('message', this.onReceive);
this._socket.on('error', this.onError);
this._socket.on('close', this.onClose);