bin,core: support built-in cluster mode

close: #88
This commit is contained in:
Micooz 2017-08-14 15:54:19 +08:00
parent 78fc22264f
commit 3c3aa46a2e
5 changed files with 46 additions and 35 deletions

36
bin/bootstrap.js vendored

@ -1,3 +1,4 @@
const cluster = require('cluster');
const fs = require('fs');
const path = require('path');
const chalk = require('chalk');
@ -28,33 +29,20 @@ function obtainConfig(file) {
return json;
}
module.exports = function bootstrap(configPath, {Hub, Config, Balancer}) {
module.exports = function bootstrap(configPath, {Hub, Config}) {
try {
Config.init(obtainConfig(configPath));
if (__IS_WATCH__) {
fs.watchFile(configPath, function (curr, prev) {
if (curr.mtime > prev.mtime) {
console.log(`==> [bootstrap] ${path.basename(configPath)} has changed, reload`);
try {
Config.init(obtainConfig(configPath));
if (__IS_CLIENT__) {
console.info('==> [balancer] restarted');
Balancer.start(__SERVERS__);
}
console.info(JSON.stringify(__ALL_CONFIG__));
} catch (err) {
console.error(err);
}
}
});
if (cluster.isMaster) {
for (let i = 0; i < __WORKERS__; ++i) {
cluster.fork();
}
console.log(`==> [bootstrap] started ${__WORKERS__} workers`);
} else {
const hub = new Hub();
hub.on('close', () => process.exit(0));
hub.run();
process.on('SIGINT', () => hub.terminate());
}
const app = new Hub();
app.on('close', () => process.exit(0));
app.run();
process.on('SIGINT', () => app.terminate());
} catch (err) {
console.error(err);
process.exit(-1);

@ -1,4 +1,5 @@
const fs = require('fs');
const os = require('os');
const crypto = require('crypto');
/**
@ -29,6 +30,7 @@ module.exports = function init() {
const key = random('abcdefghjkmnpqrstuvwxyz23456789!@#$%^&*()_+<>?:|{}-=[];,./ABCDEFGHJKLMNPQRSTUVWXYZ', 16);
const port = getRandomInt(1024, 65535);
const timeout = getRandomInt(200, 1000);
const workers = os.cpus().length;
const clientJson = {
'host': '127.0.0.1',
@ -57,7 +59,7 @@ module.exports = function init() {
'dns': [],
'dns_expire': 3600,
'timeout': timeout,
'watch': false,
'workers': workers,
'log_level': 'info'
};
@ -82,7 +84,7 @@ module.exports = function init() {
'dns_expire': 3600,
'redirect': '',
'timeout': timeout,
'watch': false,
'workers': workers,
'log_level': 'info'
};

@ -1,8 +1,9 @@
import dns from 'dns';
import fs from 'fs';
import os from 'os';
import net from 'net';
import {isValidPort} from '../utils';
import {BLINKSOCKS_DIR, LOG_DIR, DEFAULT_LOG_LEVEL} from './constants';
import {BLINKSOCKS_DIR, LOG_DIR, DEFAULT_LOG_LEVEL, DEFAULT_WORKERS} from './constants';
import {DNS_DEFAULT_EXPIRE} from './dns-cache';
/**
@ -93,6 +94,19 @@ export class Config {
console.warn(`==> [config] 'timeout' is too short, is ${json.timeout}s expected?`);
}
// workers
if (typeof json.workers !== 'undefined') {
if (typeof json.workers !== 'number') {
throw Error('\'workers\' must be a number');
}
if (json.workers < 1) {
throw Error('\'workers\' must be greater than 0');
}
if (json.workers > os.cpus().length) {
console.warn(`==> [config] 'workers' is greater than the number of cpus, is ${json.workers} workers expected?`);
}
}
// dns_expire
if (typeof json.dns_expire !== 'undefined') {
if (typeof json.dns_expire !== 'number') {
@ -188,7 +202,7 @@ export class Config {
global.__IS_SERVER__ = !global.__IS_CLIENT__;
global.__REDIRECT__ = json.redirect;
global.__TIMEOUT__ = json.timeout * 1e3;
global.__IS_WATCH__ = !!json.watch;
global.__WORKERS__ = json.workers || DEFAULT_WORKERS;
global.__LOG_LEVEL__ = json.log_level || DEFAULT_LOG_LEVEL;
global.__DNS_EXPIRE__ = (json.dns_expire !== undefined) ? json.dns_expire * 1e3 : DNS_DEFAULT_EXPIRE;
global.__ALL_CONFIG__ = json;

@ -15,6 +15,7 @@ const LOG_FILE_PATH = path.join(LOG_DIR,
);
const LOG_FILE_MAX_SIZE = 2 * 1024 * 1024; // 2MB
const DEFAULT_LOG_LEVEL = 'info';
const DEFAULT_WORKERS = os.cpus().length;
module.exports = {
HOME_DIR,
@ -22,5 +23,6 @@ module.exports = {
LOG_DIR,
LOG_FILE_PATH,
LOG_FILE_MAX_SIZE,
DEFAULT_LOG_LEVEL
DEFAULT_LOG_LEVEL,
DEFAULT_WORKERS
};

@ -1,3 +1,4 @@
import cluster from 'cluster';
import EventEmitter from 'events';
import net from 'net';
import logger from './logger';
@ -26,6 +27,8 @@ const nextId = (function () {
*/
export class Hub extends EventEmitter {
_isFirstWorker = cluster.worker.id <= 1;
_hub = null; // instance of class net.Server
_sockets = []; // instances of our class Socket
@ -46,10 +49,10 @@ export class Hub extends EventEmitter {
onClose() {
if (!this._isClosed) {
logger.info('==> [hub] shutdown');
this._isFirstWorker && logger.info('==> [hub] shutdown');
if (__IS_CLIENT__) {
Balancer.destroy();
logger.info('==> [balancer] stopped');
this._isFirstWorker && logger.info('==> [balancer] stopped');
}
this._isClosed = true;
this._sockets.forEach((socket) => socket.destroy());
@ -80,11 +83,13 @@ export class Hub extends EventEmitter {
port: __LOCAL_PORT__
};
this._hub.listen(options, () => {
logger.info(`==> [hub] use configuration: ${JSON.stringify(__ALL_CONFIG__)}`);
logger.info(`==> [hub] running as: ${__IS_SERVER__ ? 'Server' : 'Client'}`);
logger.info(`==> [hub] listening on: ${JSON.stringify(this._hub.address())}`);
if (this._isFirstWorker) {
logger.info(`==> [hub] use configuration: ${JSON.stringify(__ALL_CONFIG__)}`);
logger.info(`==> [hub] running as: ${__IS_SERVER__ ? 'Server' : 'Client'}`);
logger.info(`==> [hub] listening on: ${JSON.stringify(this._hub.address())}`);
}
if (__IS_CLIENT__) {
logger.info('==> [balancer] started');
this._isFirstWorker && logger.info('==> [balancer] started');
Balancer.start(__SERVERS__);
}
if (typeof callback === 'function') {