Merge branch 'feature-multi-server'

This commit is contained in:
Micooz 2017-03-10 09:56:35 +08:00
commit 337f2afa42
6 changed files with 158 additions and 92 deletions

14
bin/bootstrap.js vendored

@ -8,8 +8,7 @@ const options = [
['-c, --config [file]', 'a json format file for configuration, if specified, other options are ignored', ''],
['--host <host>', 'an ip address or a hostname to bind', 'localhost'],
['--port <port>', 'where to listen on', 1080],
['--server-host [server-host]', 'an ip address or hostname to connect to'],
['--server-port [server-port]', 'which port the server listen on'],
['--servers [servers]', 'a list of servers, split by comma', (value) => value.split(',')],
['--key <key>', 'a key for encryption and decryption'],
['--frame [frame]', 'a preset used in frame middleware, default: \'origin\'', 'origin'],
['--frame-params [crypto-params]', 'parameters for frame preset, default: \'\'', ''],
@ -54,7 +53,7 @@ function obtainConfig(options) {
}
return config;
}
const {host, port, serverHost, serverPort, key} = options;
const {host, port, servers, key} = options;
const {frame, frameParams, crypto, cryptoParams, protocol, protocolParams, obfs, obfsParams} = options;
const {logLevel, quiet} = options;
const config = {
@ -71,11 +70,8 @@ function obtainConfig(options) {
obfs_params: obfsParams,
log_level: typeof quiet === 'undefined' ? logLevel : 'error'
};
if (serverHost) {
Object.assign(config, {
server_host: serverHost,
server_port: parseInt(serverPort, 10)
});
if (servers) {
Object.assign(config, {servers});
}
return config;
}
@ -103,7 +99,7 @@ module.exports = function ({Hub}) {
try {
const app = new Hub(config);
app.run();
process.on('SIGINT', () => process.exit(0));
process.on('SIGINT', () => app.onClose());
} catch (err) {
console.error(err.message);
process.exit(-1);

@ -33,12 +33,16 @@ describe('Config#init', function () {
expect(() => Config.init({host: 'localhost', port: -1})).toThrow();
});
it('should throw when server_host(if provided) is empty', function () {
expect(() => Config.init({host: 'localhost', port: 1080, server_host: ''})).toThrow();
it('should throw when servers(if provided) is not an array', function () {
expect(() => Config.init({host: 'localhost', port: 1080, servers: ''})).toThrow();
});
it('should throw when server_port(if provided) is not natural number', function () {
expect(() => Config.init({host: 'localhost', port: 1080, server_host: 'localhost', server_port: 1.1})).toThrow();
it('should throw when servers(if provided) is an array but empty', function () {
expect(() => Config.init({host: 'localhost', port: 1080, servers: []})).toThrow();
});
it('should throw when servers(if provided) is an array but has invalid items', function () {
expect(() => Config.init({host: 'localhost', port: 1080, servers: ['']})).toThrow();
});
it('should throw when key is not string', function () {
@ -46,8 +50,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: null
});
}).toThrow();
@ -58,8 +61,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '',
});
}).toThrow();
@ -70,8 +72,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: DEFAULT_KEY
});
}).toThrow();
@ -82,8 +83,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: null
});
@ -95,8 +95,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: 'xxx',
frame_params: [1, 2]
@ -109,8 +108,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: 'xxx',
frame_params: '',
@ -124,8 +122,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: 'xxx',
frame_params: '',
@ -140,8 +137,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: 'xxx',
frame_params: '',
@ -157,8 +153,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: 'xxx',
frame_params: '',
@ -175,8 +170,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: 'xxx',
frame_params: '',
@ -194,8 +188,7 @@ describe('Config#init', function () {
Config.init({
host: 'localhost',
port: 1080,
server_host: 'localhost',
server_port: 1080,
servers: ['abc.com:443'],
key: '123',
frame: 'xxx',
frame_params: '',

72
src/core/balancer.js Normal file

@ -0,0 +1,72 @@
import net from 'net';
const QUERY_INTERVAL = 6e4; // 1min
const now = () => (new Date()).getTime();
export class Balancer {
static _servers = [];
static _pings = [];
static _timer = null;
// public
/**
* initialize the balancer with server list and query interval
* @param servers
* @param interval
*/
static init(servers, interval = QUERY_INTERVAL) {
this._servers = servers;
this._pings = this._servers.map(() => 0);
this._startQuery(interval);
}
/**
* stop querying
*/
static destroy() {
this._stopQuery();
}
/**
* returns the fastest one of the servers
* @returns {{host, port}}
*/
static getFastest() {
let index = 0;
const pings = this._pings;
for (let i = 0; i < pings.length; ++i) {
const ping = pings[i];
if ((ping > 0 && ping < pings[index]) || pings[index] <= 0) {
index = i;
}
}
return this._servers[index];
}
// private
static _startQuery(interval) {
if (this._servers.length > 1) {
this._timer = setInterval(() => this._servers.map(
(server, i) => {
const startTime = now();
const socket = net.connect(server, () => {
this._pings[i] = now() - startTime;
socket.end();
});
socket.on('error', () => this._pings[i] = -1);
}
), interval);
}
}
static _stopQuery() {
clearInterval(this._timer);
}
}

@ -10,9 +10,7 @@ export class Config {
static port;
static server_host;
static server_port;
static servers;
static key;
@ -61,24 +59,29 @@ export class Config {
this.port = json.port;
// server_host & server_port
// servers
if (typeof json.server_host === 'string') {
if (json.server_host === '') {
throw Error('\'server_host\' must not be empty');
if (typeof json.servers !== 'undefined') {
if (!Array.isArray(json.servers)) {
throw Error('\'servers\' must be an array');
}
if (!Number.isSafeInteger(json.server_port) || json.server_port <= 0) {
throw Error('\'server_port\' must be a natural number');
const servers = json.servers
.map((item) => item.split(':'))
.filter((pair) => pair.length === 2 && Number.isSafeInteger(+pair[1]))
.map(([host, port]) => ({host, port}));
if (servers.length < 1) {
throw Error('\'server_host\' must contain at least one valid item');
}
this.servers = servers;
this._is_server = false;
} else {
this._is_server = true;
}
this.server_host = json.server_host;
this.server_port = json.server_port;
// key
if (typeof json.key !== 'string') {
@ -164,8 +167,7 @@ export class Config {
global.__LOCAL_HOST__ = this.host;
global.__LOCAL_PORT__ = this.port;
global.__SERVER_HOST__ = this.server_host;
global.__SERVER_PORT__ = this.server_port;
global.__SERVERS__ = this.servers;
global.__KEY__ = this.key;

@ -2,6 +2,7 @@ import net from 'net';
import logger from 'winston';
import {Config} from './config';
import {Socket} from './socket';
import {Balancer} from './balancer';
const nextId = (function () {
let i = 0;
@ -32,6 +33,8 @@ export class Hub {
onClose() {
logger.info('hub shutdown');
logger.info('stopping balancer');
Balancer.destroy();
process.exit(0);
}
@ -47,9 +50,14 @@ export class Hub {
port: __LOCAL_PORT__
};
this._hub.listen(options, () => {
console.info('==> use configuration:', JSON.stringify(Config.abstract()));
console.info('==> use configuration:');
console.info(Config.abstract());
console.info(`==> blinksocks is running as: ${__IS_SERVER__ ? 'Server' : 'Client'}`);
console.info('==> opened hub on:', this._hub.address());
console.info('==> blinksocks is listening on:', this._hub.address());
if (__IS_CLIENT__) {
console.info('==> starting balancer');
Balancer.init(__SERVERS__);
}
});
}

@ -1,6 +1,7 @@
import net from 'net';
import logger from 'winston';
import {DNSCache} from './dns-cache';
import {Balancer} from './balancer';
import {Pipe} from './pipe';
import {
MIDDLEWARE_DIRECTION_UPWARD,
@ -49,6 +50,8 @@ import {
const dnsCache = DNSCache.create();
let lastEndPoint = null;
export class Socket {
_id = null;
@ -133,6 +136,7 @@ export class Socket {
onError(err) {
switch (err.code) {
case 'ECONNREFUSED':
// TODO: maybe we can switch to another server
case 'EADDRNOTAVAIL':
case 'ENETDOWN':
case 'ECONNRESET':
@ -168,6 +172,26 @@ export class Socket {
}
}
/**
* connect to a server, for both client and server
* @param host
* @param port
* @param callback
* @returns {Promise.<void>}
*/
async connectTo({host, port}, callback) {
logger.info(`[${this._id}] connecting to:`, {host, port});
try {
const ip = await dnsCache.get(host);
this._fsocket = net.connect({host: ip, port}, callback);
this._fsocket.on('error', this.onError);
this._fsocket.on('close', this.onClose);
this._fsocket.on('data', this.onBackward);
} catch (err) {
logger.error(err.message);
}
}
/**
* create pipes for both data forward and backward
*/
@ -175,7 +199,8 @@ export class Socket {
const pipeProps = {
onNotified: (action) => {
if (__IS_SERVER__ && action.type === SOCKET_CONNECT_TO_DST) {
return this.connectToDst(...action.payload);
const [addr, callback] = action.payload;
return this.connectTo(addr, callback);
}
if (action.type === PROCESSING_FAILED) {
const message = action.payload;
@ -196,45 +221,6 @@ export class Socket {
this._pipe.on(`next_${MIDDLEWARE_DIRECTION_DOWNWARD}`, (buf) => this.send(buf, __IS_SERVER__));
}
/**
* connect to blinksocks server
* @returns {Promise}
*/
async connectToServer(addr, callback) {
const ep = {
host: __SERVER_HOST__, // TODO: use dnsCache
port: __SERVER_PORT__
};
logger.info(`[${this._id}] connecting to:`, ep);
this._fsocket = net.connect(ep, () => {
this.createPipe(addr);
callback();
});
this._fsocket.on('error', this.onError);
this._fsocket.on('close', this.onClose);
this._fsocket.on('data', this.onBackward);
}
/**
* connect to the real server, server side only
* @param addr
* @param callback
* @returns {Promise.<void>}
*/
async connectToDst(addr, callback) {
const {host, port} = addr;
try {
const ip = await dnsCache.get(host);
logger.info(`[${this._id}] connecting to ${host}(${ip}:${port})`);
this._fsocket = net.connect({host: ip, port}, callback);
this._fsocket.on('error', this.onError);
this._fsocket.on('close', this.onClose);
this._fsocket.on('data', this.onBackward);
} catch (err) {
logger.error(err.message);
}
}
/*** client handshake, multiple protocols ***/
onHandshake(buffer) {
@ -245,7 +231,16 @@ export class Socket {
}
onHandshakeDone(addr, callback) {
return this.connectToServer(addr, callback);
const ep = Balancer.getFastest();
if (lastEndPoint !== null &&
(ep.host !== lastEndPoint.host || ep.port !== lastEndPoint.port)) {
logger.info('balancer switch to use:', ep);
}
lastEndPoint = ep;
return this.connectTo(ep, () => {
this.createPipe(addr);
callback();
});
}
trySocksHandshake(socket, buffer) {