blinksocks/src/transports/h2.js

177 lines
4.2 KiB
JavaScript

// import http2 from 'http2';
import { Inbound, Outbound } from './defs';
import { logger } from '../utils';
export class Http2Inbound extends Inbound {
_stream = null;
constructor(props) {
super(props);
this._stream = this._conn;
this._stream.on('data', this.onReceive);
this._stream.on('drain', this.onDrain);
this._stream.on('error', this.onError);
this._stream.on('close', this.onClose);
this._stream.on('timeout', this.onTimeout);
this._stream.setTimeout(this._config.timeout);
}
get name() {
return 'h2:inbound';
}
get bufferSize() {
return this._stream ? this._stream.session.socket.bufferSize : 0;
}
get writable() {
return this._stream && this._stream.writable;
}
write(buffer) {
if (this.writable) {
this._stream.write(buffer);
}
}
onError = (err) => {
logger.warn(`[${this.name}] [${this.remote}] ${err.message}`);
this.emit('_error', err);
};
onReceive = (buffer) => {
this.emit('data', buffer);
};
onDrain = () => {
this.emit('drain');
};
onTimeout = () => {
logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`);
this.onClose();
};
onClose = () => {
this.close();
const outbound = this.getOutbound();
if (outbound) {
outbound.close();
this.setOutbound(null);
}
};
close() {
if (this._stream) {
this._stream.close();
this._stream = null;
}
if (!this._destroyed) {
this._destroyed = true;
this.emit('close');
}
}
}
export class Http2Outbound extends Outbound {
_stream = null;
get name() {
return 'h2:outbound';
}
get bufferSize() {
return this._stream ? this._stream.session.socket.bufferSize : 0;
}
get writable() {
return this._stream && this._stream.writable;
}
write(buffer) {
if (this.writable) {
this._stream.write(buffer);
}
}
onError = (err) => {
logger.warn(`[${this.name}] [${this.remote}] ${err.message}`);
this.emit('_error', err);
};
onReceive = (buffer) => {
this.emit('data', buffer);
};
onDrain = () => {
this.emit('drain');
};
onTimeout = () => {
logger.warn(`[${this.name}] [${this.remote}] timeout: no I/O on the connection for ${this._config.timeout / 1e3}s`);
this.onClose();
};
onClose = () => {
this.close();
const inbound = this.getInbound();
if (inbound) {
inbound.close();
this.setInbound(null);
}
};
close() {
if (this._stream) {
this._stream.close();
this._stream = null;
}
if (!this._destroyed) {
this._destroyed = true;
this.emit('close');
}
}
async connect() {
return new Promise((resolve) => {
if (!this._stream) {
const { server_host, server_port, server_pathname } = this._config;
const address = `h2://${server_host}:${server_port}` + (server_pathname ? server_pathname : '');
logger.info(`[${this.name}] [${this.remote}] connecting to ${address}`);
try {
const http2 = require('http2');
const { HTTP2_HEADER_PATH, HTTP2_HEADER_METHOD } = http2.constants;
const options = {};
if (this._config.tls_cert_self_signed) {
options.ca = this._config.tls_cert;
}
const session = http2.connect(`https://${server_host}:${server_port}`, options);
session.on('connect', resolve);
this._stream = session.request({
[HTTP2_HEADER_METHOD]: 'POST',
[HTTP2_HEADER_PATH]: server_pathname || '/',
}, {
endStream: false,
});
this._stream.on('error', this.onError);
this._stream.on('data', this.onReceive);
this._stream.on('drain', this.onDrain);
this._stream.on('timeout', this.onTimeout);
this._stream.on('close', this.onClose);
this._stream.setTimeout(this._config.timeout);
} catch (err) {
logger.error(`[${this.name}] [${this.remote}] cannot connect to ${address}, ${err.message}`);
this.emit('_error', err);
this.onClose();
}
} else {
resolve();
}
});
}
}