Merge pull request #104 from blinksocks/feature-http2-transport

Feature: add http2 transport
This commit is contained in:
Micooz Lee 2018-06-17 11:45:28 +08:00 committed by GitHub
commit f463a5ae60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 519 additions and 126 deletions

@ -5,7 +5,7 @@ node_js:
- "node" - "node"
before_deploy: before_deploy:
- export NEXT_VERSION=3.3.1 - export NEXT_VERSION=3.3.2
- export COMMIT_HASH=$(git log --format=%h -1) - export COMMIT_HASH=$(git log --format=%h -1)
- export DIST_PATH=build - export DIST_PATH=build
- export PUBLISH_REPO=blinksocks/blinksocks-nightly-releases - export PUBLISH_REPO=blinksocks/blinksocks-nightly-releases

@ -20,7 +20,7 @@
* Cross-platform: running on Linux, Windows and macOS. * Cross-platform: running on Linux, Windows and macOS.
* Lightweight proxy interfaces: Socks5/Socks4/Socks4a and HTTP. * Lightweight proxy interfaces: Socks5/Socks4/Socks4a and HTTP.
* Multiple Transport Layers: TCP, UDP, [TLS], [WebSocket] and [WebSocket/TLS]. * Transport Layer Support: TCP, UDP, [TLS], [HTTP2], [WebSocket] and [WebSocket/TLS].
* TLS/TLS/WebSocket [multiplexing]. * TLS/TLS/WebSocket [multiplexing].
* Convenient protocol [customization]. * Convenient protocol [customization].
* Access Control List([ACL]) support. * Access Control List([ACL]) support.
@ -84,6 +84,7 @@ Apache License 2.0
[customization]: docs/development/api [customization]: docs/development/api
[ACL]: docs/config#access-control-list [ACL]: docs/config#access-control-list
[TLS]: docs/examples/tls [TLS]: docs/examples/tls
[HTTP2]: docs/examples/http2
[WebSocket]: docs/examples/websocket [WebSocket]: docs/examples/websocket
[WebSocket/TLS]: docs/examples/websocket-tls [WebSocket/TLS]: docs/examples/websocket-tls
[multiplexing]: docs/examples/multiplexing [multiplexing]: docs/examples/multiplexing

@ -112,9 +112,9 @@ $ blinksocks init
The `<protocol>` should be: The `<protocol>` should be:
* On client side: `tcp`, `socks`/`socks5`/`socks4`/`socks4a`, `http` or `https`. * On client side: `tcp`, `socks`/`socks5`/`socks4`/`socks4a`, `http` or `https`.
* On server side: `tcp`, `tls`, `ws` or `wss`. * On server side: `tcp`, `tls`, `ws`, `wss` or `h2`.
#### Service Authentication #### Service Authentication (client side only)
* Create a **http/https** service with [Basic Authentication](https://www.iana.org/go/rfc7617). * Create a **http/https** service with [Basic Authentication](https://www.iana.org/go/rfc7617).
@ -136,7 +136,7 @@ The `<protocol>` should be:
} }
``` ```
#### Service Params #### Service Params (client side only)
**?forward=<host>:<port>** **?forward=<host>:<port>**
@ -183,7 +183,7 @@ In this case, it uses [iperf](https://en.wikipedia.org/wiki/Iperf) to test netwo
For more information about presets, please check out [presets]. For more information about presets, please check out [presets].
### Access Control List ### Access Control List (server side only)
You can enable ACL on **server** by setting **acl: true** and provide a acl configuration file in **acl_conf**: You can enable ACL on **server** by setting **acl: true** and provide a acl configuration file in **acl_conf**:

@ -0,0 +1,9 @@
example.com {
proxy /<your-custom-path> https://127.0.0.1:59463 {
insecure_skip_verify
header_upstream Host {host}
header_upstream X-Real-IP {remote}
header_upstream X-Forwarded-For {remote}
header_upstream X-Forwarded-Proto {scheme}
}
}

@ -0,0 +1,40 @@
# http2-caddy
**Minimal Version Required: v3.3.2**
blinksocks can transfer data through [caddy] proxy server using http2:
```
+--------------------------------------------------+
| Caddy Server |
+-------------+ | +-----------+ | +------------+
| | h2://site.com/path | :433 h2://127.0.0.1:1234 | | | tcp:// | |
| bs-client <-----------------------> proxy /path +--------------------> bs-server <-------------> Target |
| | (encrypted) | (encrypted) | | | (raw) | |
+-------------+ | +-----------+ | +------------+
| |
+--------------------------------------------------+
```
When use `h2://` as transport on **server side**, make sure both `tls_cert` and `tls_key` is provided:
```
{
...
"tls_key": "key.pem",
"tls_cert": "cert.pem"
...
}
```
**self-signed** tls_cert is ok because we set `insecure_skip_verify` in Caddyfile.
## Generate key.pem and cert.pem
```
// self-signed certificate
$ openssl req -x509 -newkey rsa:4096 -nodes -sha256 -subj '/CN=example.com' \
-keyout key.pem -out cert.pem
```
[caddy]: https://caddyserver.com

