src: remove preset.js, implement _write() in IPreset

This commit is contained in:
Micooz 2018-05-02 21:20:00 +08:00
parent 08bab711e0
commit 5435494ac3
5 changed files with 76 additions and 155 deletions

@ -1,6 +1,5 @@
export * from './config';
export * from './hub';
export * from './pipe';
export * from './preset';
export * from './relay';
export * from './mux-relay';

@ -1,13 +1,13 @@
import EventEmitter from 'events';
import { Preset } from './preset';
import { logger } from '../utils';
import { PIPE_ENCODE } from '../constants';
import { getPresetClassByName } from '../presets';
import { PRESET_FAILED } from '../presets/actions';
import { IPresetAddressing } from '../presets/defs';
import { logger } from '../utils';
// .on('broadcast')
// .on(`pre_${direction}`)
// .on(`post_${direction}`)
// .on(`pre_${type}`)
// .on(`post_${type}`)
export class Pipe extends EventEmitter {
_config = null;
@ -39,9 +39,8 @@ export class Pipe extends EventEmitter {
initTargetAddress({ host, port }) {
const presets = this.getPresets();
for (const preset of presets) {
const impl = preset.getImplement();
if (impl instanceof IPresetAddressing) {
impl.onInitTargetAddress({ host, port });
if (preset instanceof IPresetAddressing) {
preset.onInitTargetAddress({ host, port });
}
}
}
@ -56,9 +55,8 @@ export class Pipe extends EventEmitter {
const presets = this.getPresets();
const preset = presets.find((m) => m.name === targetName);
if (preset) {
const impl = preset.getImplement();
const value = impl[propertyName];
return value !== undefined ? value : impl.constructor[propertyName];
const value = preset[propertyName];
return value !== undefined ? value : preset.constructor[propertyName];
} else {
logger.warn(`[preset] "${fromName}" cannot read property from nonexistent preset "${targetName}".`);
}
@ -95,7 +93,7 @@ export class Pipe extends EventEmitter {
presets.push(preset);
}
// destroy redundant presets
Object.keys(mdIndex).forEach((key) => mdIndex[key].destroy());
Object.keys(mdIndex).forEach((key) => mdIndex[key].onDestroy());
// update members
this._encode_presets = presets;
this._decode_presets = [].concat(presets).reverse();
@ -120,7 +118,7 @@ export class Pipe extends EventEmitter {
destroy() {
if (!this._destroyed) {
this.getPresets().forEach((preset) => {
preset.destroy();
preset.onDestroy();
preset.removeAllListeners();
});
this._encode_presets = null;
@ -131,13 +129,14 @@ export class Pipe extends EventEmitter {
}
_createPreset(rawPreset, index) {
const _preset = new Preset({ config: this._config, preset: rawPreset });
this._attachEvents(_preset);
// set readProperty() and getStore()
const impl = _preset.getImplement();
impl.readProperty = (...args) => this.onReadProperty(_preset.name, ...args);
impl.getStore = () => this._config.stores[index];
return _preset;
const { name, params = {} } = rawPreset;
const ImplClass = getPresetClassByName(name);
const preset = new ImplClass({ config: this._config, params });
preset.readProperty = (...args) => this.onReadProperty(preset.name, ...args);
preset.getStore = () => this._config.stores[index];
preset.onInit(params);
this._attachEvents(preset);
return preset;
}
_attachEvents(preset) {
@ -153,28 +152,28 @@ export class Pipe extends EventEmitter {
}));
}
_feed(direction, buffer, extraArgs) {
const presets = this.getPresets(direction);
_feed(type, buffer, extraArgs) {
const presets = this.getPresets(type);
// args to be injected
const isUdp = this._isPipingUdp;
const direct = (buf, isReverse = false) => this.emit(isReverse ? `post_${-direction}` : `post_${direction}`, buf);
const direct = (buf, isReverse = false) => this.emit(isReverse ? `post_${-type}` : `post_${type}`, buf);
// check if it's necessary to pipe
if (presets.length < 1) {
return direct(buffer);
}
// create event chain among presets
const event = `next_${direction}`;
const event = `next_${type}`;
const first = presets[0];
if (!first.hasListener(event)) {
if (!first.listenerCount(event) > 0) {
const last = presets.reduce((prev, next) => {
prev.on(event, (buf) => next.write({ direction, buffer: buf, direct, isUdp }, extraArgs));
prev.on(event, (buf) => next._write({ type, buffer: buf, direct, isUdp }, extraArgs));
return next;
});
// the last preset send data out via direct(buf, false)
last.on(event, direct);
}
// begin pipe
first.write({ direction, buffer, direct, isUdp }, extraArgs);
first._write({ type, buffer, direct, isUdp }, extraArgs);
}
}

@ -1,91 +0,0 @@
import EventEmitter from 'events';
import { getPresetClassByName } from '../presets';
import { kebabCase } from '../utils';
import { PIPE_ENCODE } from '../constants';
import { CONNECT_TO_REMOTE } from '../presets/actions';
export class Preset extends EventEmitter {
_config = null;
_impl = null;
constructor({ config, preset }) {
super();
this._config = config;
// initialize preset instance
const { name, params = {} } = preset;
const ImplClass = getPresetClassByName(name);
this._impl = new ImplClass({ config, params });
this._impl.next = this.onPresetNext;
this._impl.broadcast = this.onPresetBroadcast;
this._impl.fail = this.onPresetFail;
this._impl.resolveTargetAddress = this.onResolveTargetAddress;
this._impl.onInit(params);
}
get name() {
return kebabCase(this._impl.constructor.name).replace(/(.*)-preset/i, '$1');
}
getImplement() {
return this._impl;
}
hasListener(event) {
return this.listenerCount(event) > 0;
}
onPresetNext = (direction, buffer) => {
this.emit(`next_${direction}`, buffer);
};
onPresetBroadcast = (action) => {
this.emit('broadcast', this.name, action);
};
onPresetFail = (message) => {
this.emit('fail', this.name, message);
};
onResolveTargetAddress = ({ host, port }, cb) => {
this.onPresetBroadcast({ type: CONNECT_TO_REMOTE, payload: { host, port, onConnected: cb } });
};
write({ direction, buffer, direct, isUdp }, extraArgs) {
const type = (direction === PIPE_ENCODE ? 'Out' : 'In') + (isUdp ? 'Udp' : '');
// prepare args
const broadcast = this.onPresetBroadcast;
const fail = this.onPresetFail;
const next = (processed, isReverse = false) => {
// oh my nice hack to deal with reverse pipeline if haven't been created
const hasListener = this.emit(`next_${isReverse ? -direction : direction}`, processed);
if (!hasListener) {
direct(processed, isReverse);
}
};
// clientXXX, serverXXX
const nextLifeCycleHook = (buf/* , isReverse = false */) => {
const args = { buffer: buf, next, broadcast, direct, fail };
const ret = this._config.is_client ? this._impl[`client${type}`](args, extraArgs) : this._impl[`server${type}`](args, extraArgs);
if (ret instanceof Buffer) {
next(ret);
}
};
// beforeXXX
// NOTE: next(buf, isReverse) is not available in beforeXXX
const args = { buffer, next: nextLifeCycleHook, broadcast, direct, fail };
const ret = this._impl[`before${type}`](args, extraArgs);
if (ret instanceof Buffer) {
nextLifeCycleHook(ret);
}
}
destroy() {
this._impl.onDestroy();
}
}

@ -1,4 +1,8 @@
/* eslint-disable no-unused-vars */
import EventEmitter from 'events';
import { PIPE_ENCODE } from '../constants';
import { CONNECT_TO_REMOTE } from './actions';
import { kebabCase } from '../utils';
/**
* @lifecycle
@ -12,14 +16,46 @@
* @note
* static onCheckParams() and static onCache() are called only once since new Hub().
*/
export class IPreset {
export class IPreset extends EventEmitter {
/**
* config
* @type {Config}
*/
_config = null;
_write({ type, buffer, direct, isUdp }, extraArgs) {
const postfix = (type === PIPE_ENCODE ? 'Out' : 'In') + (isUdp ? 'Udp' : '');
// prepare args
const broadcast = (action) => this.emit('broadcast', this.name, action);
const fail = (message) => this.emit('fail', this.name, message);
const next = (processed, isReverse = false) => {
// oh my nice hack to deal with reverse pipeline if haven't been created
const hasListener = this.emit(`next_${isReverse ? -type : type}`, processed);
if (!hasListener) {
direct(processed, isReverse);
}
};
// clientXXX, serverXXX
const nextLifeCycleHook = (buf/*, isReverse = false */) => {
const args = { buffer: buf, next, broadcast, direct, fail };
const ret = this._config.is_client ? this[`client${postfix}`](args, extraArgs) : this[`server${postfix}`](args, extraArgs);
if (ret instanceof Buffer) {
next(ret);
}
};
// beforeXXX
// NOTE: next(buf, isReverse) is not available in beforeXXX
const args = { buffer, next: nextLifeCycleHook, broadcast, direct, fail };
const ret = this[`before${postfix}`](args, extraArgs);
if (ret instanceof Buffer) {
nextLifeCycleHook(ret);
}
}
get name() {
return kebabCase(this.constructor.name).replace(/(.*)-preset/i, '$1');
}
/**
* check params passed to the preset, if any errors, should throw directly
* @param params
@ -45,9 +81,8 @@ export class IPreset {
* @param params
*/
constructor({ config, params } = {}) {
if (config) {
this._config = config;
}
super();
this._config = config;
}
/**
@ -117,10 +152,10 @@ export class IPreset {
return buffer;
}
// auto-generated methods, DO NOT implement them!
next(direction, buffer) {
// reserved methods, DO NOT overwrite them!
next(type, buffer) {
this.emit(`next_${type}`, buffer);
}
/**
@ -155,15 +190,17 @@ export class IPresetAddressing extends IPreset {
}
// reserved methods, DO NOT overwrite them!
/**
* DO NOT overwrite it!
* call it when target address was resolved on server side,
* call it when target address was resolved on server side
* @param host
* @param port
* @param callback
*/
resolveTargetAddress({ host, port }, callback) {
const action = { type: CONNECT_TO_REMOTE, payload: { host, port, onConnected: callback } };
this.emit('broadcast', this.name, action);
}
}

@ -1,23 +0,0 @@
import { Preset } from '../../../src';
test('Preset#constructor', () => {
expect(() => new Preset({ preset: { 'name': 'unknown-preset' } })).toThrow();
});
test('Preset#hasListener', () => {
const preset = new Preset({ preset: { 'name': 'ss-base' } });
expect(preset.hasListener('event')).toBe(false);
});
test('Preset#onPresetNext', () => {
const preset = new Preset({ preset: { 'name': 'ss-base' } });
preset.on('next_1', (arg) => {
expect(arg).toBe(null);
});
preset.onPresetNext(1, null);
});
test('Preset#getImplement', () => {
const preset = new Preset({ preset: { 'name': 'ss-base' } });
expect(preset.getImplement()).toBeDefined();
});