src,test: rename Middleware -> Preset

This commit is contained in:
Micooz 2018-02-17 12:20:33 +08:00
parent 48d6e777dd
commit f18868c800
5 changed files with 221 additions and 139 deletions

@ -1,28 +1,27 @@
import EventEmitter from 'events';
import {Middleware} from './middleware';
import {PIPE_ENCODE} from '../constants';
import {PRESET_FAILED} from '../presets/defs';
import {logger} from '../utils';
import { Preset } from './preset';
import { PIPE_ENCODE } from '../constants';
import { PRESET_FAILED } from '../presets/actions';
import { logger } from '../utils';
// .on('broadcast')
// .on(`pre_${direction}`)
// .on(`post_${direction}`)
export class Pipe extends EventEmitter {
_upstream_middlewares = [];
_downstream_middlewares = [];
_config = null;
_isPipingUdp = false;
_encode_presets = [];
_decode_presets = [];
_cacheBuffer = null;
_rawPresets = null;
_destroyed = false;
_presets = null;
_config = null;
get destroyed() {
return this._destroyed;
}
@ -31,84 +30,79 @@ export class Pipe extends EventEmitter {
return this._presets;
}
constructor({config, presets, isUdp = false}) {
constructor({ config, presets, isUdp = false }) {
super();
this.broadcast = this.broadcast.bind(this);
this.onReadProperty = this.onReadProperty.bind(this);
this._config = config;
this._isPipingUdp = isUdp;
this.createMiddlewares(presets);
// presets
const _presets = presets.map(this._createPreset.bind(this));
this._encode_presets = _presets;
this._decode_presets = [].concat(_presets).reverse();
this._rawPresets = presets;
}
broadcast(name, action) {
const middlewares = this.getMiddlewares();
broadcast = (name, action) => {
const presets = this.getPresets();
const results = [];
for (const middleware of middlewares) {
if (middleware.name !== name) {
results.push(middleware.notify(action));
for (const preset of presets) {
if (preset.name !== name) {
results.push(preset.notify(action));
}
}
// if no middleware handled this action, bubble up to where pipe created.
// if no preset handled this action, bubble up to where pipe created.
if (name !== 'pipe' && results.every((result) => !!result === false)) {
this.emit('broadcast', action);
}
}
};
onReadProperty(name, presetName, propertyName) {
const middlewares = this.getMiddlewares();
const ms = middlewares.find((m) => m.name === presetName);
if (ms) {
const impl = ms.getImplement();
onReadProperty = (fromName, targetName, propertyName) => {
const presets = this.getPresets();
const preset = presets.find((m) => m.name === targetName);
if (preset) {
const impl = preset.getImplement();
const value = impl[propertyName];
return value !== undefined ? value : impl.constructor[propertyName];
} else {
logger.warn(`[preset] "${name}" cannot read property from nonexistent preset "${presetName}".`);
logger.warn(`[preset] "${fromName}" cannot read property from nonexistent preset "${targetName}".`);
}
}
};
createMiddlewares(presets) {
const middlewares = presets.map((preset, i) => this._createMiddleware(preset, i));
this._upstream_middlewares = middlewares;
this._downstream_middlewares = [].concat(middlewares).reverse();
this._presets = presets;
}
getMiddlewares(direction = PIPE_ENCODE) {
getPresets(direction = PIPE_ENCODE) {
if (direction === PIPE_ENCODE) {
return this._upstream_middlewares || [];
return this._encode_presets || [];
} else {
return this._downstream_middlewares || [];
return this._decode_presets || [];
}
}
updateMiddlewares(presets) {
// create index of previous middlewares for fast locate
updatePresets(rawPresets) {
// create index of previous presets for fast locate
const mdIndex = {};
for (const md of this.getMiddlewares()) {
mdIndex[md.name] = md;
for (const preset of this.getPresets()) {
mdIndex[preset.name] = preset;
}
// create non-exist middleware and reuse exist one
const middlewares = [];
for (let i = 0; i < presets.length; i++) {
const preset = presets[i];
let md = mdIndex[preset.name];
if (md) {
// create non-exist preset and reuse exist one
const presets = [];
for (let i = 0; i < rawPresets.length; i++) {
const rawPreset = rawPresets[i];
let preset = mdIndex[rawPreset.name];
if (preset) {
// remove all listeners for later re-chain later in _feed()
md.removeAllListeners();
preset.removeAllListeners();
// keep common listeners
this._attachEvents(md);
delete mdIndex[preset.name];
this._attachEvents(preset);
delete mdIndex[rawPreset.name];
} else {
md = this._createMiddleware(preset, i);
preset = this._createPreset(rawPreset, i);
}
middlewares.push(md);
presets.push(preset);
}
// destroy redundant middlewares
Object.keys(mdIndex).forEach((key) => mdIndex[key].onDestroy());
// destroy redundant presets
Object.keys(mdIndex).forEach((key) => mdIndex[key].destroy());
// update members
this._upstream_middlewares = middlewares;
this._downstream_middlewares = [].concat(middlewares).reverse();
this._presets = presets;
this._encode_presets = presets;
this._decode_presets = [].concat(presets).reverse();
this._presets = rawPresets;
}
feed(direction, buffer, extraArgs) {
@ -128,62 +122,58 @@ export class Pipe extends EventEmitter {
}
destroy() {
if (this._destroyed) {
return;
if (!this._destroyed) {
this.getPresets().forEach((preset) => preset.destroy());
this._encode_presets = null;
this._decode_presets = null;
this._rawPresets = null;
this._cacheBuffer = null;
this._destroyed = true;
this.removeAllListeners();
}
const middlewares = this.getMiddlewares();
for (const middleware of middlewares) {
middleware.onDestroy();
}
this._upstream_middlewares = null;
this._downstream_middlewares = null;
this._presets = null;
this._cacheBuffer = null;
this._destroyed = true;
this.removeAllListeners();
}
_createMiddleware(preset, index) {
const middleware = new Middleware({config: this._config, preset});
this._attachEvents(middleware);
_createPreset(rawPreset, index) {
const _preset = new Preset({ config: this._config, preset: rawPreset });
this._attachEvents(_preset);
// set readProperty() and getStore()
const impl = middleware.getImplement();
impl.readProperty = (...args) => this.onReadProperty(middleware.name, ...args);
const impl = _preset.getImplement();
impl.readProperty = (...args) => this.onReadProperty(_preset.name, ...args);
impl.getStore = () => this._config.stores[index];
return middleware;
return _preset;
}
_attachEvents(middleware) {
middleware.setMaxListeners(4);
middleware.on('broadcast', this.broadcast);
middleware.on('fail', (name, message) => void this.broadcast(name, {
_attachEvents(preset) {
preset.setMaxListeners(4);
preset.on('broadcast', this.broadcast);
preset.on('fail', (name, message) => void this.broadcast(name, {
type: PRESET_FAILED,
payload: {
name,
message,
orgData: this._cacheBuffer
}
orgData: this._cacheBuffer,
},
}));
}
_feed(direction, buffer, extraArgs) {
const middlewares = this.getMiddlewares(direction);
const presets = this.getPresets(direction);
// args to be injected
const isUdp = this._isPipingUdp;
const direct = (buf, isReverse = false) => this.emit(isReverse ? `post_${-direction}` : `post_${direction}`, buf);
// create event chain among middlewares
// create event chain among presets
const event = `next_${direction}`;
const first = middlewares[0];
const first = presets[0];
if (!first.hasListener(event)) {
const last = middlewares.reduce((prev, next) => {
prev.on(event, (buf) => next.write({direction, buffer: buf, direct, isUdp}, extraArgs));
const last = presets.reduce((prev, next) => {
prev.on(event, (buf) => next.write({ direction, buffer: buf, direct, isUdp }, extraArgs));
return next;
});
// the last middleware send data out via direct(buf, false)
// the last preset send data out via direct(buf, false)
last.on(event, direct);
}
// begin pipe
first.write({direction, buffer, direct, isUdp}, extraArgs);
first.write({ direction, buffer, direct, isUdp }, extraArgs);
}
}

@ -1,30 +1,27 @@
import EventEmitter from 'events';
import {getPresetClassByName} from '../presets';
import {PIPE_ENCODE} from '../constants';
import {kebabCase} from '../utils';
import { getPresetClassByName } from '../presets';
import { kebabCase } from '../utils';
import { PIPE_ENCODE } from '../constants';
function createPreset({config, preset}) {
function createPreset({ config, preset }) {
const name = preset.name;
const params = preset.params || {};
const ImplClass = getPresetClassByName(name);
const instance = new ImplClass({config, params});
const instance = new ImplClass({ config, params });
instance.onInit(params);
return instance;
}
export class Middleware extends EventEmitter {
export class Preset extends EventEmitter {
_config = null;
_impl = null;
constructor({config, preset}) {
constructor({ config, preset }) {
super();
this.onPresetNext = this.onPresetNext.bind(this);
this.onPresetBroadcast = this.onPresetBroadcast.bind(this);
this.onPresetFail = this.onPresetFail.bind(this);
this._config = config;
this._impl = createPreset({config, preset});
this._impl = createPreset({ config, preset });
this._impl.next = this.onPresetNext;
this._impl.broadcast = this.onPresetBroadcast;
this._impl.fail = this.onPresetFail;
@ -46,24 +43,19 @@ export class Middleware extends EventEmitter {
return this._impl.onNotified(action);
}
onPresetNext(direction, buffer) {
onPresetNext = (direction, buffer) => {
this.emit(`next_${direction}`, buffer);
}
};
onPresetBroadcast(action) {
onPresetBroadcast = (action) => {
this.emit('broadcast', this.name, action);
}
};
onPresetFail(message) {
onPresetFail = (message) => {
this.emit('fail', this.name, message);
}
};
onDestroy() {
this._impl.onDestroy();
this.removeAllListeners();
}
write({direction, buffer, direct, isUdp}, extraArgs) {
write({ direction, buffer, direct, isUdp }, extraArgs) {
const type = (direction === PIPE_ENCODE ? 'Out' : 'In') + (isUdp ? 'Udp' : '');
// prepare args
@ -79,7 +71,7 @@ export class Middleware extends EventEmitter {
// clientXXX, serverXXX
const nextLifeCycleHook = (buf/* , isReverse = false */) => {
const args = {buffer: buf, next, broadcast, direct, fail};
const args = { buffer: buf, next, broadcast, direct, fail };
const ret = this._config.is_client ? this._impl[`client${type}`](args, extraArgs) : this._impl[`server${type}`](args, extraArgs);
if (ret instanceof Buffer) {
next(ret);
@ -88,11 +80,16 @@ export class Middleware extends EventEmitter {
// beforeXXX
// NOTE: next(buf, isReverse) is not available in beforeXXX
const args = {buffer, next: nextLifeCycleHook, broadcast, direct, fail};
const args = { buffer, next: nextLifeCycleHook, broadcast, direct, fail };
const ret = this._impl[`before${type}`](args, extraArgs);
if (ret instanceof Buffer) {
nextLifeCycleHook(ret);
}
}
destroy() {
this._impl.onDestroy();
this.removeAllListeners();
}
}

95
src/presets/actions.js Normal file

@ -0,0 +1,95 @@
// - pushed by relay
/**
* {
* type: CONNECTION_CREATED,
* payload: {
* host: '127.0.0.1',
* port: 12345
* }
* }
*/
export const CONNECTION_CREATED = '@action:connection_created';
/**
* {
* type: CONNECTION_CLOSED
* payload: {
* host: '127.0.0.1',
* port: 12345
* }
* }
*/
export const CONNECTION_CLOSED = '@action:connection_closed';
/**
* {
* type: CONNECTION_WILL_CLOSE
* payload: {
* host: '127.0.0.1',
* port: 12345
* }
* }
*/
export const CONNECTION_WILL_CLOSE = '@action:connection_will_close';
// - emitted by presets
/**
* {
* type: CONNECT_TO_REMOTE,
* payload: {
* host: 'bing.com',
* port: 443,
* onConnected: () => {},
* keepAlive: false
* }
* }
*/
export const CONNECT_TO_REMOTE = '@action:connect_to_remote';
/**
* {
* type: CONNECTED_TO_REMOTE,
* payload: {
* host: 'bing.com',
* port: 443
* }
* }
*/
export const CONNECTED_TO_REMOTE = '@action:connected_to_remote';
/**
* {
* type: PRESET_FAILED,
* payload: {
* name: 'custom' or null,
* message: 'explain',
* orgData: <Buffer> or null
* }
* }
*/
export const PRESET_FAILED = '@action:preset_failed';
/**
* {
* type: CHANGE_PRESET_SUITE,
* payload: {
* type: <PIPE_ENCODE|PIPE_DECODE>,
* suite: [...],
* data: <Buffer>
* }
* }
*/
export const CHANGE_PRESET_SUITE = '@action:change_preset_suite';
export const PRESET_CLOSE_CONNECTION = '@action:preset_close_connection';
export const PRESET_PAUSE_RECV = '@action:preset_pause_recv';
export const PRESET_PAUSE_SEND = '@action:preset_pause_send';
export const PRESET_RESUME_RECV = '@action:preset_resume_recv';
export const PRESET_RESUME_SEND = '@action:preset_resume_send';
export const MUX_NEW_CONN = '@action:mux_new_conn';
export const MUX_DATA_FRAME = '@action:mux_data_frame';
export const MUX_CLOSE_CONN = '@action:mux_close_conn';

@ -1,23 +0,0 @@
import {Middleware} from '../../../src/core/middleware';
test('Middleware#constructor', () => {
expect(() => new Middleware({preset: {'name': 'unknown-preset'}})).toThrow();
});
test('Middleware#hasListener', () => {
const middleware = new Middleware({preset: {'name': 'ss-base'}});
expect(middleware.hasListener('event')).toBe(false);
});
test('Middleware#onPresetNext', () => {
const middleware = new Middleware({preset: {'name': 'ss-base'}});
middleware.on('next_1', (arg) => {
expect(arg).toBe(null);
});
middleware.onPresetNext(1, null);
});
test('Middleware#getImplement', () => {
const middleware = new Middleware({preset: {'name': 'ss-base'}});
expect(middleware.getImplement()).toBeDefined();
});

@ -0,0 +1,23 @@
import { Preset } from '../../../src';
test('Preset#constructor', () => {
expect(() => new Preset({ preset: { 'name': 'unknown-preset' } })).toThrow();
});
test('Preset#hasListener', () => {
const preset = new Preset({ preset: { 'name': 'ss-base' } });
expect(preset.hasListener('event')).toBe(false);
});
test('Preset#onPresetNext', () => {
const preset = new Preset({ preset: { 'name': 'ss-base' } });
preset.on('next_1', (arg) => {
expect(arg).toBe(null);
});
preset.onPresetNext(1, null);
});
test('Preset#getImplement', () => {
const preset = new Preset({ preset: { 'name': 'ss-base' } });
expect(preset.getImplement()).toBeDefined();
});