@ -0,0 +1,15 @@
{
"service": "socks5://127.0.0.1:1080",
"server": {
"service": "h2://example.com:443/your-custom-path",
"key": "zAcy9wve53gpm{YC",
"presets": [
{
"name": "ss-base"
},
{
"name": "obfs-random-padding"
}
]
}
}

@ -0,0 +1,14 @@
{
"service": "h2://0.0.0.0:64270",
"key": "zAcy9wve53gpm{YC",
"presets": [
{
"name": "ss-base"
},
{
"name": "obfs-random-padding"
}
],
"tls_key": "key.pem",
"tls_cert": "cert.pem"
}

@ -0,0 +1,32 @@
# http2
**Minimal Version Required: v3.3.2**
blinksocks can transfer data using `http2`:
```
+-------------+ +-------------+ +------------+
| | h2://site.com/path | | tcp:// | |
| bs-client <----------------------> bs-server <-----------> Target |
| | | | | |
+-------------+ +-------------+ +------------+
```
When use `h2://` as transport, make sure both `tls_cert` and `tls_key` is provided to `bs-server`.
> If your are using self-signed certificate on server, please also provide the same `tls_cert` on client and also set `"tls_cert_self_signed": true`.
Make sure you provide **Common Name** of certificate NOT IP in client config:
```
{
...
"server": {
"service": "h2://<Common Name>:<port>",
"tls_cert": "cert.pem",
"tls_cert_self_signed": true
...
},
...
}
```

@ -0,0 +1,15 @@
{
"service": "socks5://127.0.0.1:1080",
"server": {
"service": "h2://example.com:18732",
"key": "TZr[JmZYjNJ3USYq",
"presets": [
{
"name": "ss-base"
},
{
"name": "obfs-random-padding"
}
]
}
}

@ -0,0 +1,12 @@
{
"service": "tcp://0.0.0.0:18732",
"key": "TZr[JmZYjNJ3USYq",
"presets": [
{
"name": "ss-base"
},
{
"name": "obfs-random-padding"
}
]
}

@ -235,10 +235,10 @@ export class ACL extends EventEmitter {
}); });
} }
constructor({ remoteInfo, rules, max_tries = DEFAULT_MAX_TRIES }) { constructor({ sourceAddress, rules, max_tries = DEFAULT_MAX_TRIES }) {
super(); super();
this._sourceHost = remoteInfo.host; this._sourceHost = sourceAddress.host;
this._sourcePort = remoteInfo.port; this._sourcePort = sourceAddress.port;
this._rules = rules; this._rules = rules;
this._maxTries = max_tries; this._maxTries = max_tries;
} }

