From 3c3aa46a2efd1cd0dbd851f2bd165726fe4a902e Mon Sep 17 00:00:00 2001 From: Micooz Date: Mon, 14 Aug 2017 15:54:19 +0800 Subject: [PATCH] bin,core: support built-in cluster mode close: #88 --- bin/bootstrap.js | 36 ++++++++++++------------------------ bin/init.js | 6 ++++-- src/core/config.js | 18 ++++++++++++++++-- src/core/constants.js | 4 +++- src/core/hub.js | 17 +++++++++++------ 5 files changed, 46 insertions(+), 35 deletions(-) diff --git a/bin/bootstrap.js b/bin/bootstrap.js index 917c1c4..4b05fd8 100644 --- a/bin/bootstrap.js +++ b/bin/bootstrap.js @@ -1,3 +1,4 @@ +const cluster = require('cluster'); const fs = require('fs'); const path = require('path'); const chalk = require('chalk'); @@ -28,33 +29,20 @@ function obtainConfig(file) { return json; } -module.exports = function bootstrap(configPath, {Hub, Config, Balancer}) { +module.exports = function bootstrap(configPath, {Hub, Config}) { try { Config.init(obtainConfig(configPath)); - - if (__IS_WATCH__) { - fs.watchFile(configPath, function (curr, prev) { - if (curr.mtime > prev.mtime) { - console.log(`==> [bootstrap] ${path.basename(configPath)} has changed, reload`); - try { - Config.init(obtainConfig(configPath)); - if (__IS_CLIENT__) { - console.info('==> [balancer] restarted'); - Balancer.start(__SERVERS__); - } - console.info(JSON.stringify(__ALL_CONFIG__)); - } catch (err) { - console.error(err); - } - } - }); + if (cluster.isMaster) { + for (let i = 0; i < __WORKERS__; ++i) { + cluster.fork(); + } + console.log(`==> [bootstrap] started ${__WORKERS__} workers`); + } else { + const hub = new Hub(); + hub.on('close', () => process.exit(0)); + hub.run(); + process.on('SIGINT', () => hub.terminate()); } - - const app = new Hub(); - app.on('close', () => process.exit(0)); - app.run(); - - process.on('SIGINT', () => app.terminate()); } catch (err) { console.error(err); process.exit(-1); diff --git a/bin/init.js b/bin/init.js index fdcc6e8..6826b5e 100644 --- a/bin/init.js +++ b/bin/init.js @@ -1,4 +1,5 @@ const fs = require('fs'); +const os = require('os'); const crypto = require('crypto'); /** @@ -29,6 +30,7 @@ module.exports = function init() { const key = random('abcdefghjkmnpqrstuvwxyz23456789!@#$%^&*()_+<>?:|{}-=[];,./ABCDEFGHJKLMNPQRSTUVWXYZ', 16); const port = getRandomInt(1024, 65535); const timeout = getRandomInt(200, 1000); + const workers = os.cpus().length; const clientJson = { 'host': '127.0.0.1', @@ -57,7 +59,7 @@ module.exports = function init() { 'dns': [], 'dns_expire': 3600, 'timeout': timeout, - 'watch': false, + 'workers': workers, 'log_level': 'info' }; @@ -82,7 +84,7 @@ module.exports = function init() { 'dns_expire': 3600, 'redirect': '', 'timeout': timeout, - 'watch': false, + 'workers': workers, 'log_level': 'info' }; diff --git a/src/core/config.js b/src/core/config.js index a670cae..ec81117 100755 --- a/src/core/config.js +++ b/src/core/config.js @@ -1,8 +1,9 @@ import dns from 'dns'; import fs from 'fs'; +import os from 'os'; import net from 'net'; import {isValidPort} from '../utils'; -import {BLINKSOCKS_DIR, LOG_DIR, DEFAULT_LOG_LEVEL} from './constants'; +import {BLINKSOCKS_DIR, LOG_DIR, DEFAULT_LOG_LEVEL, DEFAULT_WORKERS} from './constants'; import {DNS_DEFAULT_EXPIRE} from './dns-cache'; /** @@ -93,6 +94,19 @@ export class Config { console.warn(`==> [config] 'timeout' is too short, is ${json.timeout}s expected?`); } + // workers + if (typeof json.workers !== 'undefined') { + if (typeof json.workers !== 'number') { + throw Error('\'workers\' must be a number'); + } + if (json.workers < 1) { + throw Error('\'workers\' must be greater than 0'); + } + if (json.workers > os.cpus().length) { + console.warn(`==> [config] 'workers' is greater than the number of cpus, is ${json.workers} workers expected?`); + } + } + // dns_expire if (typeof json.dns_expire !== 'undefined') { if (typeof json.dns_expire !== 'number') { @@ -188,7 +202,7 @@ export class Config { global.__IS_SERVER__ = !global.__IS_CLIENT__; global.__REDIRECT__ = json.redirect; global.__TIMEOUT__ = json.timeout * 1e3; - global.__IS_WATCH__ = !!json.watch; + global.__WORKERS__ = json.workers || DEFAULT_WORKERS; global.__LOG_LEVEL__ = json.log_level || DEFAULT_LOG_LEVEL; global.__DNS_EXPIRE__ = (json.dns_expire !== undefined) ? json.dns_expire * 1e3 : DNS_DEFAULT_EXPIRE; global.__ALL_CONFIG__ = json; diff --git a/src/core/constants.js b/src/core/constants.js index e2ca751..86d3565 100644 --- a/src/core/constants.js +++ b/src/core/constants.js @@ -15,6 +15,7 @@ const LOG_FILE_PATH = path.join(LOG_DIR, ); const LOG_FILE_MAX_SIZE = 2 * 1024 * 1024; // 2MB const DEFAULT_LOG_LEVEL = 'info'; +const DEFAULT_WORKERS = os.cpus().length; module.exports = { HOME_DIR, @@ -22,5 +23,6 @@ module.exports = { LOG_DIR, LOG_FILE_PATH, LOG_FILE_MAX_SIZE, - DEFAULT_LOG_LEVEL + DEFAULT_LOG_LEVEL, + DEFAULT_WORKERS }; diff --git a/src/core/hub.js b/src/core/hub.js index 4c9d70d..75028e8 100755 --- a/src/core/hub.js +++ b/src/core/hub.js @@ -1,3 +1,4 @@ +import cluster from 'cluster'; import EventEmitter from 'events'; import net from 'net'; import logger from './logger'; @@ -26,6 +27,8 @@ const nextId = (function () { */ export class Hub extends EventEmitter { + _isFirstWorker = cluster.worker.id <= 1; + _hub = null; // instance of class net.Server _sockets = []; // instances of our class Socket @@ -46,10 +49,10 @@ export class Hub extends EventEmitter { onClose() { if (!this._isClosed) { - logger.info('==> [hub] shutdown'); + this._isFirstWorker && logger.info('==> [hub] shutdown'); if (__IS_CLIENT__) { Balancer.destroy(); - logger.info('==> [balancer] stopped'); + this._isFirstWorker && logger.info('==> [balancer] stopped'); } this._isClosed = true; this._sockets.forEach((socket) => socket.destroy()); @@ -80,11 +83,13 @@ export class Hub extends EventEmitter { port: __LOCAL_PORT__ }; this._hub.listen(options, () => { - logger.info(`==> [hub] use configuration: ${JSON.stringify(__ALL_CONFIG__)}`); - logger.info(`==> [hub] running as: ${__IS_SERVER__ ? 'Server' : 'Client'}`); - logger.info(`==> [hub] listening on: ${JSON.stringify(this._hub.address())}`); + if (this._isFirstWorker) { + logger.info(`==> [hub] use configuration: ${JSON.stringify(__ALL_CONFIG__)}`); + logger.info(`==> [hub] running as: ${__IS_SERVER__ ? 'Server' : 'Client'}`); + logger.info(`==> [hub] listening on: ${JSON.stringify(this._hub.address())}`); + } if (__IS_CLIENT__) { - logger.info('==> [balancer] started'); + this._isFirstWorker && logger.info('==> [balancer] started'); Balancer.start(__SERVERS__); } if (typeof callback === 'function') {