Fixed drop rate for subscriber listeners by adding buffering to channels (configurable)

This commit is contained in:
James Mills 2018-05-12 12:32:22 -07:00
джерело 61667a8521
коміт 36f899d02a
Не вдалося знайти GPG ключ що відповідає даному підпису
Ідентифікатор GPG ключа: AC4C014F1440EBD6
2 змінених файлів з 34 додано та 3 видалено

@ -16,6 +16,7 @@ func main() {
version bool
debug bool
bind string
bufferLength int
maxQueueSize int
maxPayloadSize int
)
@ -25,6 +26,7 @@ func main() {
flag.StringVar(&bind, "bind", ":8000", "interface and port to bind to")
flag.IntVar(&bufferLength, "buffer-length", msgbus.DefaultBufferLength, "buffer length")
flag.IntVar(&maxQueueSize, "max-queue-size", msgbus.DefaultMaxQueueSize, "maximum queue size")
flag.IntVar(&maxPayloadSize, "max-payload-size", msgbus.DefaultMaxPayloadSize, "maximum payload size")
@ -42,6 +44,7 @@ func main() {
}
opts := msgbus.Options{
BufferLength: bufferLength,
MaxQueueSize: maxQueueSize,
MaxPayloadSize: maxPayloadSize,
WithMetrics: true,

@ -22,6 +22,9 @@ const (
// DefaultMaxPayloadSize is the default maximum payload size
DefaultMaxPayloadSize = 4096 // 4KB
// DefaultBufferLength is the default buffer length for subscriber chans
DefaultBufferLength = 10
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
@ -59,17 +62,36 @@ type Message struct {
Created time.Time `json:"created"`
}
// ListenerOptions ...
type ListenerOptions struct {
BufferLength int
}
// Listeners ...
type Listeners struct {
sync.RWMutex
buflen int
ids map[string]bool
chs map[string]chan Message
}
// NewListeners ...
func NewListeners() *Listeners {
func NewListeners(options *ListenerOptions) *Listeners {
var (
bufferLength int
)
if options != nil {
bufferLength = options.BufferLength
} else {
bufferLength = DefaultBufferLength
}
return &Listeners{
buflen: bufferLength,
ids: make(map[string]bool),
chs: make(map[string]chan Message),
}
@ -89,7 +111,7 @@ func (ls *Listeners) Add(id string) chan Message {
defer ls.Unlock()
ls.ids[id] = true
ls.chs[id] = make(chan Message)
ls.chs[id] = make(chan Message, ls.buflen)
return ls.chs[id]
}
@ -148,6 +170,7 @@ func (ls *Listeners) NotifyAll(message Message) int {
// Options ...
type Options struct {
BufferLength int
MaxQueueSize int
MaxPayloadSize int
WithMetrics bool
@ -159,6 +182,7 @@ type MessageBus struct {
metrics *Metrics
bufferLength int
maxQueueSize int
maxPayloadSize int
@ -170,16 +194,19 @@ type MessageBus struct {
// New ...
func New(options *Options) *MessageBus {
var (
bufferLength int
maxQueueSize int
maxPayloadSize int
withMetrics bool
)
if options != nil {
bufferLength = options.BufferLength
maxQueueSize = options.MaxQueueSize
maxPayloadSize = options.MaxPayloadSize
withMetrics = options.WithMetrics
} else {
bufferLength = DefaultBufferLength
maxQueueSize = DefaultMaxQueueSize
maxPayloadSize = DefaultMaxPayloadSize
withMetrics = false
@ -274,6 +301,7 @@ func New(options *Options) *MessageBus {
return &MessageBus{
metrics: metrics,
bufferLength: bufferLength,
maxQueueSize: maxQueueSize,
maxPayloadSize: maxPayloadSize,
@ -409,7 +437,7 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
ls, ok := mb.listeners[t]
if !ok {
ls = NewListeners()
ls = NewListeners(&ListenerOptions{BufferLength: mb.bufferLength})
mb.listeners[t] = ls
}