core/hub: add getConnections(), getTotalRead(), getTotalWritten() and getPerformance()

This commit is contained in:
Micooz 2018-03-11 19:09:10 +08:00
parent 627325503c
commit 05b5e120f9
2 changed files with 42 additions and 0 deletions

@ -8,6 +8,7 @@ import uniqueId from 'lodash.uniqueid';
import { Config } from './config';
import { Relay } from './relay';
import { MuxRelay } from './mux-relay';
import { Performance } from './performance';
import { dumpHex, getRandomInt, hash, logger } from '../utils';
import { http, socks, tcp } from '../proxies';
import { APP_ID } from '../constants';
@ -16,6 +17,8 @@ export class Hub {
_config = null;
_performance = null;
_tcpServer = null;
_udpServer = null;
@ -26,8 +29,13 @@ export class Hub {
_udpRelays = null; // LRU cache
_totalRead = 0;
_totalWritten = 0;
constructor(config) {
this._config = new Config(config);
this._performance = new Performance(this);
this._udpRelays = LRU({ max: 500, maxAge: 1e5, dispose: (_, relay) => relay.destroy() });
}
@ -65,6 +73,34 @@ export class Hub {
logger.info('[hub] shutdown');
}
async getConnections() {
return new Promise((resolve, reject) => {
if (this._tcpServer) {
this._tcpServer.getConnections((err, count) => {
if (err) {
reject(err);
} else {
resolve(count);
}
});
} else {
resolve(0);
}
});
}
getTotalRead() {
return this._totalRead;
}
getTotalWritten() {
return this._totalWritten;
}
getPerformance() {
return this._performance;
}
async _createServer() {
const { is_client, is_server, local_protocol } = this._config;
if (is_client) {
@ -260,6 +296,8 @@ export class Hub {
}
relay.init({ proxyRequest });
relay.on('_read', (size) => this._totalRead += size);
relay.on('_write', (size) => this._totalWritten += size);
relay.on('close', () => this._tcpRelays.delete(relay.id));
this._tcpRelays.set(relay.id, relay);
@ -272,6 +310,8 @@ export class Hub {
// create a mux relay if needed
if (muxRelay === null) {
muxRelay = this._createRelay(context, true);
muxRelay.on('_read', (size) => this._totalRead += size);
muxRelay.on('_write', (size) => this._totalWritten += size);
muxRelay.on('close', () => this._muxRelays.delete(muxRelay.id));
this._muxRelays.set(muxRelay.id, muxRelay);
logger.info(`[mux-${muxRelay.id}] create mux connection, total: ${this._muxRelays.size}`);

@ -230,6 +230,7 @@ export class Relay extends EventEmitter {
}
}
cb(buffer);
setImmediate(() => this.emit('_read', buffer.length));
};
onEncoded = (buffer) => {
@ -244,6 +245,7 @@ export class Relay extends EventEmitter {
}
this._inbound.write(buffer);
}
setImmediate(() => this.emit('_write', buffer.length));
};
onDecoded = (buffer) => {