@ -142,7 +142,7 @@ export class Config {
this.server_pathname = pathname; this.server_pathname = pathname;
// preload tls_cert or tls_key // preload tls_cert or tls_key
if (this.server_protocol === 'tls' || this.server_protocol === 'wss') { if (['tls', 'wss', 'h2'].includes(this.server_protocol)) {
if (this.is_client) { if (this.is_client) {
this.tls_cert_self_signed = !!server.tls_cert_self_signed; this.tls_cert_self_signed = !!server.tls_cert_self_signed;
} }
@ -370,14 +370,14 @@ export class Config {
const proto = protocol.slice(0, -1); const proto = protocol.slice(0, -1);
const available_server_protocols = [ const available_server_protocols = [
'tcp', 'ws', 'wss', 'tls', 'tcp', 'ws', 'wss', 'tls', 'h2'
]; ];
if (!available_server_protocols.includes(proto)) { if (!available_server_protocols.includes(proto)) {
throw Error(`service.protocol must be: ${available_server_protocols.join(', ')}`); throw Error(`service.protocol must be: ${available_server_protocols.join(', ')}`);
} }
// tls_cert, tls_key // tls_cert, tls_key
if (proto === 'tls' || proto === 'wss') { if (['tls', 'wss', 'h2'].includes(proto)) {
if (from_client && server.tls_cert_self_signed) { if (from_client && server.tls_cert_self_signed) {
if (typeof server.tls_cert !== 'string' || server.tls_cert === '') { if (typeof server.tls_cert !== 'string' || server.tls_cert === '') {
throw Error('"tls_cert" must be provided when "tls_cert_self_signed" is set'); throw Error('"tls_cert" must be provided when "tls_cert_self_signed" is set');

@ -4,6 +4,7 @@ import net from 'net';
import http from 'http'; import http from 'http';
import https from 'https'; import https from 'https';
import { URL } from 'url'; import { URL } from 'url';
import http2 from 'http2';
import tls from 'tls'; import tls from 'tls';
import ws from 'ws'; import ws from 'ws';
import LRU from 'lru-cache'; import LRU from 'lru-cache';
@ -247,6 +248,12 @@ export class Hub {
server.listen(address, () => onListening(server)); server.listen(address, () => onListening(server));
break; break;
} }
case 'h2': {
server = http2.createSecureServer({ key: tls_key, cert: tls_cert });
server.on('stream', (stream) => this._onConnection(stream));
server.listen(address, () => onListening(server));
break;
}
default: default:
return reject(Error(`unsupported protocol: "${local_protocol}"`)); return reject(Error(`unsupported protocol: "${local_protocol}"`));
} }
@ -282,7 +289,7 @@ export class Hub {
if (relay === undefined) { if (relay === undefined) {
const context = { const context = {
socket: server, socket: server,
remoteInfo: { host: address, port: port }, sourceAddress: { host: address, port: port },
}; };
relay = this._createUdpRelay(context); relay = this._createUdpRelay(context);
relay.init({ proxyRequest }); relay.init({ proxyRequest });
@ -330,21 +337,18 @@ export class Hub {
this._upSpeedTester.feed(size); this._upSpeedTester.feed(size);
}; };
_onConnection = (socket, proxyRequest = null) => { _onConnection = (conn, proxyRequest = null) => {
logger.verbose(`[hub] [${socket.remoteAddress}:${socket.remotePort}] connected`); const sourceAddress = this._getSourceAddress(conn);
const updateConnStatus = (event, extra) => this._updateConnStatus(event, sourceAddress, extra);
const sourceAddress = { host: socket.remoteAddress, port: socket.remotePort }; logger.verbose(`[hub] [${sourceAddress.host}:${sourceAddress.port}] connected`);
const updateConnStatus = (event, extra) => {
this._updateConnStatus(event, sourceAddress, extra);
};
updateConnStatus('new'); updateConnStatus('new');
const context = { const context = {
socket, socket: conn,
proxyRequest, proxyRequest,
remoteInfo: sourceAddress, sourceAddress,
}; };
let muxRelay = null, cid = null; let muxRelay = null, cid = null;
@ -388,6 +392,18 @@ export class Hub {
this._tcpRelays.set(relay.id, relay); this._tcpRelays.set(relay.id, relay);
}; };
_getSourceAddress(conn) {
let sourceHost, sourcePort;
if (conn.session) {
sourceHost = conn.session.socket.remoteAddress;
sourcePort = conn.session.socket.remotePort;
} else {
sourceHost = conn.remoteAddress;
sourcePort = conn.remotePort;
}
return { host: sourceHost, port: sourcePort };
}
_getMuxRelayOnClient(context, cid) { _getMuxRelayOnClient(context, cid) {
// get a mux relay // get a mux relay
let muxRelay = this._selectMuxRelay(); let muxRelay = this._selectMuxRelay();
@ -395,7 +411,7 @@ export class Hub {
// create a mux relay if needed // create a mux relay if needed
if (muxRelay === null) { if (muxRelay === null) {
const updateConnStatus = (event, extra) => { const updateConnStatus = (event, extra) => {
const sourceAddress = context.remoteInfo; const { sourceAddress } = context;
this._updateConnStatus(event, sourceAddress, extra); this._updateConnStatus(event, sourceAddress, extra);
}; };
muxRelay = this._createRelay(context, true); muxRelay = this._createRelay(context, true);

@ -80,7 +80,7 @@ export class MuxRelay extends Relay {
transport: 'mux', transport: 'mux',
context: { context: {
socket: this._ctx.socket, socket: this._ctx.socket,
remoteInfo: this._ctx.remoteInfo, sourceAddress: this._ctx.sourceAddress,
cid, cid,
muxRelay, // NOTE: associate the mux relay here muxRelay, // NOTE: associate the mux relay here
} }
@ -165,11 +165,11 @@ export class MuxRelay extends Relay {
} }
_getRandomMuxRelay() { _getRandomMuxRelay() {
const { muxRelays, remoteInfo } = this._ctx; const { muxRelays, sourceAddress } = this._ctx;
const relays = [...muxRelays.values()].filter((relay) => const relays = [...muxRelays.values()].filter((relay) =>
relay._ctx && relay._ctx &&
relay._ctx.remoteInfo.host === remoteInfo.host && relay._ctx.sourceAddress.host === sourceAddress.host &&
relay._ctx.remoteInfo.port === remoteInfo.port relay._ctx.sourceAddress.port === sourceAddress.port
); );
return relays[getRandomInt(0, relays.length - 1)]; return relays[getRandomInt(0, relays.length - 1)];
} }

@ -1,13 +1,14 @@
import EventEmitter from 'events'; import EventEmitter from 'events';
import { ACL } from './acl'; import { ACL, ACL_CLOSE_CONNECTION } from './acl';
import { Pipe } from './pipe'; import { Pipe } from './pipe';
import { Tracker } from './tracker'; import { Tracker } from './tracker';
import { logger } from '../utils'; import { getRandomInt, logger } from '../utils';
import { import {
TcpInbound, TcpOutbound, TcpInbound, TcpOutbound,
UdpInbound, UdpOutbound, UdpInbound, UdpOutbound,
TlsInbound, TlsOutbound, TlsInbound, TlsOutbound,
Http2Inbound, Http2Outbound,
WsInbound, WsOutbound, WsInbound, WsOutbound,
WssInbound, WssOutbound, WssInbound, WssOutbound,
MuxInbound, MuxOutbound, MuxInbound, MuxOutbound,
@ -28,7 +29,7 @@ import { PIPE_ENCODE, PIPE_DECODE, CONNECT_TO_REMOTE, PRESET_FAILED } from '../c
// .on('_connect') // .on('_connect')
// .on('_read') // .on('_read')
// .on('_write') // .on('_write')
// .on('_error'); // .on('_error')
// .on('close') // .on('close')
export class Relay extends EventEmitter { export class Relay extends EventEmitter {
@ -46,7 +47,7 @@ export class Relay extends EventEmitter {
_transport = null; _transport = null;
_remoteInfo = null; _sourceAddress = null;
_proxyRequest = null; _proxyRequest = null;
@ -78,7 +79,7 @@ export class Relay extends EventEmitter {
this._id = Relay.idcounter++; this._id = Relay.idcounter++;
this._config = config; this._config = config;
this._transport = transport; this._transport = transport;
this._remoteInfo = context.remoteInfo; this._sourceAddress = context.sourceAddress;
// pipe // pipe
this._presets = this.preparePresets(presets); this._presets = this.preparePresets(presets);
this._pipe = this.createPipe(this._presets); this._pipe = this.createPipe(this._presets);
@ -106,12 +107,12 @@ export class Relay extends EventEmitter {
this._inbound.on('close', () => this.onBoundClose(inbound, outbound)); this._inbound.on('close', () => this.onBoundClose(inbound, outbound));
// acl // acl
if (config.acl) { if (config.acl) {
this._acl = new ACL({ remoteInfo: this._remoteInfo, rules: config.acl_rules }); this._acl = new ACL({ sourceAddress: this._sourceAddress, rules: config.acl_rules });
this._acl.on('action', this.onBroadcast.bind(this)); this._acl.on('action', this.onBroadcast.bind(this));
} }
// tracker // tracker
this._tracker = new Tracker({ config, transport }); this._tracker = new Tracker({ config, transport });
this._tracker.setSourceAddress(this._remoteInfo.host, this._remoteInfo.port); this._tracker.setSourceAddress(this._sourceAddress.host, this._sourceAddress.port);
} }
init({ proxyRequest }) { init({ proxyRequest }) {
@ -132,6 +133,7 @@ export class Relay extends EventEmitter {
'tcp': [TcpInbound, TcpOutbound], 'tcp': [TcpInbound, TcpOutbound],
'udp': [UdpInbound, UdpOutbound], 'udp': [UdpInbound, UdpOutbound],
'tls': [TlsInbound, TlsOutbound], 'tls': [TlsInbound, TlsOutbound],
'h2': [Http2Inbound, Http2Outbound],
'ws': [WsInbound, WsOutbound], 'ws': [WsInbound, WsOutbound],
'wss': [WssInbound, WssOutbound], 'wss': [WssInbound, WssOutbound],
'mux': [MuxInbound, MuxOutbound], 'mux': [MuxInbound, MuxOutbound],
@ -168,7 +170,7 @@ export class Relay extends EventEmitter {
onBroadcast(action) { onBroadcast(action) {
if (action.type === CONNECT_TO_REMOTE) { if (action.type === CONNECT_TO_REMOTE) {
const { host: sourceHost, port: sourcePort } = this._remoteInfo; const { host: sourceHost, port: sourcePort } = this._sourceAddress;
const { host: targetHost, port: targetPort } = action.payload; const { host: targetHost, port: targetPort } = action.payload;
const remote = `${sourceHost}:${sourcePort}`; const remote = `${sourceHost}:${sourcePort}`;
const target = `${targetHost}:${targetPort}`; const target = `${targetHost}:${targetPort}`;
@ -192,6 +194,15 @@ export class Relay extends EventEmitter {
if (this._acl && this._acl.checkFailTimes(this._config.acl_tries)) { if (this._acl && this._acl.checkFailTimes(this._config.acl_tries)) {
return; return;
} }
this.onPresetFailed(action);
return;
}
if (action.type === ACL_CLOSE_CONNECTION) {
const transport = this._transport;
const remote = `${this._sourceAddress.host}:${this._sourceAddress.port}`;
logger.warn(`[relay] [${transport}] [${remote}] acl request to close this connection`);
this.destroy();
return;
} }
this._inbound && this._inbound.onBroadcast(action); this._inbound && this._inbound.onBroadcast(action);
this._outbound && this._outbound.onBroadcast(action); this._outbound && this._outbound.onBroadcast(action);
@ -233,6 +244,44 @@ export class Relay extends EventEmitter {
} }
}; };
async onPresetFailed(action) {
const { name, message, orgData } = action.payload;
const transport = this._transport;
const remote = `${this._sourceAddress.host}:${this._sourceAddress.port}`;
logger.error(`[relay] [${transport}] [${remote}] preset "${name}" fail to process: ${message}`);
this.emit('_error', new Error(message));
// close connection directly on client side
if (this._config.is_client) {
logger.warn(`[relay] [${transport}] [${remote}] connection closed`);
this.destroy();
}
// for server side, redirect traffic if "redirect" is set, otherwise, close connection after a random timeout
if (this._config.is_server && !this._config.mux) {
if (this._config.redirect) {
const [host, port] = this._config.redirect.split(':');
logger.warn(`[relay] [${transport}] [${remote}] connection is redirecting to: ${host}:${port}`);
// clear preset list
this._pipe.updatePresets([]);
// connect to "redirect" remote
await this._outbound.connect({ host, port: +port });
if (this._outbound.writable) {
this._outbound.write(orgData);
}
} else {
this._outbound.pause && this._outbound.pause();
const timeout = getRandomInt(5, 30);
logger.warn(`[relay] [${transport}] [${remote}] connection will be closed in ${timeout}s...`);
setTimeout(this.destroy.bind(this), timeout * 1e3);
}
}
}
// methods // methods
encode(buffer, extraArgs) { encode(buffer, extraArgs) {
@ -303,7 +352,7 @@ export class Relay extends EventEmitter {
this._acl = null; this._acl = null;
} }
this._ctx = null; this._ctx = null;
this._remoteInfo = null; this._sourceAddress = null;
this._proxyRequest = null; this._proxyRequest = null;
} }
} }

@ -355,7 +355,7 @@ export function createServer({ bindAddress, bindPort, username, password }) {
// Username/Password Authentication // Username/Password Authentication
if (isAuthRequired) { if (isAuthRequired) {
if (username !== request.username || password !== request.password) { if (username !== request.username || password !== request.password) {
logger.error(`[socks] [${appAddress}] invalid socks5 authorization, username=${request.username} password=${request.password}`); logger.error(`[socks] [${appAddress}] invalid socks5 authorization username/password, dump=${dumpHex(buffer)}`);
socket.end(Buffer.from([SOCKS_VERSION_V5, 0x01])); socket.end(Buffer.from([SOCKS_VERSION_V5, 0x01]));
return; return;
} }

@ -18,11 +18,11 @@ class Bound extends EventEmitter {
} }
get remoteHost() { get remoteHost() {
return this.ctx.remoteInfo.host; return this.ctx.sourceAddress.host;
} }
get remotePort() { get remotePort() {
return this.ctx.remoteInfo.port; return this.ctx.sourceAddress.port;
} }
get remote() { get remote() {

232
src/transports/h2.js Normal file

@ -0,0 +1,232 @@
import http2 from 'http2';
import { Inbound, Outbound } from './defs';
import { logger } from '../utils';
import {
CONNECT_TO_REMOTE,
CONNECTED_TO_REMOTE,
PIPE_DECODE,
PIPE_ENCODE,
} from '../constants';
const { HTTP2_HEADER_PATH, HTTP2_HEADER_METHOD } = http2.constants;
export class Http2Inbound extends Inbound {
_session = null;
_stream = null;
_destroyed = false;
constructor(props) {
super(props);
this.onError = this.onError.bind(this);
this.onReceive = this.onReceive.bind(this);
this.onTimeout = this.onTimeout.bind(this);
this.onClose = this.onClose.bind(this);
if (this.ctx.socket) {
this._stream = this.ctx.socket;
this._session = this._stream.session;
this._stream.on('data', this.onReceive);
this._session.on('error', this.onError);
this._session.on('timeout', this.onTimeout);
this._session.on('close', this.onClose);
this._session.setTimeout(this._config.timeout);
}
}
get name() {
return 'h2:inbound';
}
get writable() {
return this._stream && this._stream.writable;
}
write(buffer) {
if (this.writable) {
this._stream.write(buffer);
}
}
onError(err) {
logger.warn(`[${this.name}] [${this.remote}] ${err.message}`);
this.emit('_error', err);
}
onReceive(buffer) {
const direction = this._config.is_client ? PIPE_ENCODE : PIPE_DECODE;
this.ctx.pipe.feed(direction, buffer);
}
onTimeout() {
logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`);
this.onClose();
}
onClose() {
this.close();
if (this._outbound) {
this._outbound.close();
this._outbound = null;
}
}
close() {
if (this._session) {
this._session.destroy();
this._session = null;
}
if (!this._destroyed) {
this._destroyed = true;
this.emit('close');
}
}
}
export class Http2Outbound extends Outbound {
_session = null;
_stream = null;
_destroyed = false;
constructor(props) {
super(props);
this.onError = this.onError.bind(this);
this.onReceive = this.onReceive.bind(this);
this.onTimeout = this.onTimeout.bind(this);
this.onClose = this.onClose.bind(this);
}
get name() {
return 'h2:outbound';
}
get writable() {
return this._stream && this._stream.writable;
}
write(buffer) {
if (this.writable) {
this._stream.write(buffer);
}
}
onError(err) {
logger.warn(`[${this.name}] [${this.remote}] ${err.message}`);
this.emit('_error', err);
}
onReceive(buffer) {
const direction = this._config.is_client ? PIPE_DECODE : PIPE_ENCODE;
this.ctx.pipe.feed(direction, buffer);
}
onTimeout() {
logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`);
this.onClose();
}
onClose() {
this.close();
if (this._inbound) {
this._inbound.close();
this._inbound = null;
}
}
close() {
if (this._session) {
this._session.destroy();
this._session = null;
}
if (!this._destroyed) {
this._destroyed = true;
this.emit('close');
}
}
onBroadcast(action) {
switch (action.type) {
case CONNECT_TO_REMOTE:
this.onConnectToRemote(action);
break;
default:
break;
}
}
async onConnectToRemote(action) {
const { host, port, keepAlive, onConnected } = action.payload;
if (!keepAlive || !this._session) {
const { server_host, server_port, server_pathname } = this._config;
try {
await this.connect({
host: server_host,
port: server_port,
pathname: server_pathname,
});
// session
this._session.on('connect', () => {
if (typeof onConnected === 'function') {
try {
onConnected((buffer) => {
if (buffer) {
const type = this._config.is_client ? PIPE_ENCODE : PIPE_DECODE;
this.ctx.pipe.feed(type, buffer, { cid: this.ctx.proxyRequest.cid, host, port });
}
});
} catch (err) {
logger.error(`[${this.name}] [${this.remote}] onConnected callback error: ${err.message}`);
this.emit('_error', err);
}
}
this.broadcast({ type: CONNECTED_TO_REMOTE, payload: { host, port } });
});
// stream
this._stream = this._session.request({
[HTTP2_HEADER_METHOD]: 'POST',
[HTTP2_HEADER_PATH]: server_pathname || '/',
}, {
endStream: false,
});
this._stream.on('error', this.onError);
this._stream.on('data', this.onReceive);
} catch (err) {
logger.warn(`[${this.name}] [${this.remote}] cannot connect to ${server_host}:${server_port}, ${err.message}`);
this.emit('_error', err);
this.onClose();
}
} else {
this.broadcast({ type: CONNECTED_TO_REMOTE, payload: { host, port } });
}
}
async connect({ host, port, pathname }) {
// close alive connection before create a new one
if (this._session && !this._session.closed) {
this._session.destroy();
}
const address = `h2://${host}:${port}` + (pathname ? pathname : '');
logger.info(`[${this.name}] [${this.remote}] connecting to ${address}`);
const options = {};
if (this._config.tls_cert_self_signed) {
options.ca = this._config.tls_cert;
}
this._session = http2.connect(`https://${host}:${port}`, options);
this._session.on('close', this.onClose);
this._session.on('timeout', this.onTimeout);
this._session.on('error', this.onError);
this._session.setTimeout(this._config.timeout);
}
}

@ -1,6 +1,7 @@
export * from './tcp'; export * from './tcp';
export * from './udp'; export * from './udp';
export * from './tls'; export * from './tls';
export * from './h2';
export * from './ws'; export * from './ws';
export * from './wss'; export * from './wss';
export * from './mux'; export * from './mux';

@ -1,6 +1,5 @@
import { Inbound, Outbound } from './defs'; import { Inbound, Outbound } from './defs';
import { logger } from '../utils'; import { CONNECT_TO_REMOTE, CONNECTED_TO_REMOTE } from '../constants';
import { CONNECT_TO_REMOTE, CONNECTED_TO_REMOTE, PRESET_FAILED } from '../constants';
export class MuxInbound extends Inbound { export class MuxInbound extends Inbound {
@ -55,21 +54,11 @@ export class MuxInbound extends Inbound {
socket.resume(); socket.resume();
} }
break; break;
case PRESET_FAILED:
this.onPresetFailed(action);
break;
default: default:
break; break;
} }
} }
async onPresetFailed(action) {
const { name, message } = action.payload;
logger.error(`[${this.name}] [${this.remote}] preset "${name}" fail to process: ${message}`);
this.emit('_error', new Error(message));
// TODO: maybe have more things to do rather than keep silent
}
onDrain() { onDrain() {
this.emit('drain'); this.emit('drain');
} }

@ -1,6 +1,6 @@
import net from 'net'; import net from 'net';
import { Inbound, Outbound } from './defs'; import { Inbound, Outbound } from './defs';
import { DNSCache, logger, getRandomInt } from '../utils'; import { DNSCache, logger } from '../utils';
import { import {
MAX_BUFFERED_SIZE, MAX_BUFFERED_SIZE,
@ -8,11 +8,9 @@ import {
PIPE_DECODE, PIPE_DECODE,
CONNECT_TO_REMOTE, CONNECT_TO_REMOTE,
CONNECTED_TO_REMOTE, CONNECTED_TO_REMOTE,
PRESET_FAILED,
} from '../constants'; } from '../constants';
import { import {
ACL_CLOSE_CONNECTION,
ACL_PAUSE_RECV, ACL_PAUSE_RECV,
ACL_PAUSE_SEND, ACL_PAUSE_SEND,
ACL_RESUME_RECV, ACL_RESUME_RECV,
@ -150,13 +148,6 @@ export class TcpInbound extends Inbound {
case CONNECTED_TO_REMOTE: case CONNECTED_TO_REMOTE:
this.resume(); this.resume();
break; break;
case PRESET_FAILED:
this.onPresetFailed(action);
break;
case ACL_CLOSE_CONNECTION:
logger.info(`[${this.name}] [${this.remote}] acl request to close connection`);
this.close();
break;
case ACL_PAUSE_RECV: case ACL_PAUSE_RECV:
this.pause(); this.pause();
break; break;
@ -168,42 +159,6 @@ export class TcpInbound extends Inbound {
} }
} }
async onPresetFailed(action) {
const { name, message } = action.payload;
logger.error(`[${this.name}] [${this.remote}] preset "${name}" fail to process: ${message}`);
this.emit('_error', new Error(message));
// close connection directly on client side
if (this._config.is_client) {
logger.warn(`[${this.name}] [${this.remote}] connection closed`);
this.onClose();
}
// for server side, redirect traffic if "redirect" is set, otherwise, close connection after a random timeout
if (this._config.is_server && !this._config.mux) {
if (this._config.redirect) {
const { orgData } = action.payload;
const [host, port] = this._config.redirect.split(':');
logger.warn(`[${this.name}] [${this.remote}] connection is redirecting to: ${host}:${port}`);
// clear preset list
this.ctx.pipe.updatePresets([]);
// connect to "redirect" remote
await this._outbound.connect({ host, port: +port });
if (this._outbound.writable) {
this._outbound.write(orgData);
}
} else {
this.pause();
const timeout = getRandomInt(10, 40);
logger.warn(`[${this.name}] [${this.remote}] connection will be closed in ${timeout}s...`);
setTimeout(this.onClose, timeout * 1e3);
}
}
}
} }
export class TcpOutbound extends Outbound { export class TcpOutbound extends Outbound {
@ -338,17 +293,21 @@ export class TcpOutbound extends Outbound {
async onConnectToRemote(action) { async onConnectToRemote(action) {
const { host, port, keepAlive, onConnected } = action.payload; const { host, port, keepAlive, onConnected } = action.payload;
if (!keepAlive || !this._socket) { if (!keepAlive || !this._socket) {
let targetHost, targetPort;
try { try {
if (this._config.is_server) { if (this._config.is_server) {
await this.connect({ host, port }); targetHost = host;
targetPort = port;
} }
if (this._config.is_client) { if (this._config.is_client) {
await this.connect({ targetHost = this._config.server_host;
host: this._config.server_host, targetPort = this._config.server_port;
port: this._config.server_port,
pathname: this._config.server_pathname,
});
} }
await this.connect({
host: targetHost,
port: targetPort,
pathname: this._config.server_pathname,
});
this._socket.on('connect', () => { this._socket.on('connect', () => {
if (typeof onConnected === 'function') { if (typeof onConnected === 'function') {
try { try {
@ -366,7 +325,7 @@ export class TcpOutbound extends Outbound {
this.broadcast({ type: CONNECTED_TO_REMOTE, payload: { host, port } }); this.broadcast({ type: CONNECTED_TO_REMOTE, payload: { host, port } });
}); });
} catch (err) { } catch (err) {
logger.warn(`[${this.name}] [${this.remote}] cannot connect to ${host}:${port}, ${err.message}`); logger.warn(`[${this.name}] [${this.remote}] cannot connect to ${targetHost}:${targetPort}, ${err.message}`);
this.emit('_error', err); this.emit('_error', err);
this.onClose(); this.onClose();
} }

@ -1,6 +1,6 @@
import dgram from 'dgram'; import dgram from 'dgram';
import { Inbound, Outbound } from './defs'; import { Inbound, Outbound } from './defs';
import { PIPE_ENCODE, PIPE_DECODE, CONNECT_TO_REMOTE, PRESET_FAILED } from '../constants'; import { PIPE_ENCODE, PIPE_DECODE, CONNECT_TO_REMOTE } from '../constants';
import { logger } from '../utils'; import { logger } from '../utils';
export class UdpInbound extends Inbound { export class UdpInbound extends Inbound {
@ -12,7 +12,6 @@ export class UdpInbound extends Inbound {
constructor(props) { constructor(props) {
super(props); super(props);
this.onReceive = this.onReceive.bind(this); this.onReceive = this.onReceive.bind(this);
this.onPresetFailed = this.onPresetFailed.bind(this);
this._socket = this.ctx.socket; this._socket = this.ctx.socket;
} }
@ -22,26 +21,6 @@ export class UdpInbound extends Inbound {
this.ctx.pipe.feed(type, buffer); this.ctx.pipe.feed(type, buffer);
} }
onBroadcast(action) {
switch (action.type) {
case PRESET_FAILED:
this.onPresetFailed(action);
break;
default:
break;
}
}
onPresetFailed(action) {
const { name, message } = action.payload;
logger.error(`[udp:inbound] [${this.remote}] preset "${name}" fail to process: ${message}`);
if (this._outbound) {
this._outbound.close();
this._outbound = null;
}
this.close();
}
write(buffer) { write(buffer) {
const { address, port } = this._rinfo; const { address, port } = this._rinfo;
const onSendError = (err) => { const onSendError = (err) => {

@ -0,0 +1,30 @@
import path from 'path';
import run from '../common/run-e2e';
const tlsKey = path.resolve(__dirname, 'resources', 'key.pem');
const tlsCert = path.resolve(__dirname, 'resources', 'cert.pem');
const clientJson = {
'service': 'socks5://127.0.0.1:1081',
'server': {
'service': 'h2://localhost:1082',
'key': '9{*2gdBSdCrgnSBD',
'presets': [
{ 'name': 'ss-base' },
],
'tls_cert': tlsCert,
'tls_cert_self_signed': true,
},
};
const serverJson = {
'service': 'h2://localhost:1082',
'key': '9{*2gdBSdCrgnSBD',
'presets': [
{ 'name': 'ss-base' },
],
'tls_cert': tlsCert,
'tls_key': tlsKey,
};
test('transport-layer-http2', async () => await run({ clientJson, serverJson }));