blinksocks/lib/transports/websocket.js
2017-10-21 12:00:20 +08:00

1 line
7.6 KiB
JavaScript

'use strict';Object.defineProperty(exports,'__esModule',{value:true});exports.WsOutbound=exports.WsInbound=undefined;var _slicedToArray=function(){function sliceIterator(arr,i){var _arr=[];var _n=true;var _d=false;var _e=undefined;try{for(var _i=arr[Symbol.iterator](),_s;!(_n=(_s=_i.next()).done);_n=true){_arr.push(_s.value);if(i&&_arr.length===i)break}}catch(err){_d=true;_e=err}finally{try{if(!_n&&_i['return'])_i['return']()}finally{if(_d)throw _e}}return _arr}return function(arr,i){if(Array.isArray(arr)){return arr}else if(Symbol.iterator in Object(arr)){return sliceIterator(arr,i)}else{throw new TypeError('Invalid attempt to destructure non-iterable instance')}}}();var _ws=require('ws');var _ws2=_interopRequireDefault(_ws);var _defs=require('./defs');var _core=require('../core');var _utils=require('../utils');var _defs2=require('../presets/defs');function _interopRequireDefault(obj){return obj&&obj.__esModule?obj:{default:obj}}function _asyncToGenerator(fn){return function(){var gen=fn.apply(this,arguments);return new Promise(function(resolve,reject){function step(key,arg){try{var info=gen[key](arg);var value=info.value}catch(error){reject(error);return}if(info.done){resolve(value)}else{return Promise.resolve(value).then(function(value){step('next',value)},function(err){step('throw',err)})}}return step('next')})}}const MAX_BUFFERED_SIZE=512*1024;class WsInbound extends _defs.Inbound{constructor(props){super(props);this._ws=null;this._isConnectedToRemote=false;const context=props.context;this.destroy=this.destroy.bind(this);this.onError=this.onError.bind(this);this.onReceive=this.onReceive.bind(this);this._ws=context;this._ws.on('message',this.onReceive);this._ws.on('error',this.onError);this._ws.on('close',this.destroy)}onError(err){_utils.logger.warn(`[ws:inbound] [${this.remote}] ${err.code||''} - ${err.message}`)}onReceive(buffer){if(this._outbound.writable||!this._isConnectedToRemote){const direction=__IS_CLIENT__?_core.PIPE_ENCODE:_core.PIPE_DECODE;this._pipe.feed(direction,buffer)}if(this._outbound&&this._outbound.bufferSize>=MAX_BUFFERED_SIZE){_utils.logger.debug(`[ws:inbound] recv paused due to outbound.bufferSize=${this._outbound.bufferSize} > ${MAX_BUFFERED_SIZE}`);this._ws.pause();this._outbound.once('drain',()=>{if(this._ws){_utils.logger.debug('[ws:inbound] resume to recv');this._ws.resume()}})}}get bufferSize(){return this._ws?this._ws.bufferedAmount:0}get writable(){return this._ws&&this._ws.readyState===_ws2.default.OPEN}onBroadcast(action){switch(action.type){case _defs2.CONNECT_TO_REMOTE:this._ws&&this._ws.pause();break;case _defs2.CONNECTED_TO_REMOTE:this._ws&&this._ws.resume();this._isConnectedToRemote=true;break;case _defs2.PRESET_FAILED:this.onPresetFailed(action);break;case _defs2.PRESET_CLOSE_CONNECTION:this.onPresetCloseConnection();break;case _defs2.PRESET_PAUSE_RECV:this.onPresetPauseRecv();break;case _defs2.PRESET_RESUME_RECV:this.onPresetResumeRecv();break;default:break;}}onPresetFailed(action){var _this=this;return _asyncToGenerator(function*(){var _action$payload=action.payload;const name=_action$payload.name,message=_action$payload.message;_utils.logger.error(`[ws:inbound] [${_this.remote}] preset "${name}" fail to process: ${message}`);if(__IS_CLIENT__){_utils.logger.warn(`[ws:inbound] [${_this.remote}] connection closed`);_this.destroy()}if(__IS_SERVER__){if(__REDIRECT__){const orgData=action.payload.orgData;var _REDIRECT__$split=__REDIRECT__.split(':'),_REDIRECT__$split2=_slicedToArray(_REDIRECT__$split,2);const host=_REDIRECT__$split2[0],port=_REDIRECT__$split2[1];_utils.logger.warn(`[ws:inbound] [${_this.remote}] connection is redirecting to: ${host}:${port}`);_this.setPresets(function(){return[{name:'tracker'}]});yield _this._outbound.connect({host,port:+port});if(_this._outbound.writable){_this._outbound.write(orgData)}}else{_this._ws&&_this._ws.pause();const timeout=(0,_utils.getRandomInt)(10,40);_utils.logger.warn(`[ws:inbound] [${_this.remote}] connection will be closed in ${timeout}s...`);setTimeout(_this.destroy,timeout*1e3)}}})()}onPresetCloseConnection(){_utils.logger.info(`[ws:inbound] [${this.remote}] preset request to close connection`);this.destroy()}onPresetPauseRecv(){__IS_SERVER__&&this._ws&&this._ws.pause()}onPresetResumeRecv(){__IS_SERVER__&&this._ws&&this._ws.resume()}write(buffer){if(this.writable){this._ws.send(buffer)}}destroy(){if(this._ws){const payload={host:this.remoteHost,port:this.remotePort};this.broadcast({type:_defs2.CONNECTION_WILL_CLOSE,payload});this._ws.close(1000);this._ws=null;this.emit('close');this.broadcast({type:_defs2.CONNECTION_CLOSED,payload})}if(this._outbound&&!this._outbound.destroying){this._outbound.destroying=true;const bufferSize=this._outbound.bufferSize;if(bufferSize>0){this._outbound.once('drain',()=>this._outbound.destroy())}else{this._outbound.destroy();this._outbound=null}}}}exports.WsInbound=WsInbound;class WsOutbound extends _defs.Outbound{constructor(props){super(props);this._ws=null;this.destroy=this.destroy.bind(this);this.onError=this.onError.bind(this);this.onReceive=this.onReceive.bind(this)}onError(err){_utils.logger.warn(`[ws:outbound] [${this.remote}] ${err.code||''} - ${err.message}`)}onReceive(buffer){if(this._inbound.writable){const direction=__IS_CLIENT__?_core.PIPE_DECODE:_core.PIPE_ENCODE;this._pipe.feed(direction,buffer)}if(this._inbound&&this._inbound.bufferSize>=MAX_BUFFERED_SIZE){_utils.logger.debug(`[ws:outbound] recv paused due to inbound.bufferSize=${this._inbound.bufferSize} > ${MAX_BUFFERED_SIZE}`);this._ws.pause();this._inbound.once('drain',()=>{if(this._ws){_utils.logger.debug('[ws:outbound] resume to recv');this._ws.resume()}})}}get bufferSize(){return this._ws?this._ws.bufferedAmount:0}get writable(){return this._ws&&this._ws.readyState===_ws2.default.OPEN}write(buffer){if(this.writable){this._ws.send(buffer)}}destroy(){if(this._ws){this._ws.close(1000);this._ws=null}if(this._inbound&&!this._inbound.destroying){this._inbound.destroying=true;const bufferSize=this._inbound.bufferSize;if(bufferSize>0){this._inbound.once('drain',()=>this._inbound.destroy())}else{this._inbound.destroy();this._inbound=null}}}onBroadcast(action){switch(action.type){case _defs2.CONNECT_TO_REMOTE:this.onConnectToRemote(action);break;case _defs2.PRESET_PAUSE_SEND:this.onPresetPauseSend();break;case _defs2.PRESET_RESUME_SEND:this.onPresetResumeSend();break;default:break;}}onConnectToRemote(action){var _this2=this;return _asyncToGenerator(function*(){var _action$payload2=action.payload;const host=_action$payload2.host,port=_action$payload2.port,onConnected=_action$payload2.onConnected;if(__IS_SERVER__){yield _this2.connect({host,port})}if(__IS_CLIENT__){_utils.logger.info(`[ws:outbound] [${_this2.remote}] request: ${host}:${port}`);yield _this2.connect({host:__SERVER_HOST__,port:__SERVER_PORT__})}if(typeof onConnected==='function'){onConnected(_this2._inbound.onReceive)}_this2._pipe.broadcast(null,{type:_defs2.CONNECTED_TO_REMOTE,payload:{host,port}})})()}onPresetPauseSend(){__IS_SERVER__&&this._ws&&this._ws.pause()}onPresetResumeSend(){__IS_SERVER__&&this._ws&&this._ws.resume()}connect({host,port}){var _this3=this;return _asyncToGenerator(function*(){let ip=null;try{ip=yield _this3._dnsCache.get(host)}catch(err){_utils.logger.error(`[ws:outbound] [${_this3.remote}] fail to resolve host ${host}: ${err.message}`)}_utils.logger.info(`[ws:outbound] [${_this3.remote}] connecting to: ws://${host}:${port} resolve=${ip}`);return new Promise(function(resolve){_this3._ws=new _ws2.default(`ws://${host}:${port}`,{perMessageDeflate:false});_this3._ws.on('open',function(){return resolve(_this3._ws)});_this3._ws.on('message',_this3.onReceive);_this3._ws.on('close',_this3.destroy);_this3._ws.on('error',_this3.onError)})})()}}exports.WsOutbound=WsOutbound;