Add configurable bounded queues backed by a deque data structure, with metrics (#6)

Adds configurable bounded queues backed by a deque data structure, with metrics
This commit is contained in:
James Mills 2018-05-09 23:25:13 -07:00 committed by GitHub
parent c39cfce477
commit dd992ab5d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 244 additions and 142 deletions

View File

@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"os"
"time"
log "github.com/sirupsen/logrus"
@ -14,17 +13,20 @@ import (
func main() {
var (
version bool
debug bool
bind string
ttl time.Duration
version bool
debug bool
bind string
maxQueueSize int
maxPayloadSize int
)
flag.BoolVar(&version, "v", false, "display version information")
flag.BoolVar(&debug, "d", false, "enable debug logging")
flag.StringVar(&bind, "bind", ":8000", "interface and port to bind to")
flag.DurationVar(&ttl, "ttl", 60*time.Second, "default ttl")
flag.IntVar(&maxQueueSize, "max-queue-size", msgbus.DefaultMaxQueueSize, "maximum queue size")
flag.IntVar(&maxPayloadSize, "max-payload-size", msgbus.DefaultMaxPayloadSize, "maximum payload size")
flag.Parse()
@ -40,8 +42,9 @@ func main() {
}
opts := msgbus.Options{
DefaultTTL: ttl,
WithMetrics: true,
MaxQueueSize: maxQueueSize,
MaxPayloadSize: maxPayloadSize,
WithMetrics: true,
}
mb := msgbus.New(&opts)

View File

@ -16,8 +16,11 @@ import (
)
const (
// DefaultTTL is the default TTL (time to live) for newly created topics
DefaultTTL = 60 * time.Second
// DefaultMaxQueueSize is the default maximum size of queues
DefaultMaxQueueSize = 1000 // ~4MB per queue (1000 * 4KB)
// DefaultMaxPayloadSize is the default maximum payload size
DefaultMaxPayloadSize = 4096 // 4KB
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
@ -43,10 +46,9 @@ type HandlerFunc func(msg *Message) error
// Topic ...
type Topic struct {
Name string `json:"name"`
TTL time.Duration `json:"ttl"`
Sequence uint64 `json:"seq"`
Created time.Time `json:"created"`
Name string `json:"name"`
Sequence uint64 `json:"seq"`
Created time.Time `json:"created"`
}
// Message ...
@ -126,8 +128,9 @@ func (ls *Listeners) NotifyAll(message Message) int {
// Options ...
type Options struct {
DefaultTTL time.Duration
WithMetrics bool
MaxQueueSize int
MaxPayloadSize int
WithMetrics bool
}
// MessageBus ...
@ -136,7 +139,9 @@ type MessageBus struct {
metrics *Metrics
ttl time.Duration
maxQueueSize int
maxPayloadSize int
topics map[string]*Topic
queues map[*Topic]*Queue
listeners map[*Topic]*Listeners
@ -145,15 +150,18 @@ type MessageBus struct {
// New ...
func New(options *Options) *MessageBus {
var (
ttl time.Duration
withMetrics bool
maxQueueSize int
maxPayloadSize int
withMetrics bool
)
if options != nil {
ttl = options.DefaultTTL
maxQueueSize = options.MaxQueueSize
maxPayloadSize = options.MaxPayloadSize
withMetrics = options.WithMetrics
} else {
ttl = DefaultTTL
maxQueueSize = DefaultMaxQueueSize
maxPayloadSize = DefaultMaxPayloadSize
withMetrics = false
}
@ -221,10 +229,18 @@ func New(options *Options) *MessageBus {
"Number of active topics registered",
)
// bus queues gauge vec
// queue len gauge vec
metrics.NewGaugeVec(
"bus", "queues",
"Queue depths of each topic",
"queue", "len",
"Queue length of each topic",
[]string{"topic"},
)
// queue size gauge vec
// TODO: Implement this gauge by somehow getting queue sizes per topic!
metrics.NewGaugeVec(
"queue", "size",
"Queue length of each topic",
[]string{"topic"},
)
@ -238,7 +254,9 @@ func New(options *Options) *MessageBus {
return &MessageBus{
metrics: metrics,
ttl: ttl,
maxQueueSize: maxQueueSize,
maxPayloadSize: maxPayloadSize,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
listeners: make(map[*Topic]*Listeners),
@ -262,7 +280,7 @@ func (mb *MessageBus) NewTopic(topic string) *Topic {
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
t = &Topic{Name: topic, Created: time.Now()}
mb.topics[topic] = t
if mb.metrics != nil {
mb.metrics.Counter("bus", "topics").Inc()
@ -298,13 +316,13 @@ func (mb *MessageBus) Put(message Message) {
t := message.Topic
q, ok := mb.queues[t]
if !ok {
q = &Queue{}
q = NewQueue(mb.maxQueueSize)
mb.queues[message.Topic] = q
}
q.Push(message)
if mb.metrics != nil {
mb.metrics.GaugeVec("bus", "queues").WithLabelValues(t.Name).Inc()
mb.metrics.GaugeVec("queue", "len").WithLabelValues(t.Name).Inc()
}
mb.NotifyAll(message)
@ -326,7 +344,7 @@ func (mb *MessageBus) Get(t *Topic) (Message, bool) {
if mb.metrics != nil {
mb.metrics.Counter("bus", "fetched").Inc()
mb.metrics.GaugeVec("bus", "queues").WithLabelValues(t.Name).Dec()
mb.metrics.GaugeVec("queue", "len").WithLabelValues(t.Name).Dec()
}
return m.(Message), true
@ -356,7 +374,7 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
t = &Topic{Name: topic, Created: time.Now()}
mb.topics[topic] = t
}
@ -431,19 +449,30 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "POST", "PUT":
if r.ContentLength > int64(mb.maxPayloadSize) {
msg := "payload exceeds max-payload-size"
http.Error(w, msg, http.StatusRequestEntityTooLarge)
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
msg := fmt.Sprintf("error reading payload: %s", err)
http.Error(w, msg, http.StatusBadRequest)
return
}
if len(body) > mb.maxPayloadSize {
msg := "payload exceeds max-payload-size"
http.Error(w, msg, http.StatusRequestEntityTooLarge)
return
}
message := mb.NewMessage(t, body)
mb.Put(message)
msg := fmt.Sprintf(
"message successfully published to %s with sequence %d",
t.Name, t.Sequence,
t.Name, message.ID,
)
w.Write([]byte(msg))
case "GET":

174
queue.go
View File

@ -4,76 +4,150 @@ import (
"sync"
)
// QueueNode ...
type QueueNode struct {
data interface{}
next *QueueNode
}
// minCapacity is the smallest capacity that queue may have.
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
const minCapacity = 2
// Queue ...
// Queue represents a single instance of a bounded queue data structure
// with access to both side. If maxlen is non-zero the queue is bounded
// otherwise unbounded.
type Queue struct {
sync.Mutex
sync.RWMutex
head *QueueNode
tail *QueueNode
size int
buf []interface{}
head int
tail int
count int
maxlen int
}
// Len ...
func (q *Queue) Len() int {
q.Lock()
defer q.Unlock()
return q.size
}
// Push ...
func (q *Queue) Push(item interface{}) {
q.Lock()
defer q.Unlock()
n := &QueueNode{data: item}
if q.tail == nil {
q.tail = n
q.head = n
} else {
q.tail.next = n
q.tail = n
// NewQueue creates a new instance of Queue with the provided maxlen
func NewQueue(maxlen ...int) *Queue {
if len(maxlen) > 0 {
return &Queue{maxlen: maxlen[0]}
}
q.size++
return &Queue{}
}
// Pop ...
// Len returns the number of elements currently stored in the queue.
func (q *Queue) Len() int {
	q.RLock()
	n := q.count
	q.RUnlock()
	return n
}
// MaxLen returns the maxlen of the queue (zero means unbounded).
func (q *Queue) MaxLen() int {
	q.RLock()
	n := q.maxlen
	q.RUnlock()
	return n
}
// Size returns the current size of the queue.
// NOTE(review): this reports len(q.buf) — the allocated capacity of the
// internal ring buffer — not the number of stored elements; use Len for
// the element count.
func (q *Queue) Size() int {
	q.RLock()
	defer q.RUnlock()
	return len(q.buf)
}
// Empty returns true if the queue holds no elements, false otherwise.
func (q *Queue) Empty() bool {
	q.RLock()
	isEmpty := q.count == 0
	q.RUnlock()
	return isEmpty
}
// Full returns true if the queue is full, false otherwise.
// An unbounded queue (maxlen == 0) is never full; without the maxlen
// guard an empty unbounded queue would report full (count == maxlen == 0).
func (q *Queue) Full() bool {
	q.RLock()
	defer q.RUnlock()
	return q.maxlen > 0 && q.count == q.maxlen
}
// Push appends an element to the back of the queue.
// When the queue is bounded (maxlen > 0) and already full, the oldest
// element is evicted from the front to make room, so the queue never
// holds more than maxlen elements. (Previously a full bounded queue
// overwrote the slot at tail — which is head when full — corrupting the
// ring and letting count exceed len(buf).)
func (q *Queue) Push(elem interface{}) {
	q.Lock()
	defer q.Unlock()

	if q.maxlen > 0 && q.count >= q.maxlen {
		// Bounded and full: drop the front element.
		q.buf[q.head] = nil
		q.head = q.next(q.head)
		q.count--
	}

	q.growIfFull()
	q.buf[q.tail] = elem
	// Calculate new tail position.
	q.tail = q.next(q.tail)
	q.count++
}
// Pop removes and returns the element from the front of the queue.
func (q *Queue) Pop() interface{} {
q.Lock()
defer q.Unlock()
if q.head == nil {
if q.count <= 0 {
return nil
}
n := q.head
q.head = n.next
ret := q.buf[q.head]
q.buf[q.head] = nil
// Calculate new head position.
q.head = q.next(q.head)
q.count--
if q.head == nil {
q.tail = nil
}
q.size--
return n.data
q.shrinkIfExcess()
return ret
}
// Peek ...
// Peek returns the element at the front of the queue.
func (q *Queue) Peek() interface{} {
q.Lock()
defer q.Unlock()
q.RLock()
defer q.RUnlock()
n := q.head
if n == nil {
if q.count <= 0 {
return nil
}
return n.data
return q.buf[q.head]
}
// next returns the next buffer position wrapping around buffer.
// Relies on len(q.buf) always being a power of two (minCapacity is 2 and
// resize doubles/halves), so AND with len-1 is equivalent to modulus.
func (q *Queue) next(i int) int {
	return (i + 1) & (len(q.buf) - 1) // bitwise modulus
}
// growIfFull resizes up if the buffer is full.
// An unbounded queue (maxlen == 0) may always grow; a bounded queue grows
// only while it holds fewer than maxlen elements. (Previously the bare
// `q.count < q.maxlen` check meant an unbounded queue — maxlen zero —
// never grew past minCapacity, so Push silently overwrote elements.)
func (q *Queue) growIfFull() {
	if len(q.buf) == 0 {
		// Lazy first allocation so the zero-value Queue is usable.
		q.buf = make([]interface{}, minCapacity)
		return
	}
	if q.count == len(q.buf) && (q.maxlen == 0 || q.count < q.maxlen) {
		q.resize()
	}
}
// shrinkIfExcess resizes down when the buffer is only 1/4 full, so the
// queue does not retain a large backing array after draining.
func (q *Queue) shrinkIfExcess() {
	if len(q.buf) > minCapacity && (q.count<<2) == len(q.buf) {
		q.resize()
	}
}
// resize resizes the queue to fit exactly twice its current contents.
// This results in shrinking if the queue is less than half-full, or growing
// the queue when it is full.
// The new length q.count<<1 remains a power of two because resize is only
// reached when count == len(buf) (grow) or count<<2 == len(buf) (shrink),
// preserving the invariant next() depends on.
func (q *Queue) resize() {
	newBuf := make([]interface{}, q.count<<1)
	if q.tail > q.head {
		// Contents are contiguous: one copy.
		copy(newBuf, q.buf[q.head:q.tail])
	} else {
		// Contents wrap around the end of the buffer: copy in two parts.
		n := copy(newBuf, q.buf[q.head:])
		copy(newBuf[n:], q.buf[:q.tail])
	}
	q.head = 0
	q.tail = q.count
	q.buf = newBuf
}

View File

@ -6,93 +6,89 @@ import (
"github.com/stretchr/testify/assert"
)
func TestQueueLen(t *testing.T) {
func TestEmpty(t *testing.T) {
q := Queue{}
assert.Equal(t, q.Len(), 0)
assert.Zero(t, q.Len())
assert.True(t, q.Empty())
}
func TestQueuePush(t *testing.T) {
func TestSimple(t *testing.T) {
assert := assert.New(t)
q := Queue{}
q.Push(1)
assert.Equal(t, q.Len(), 1)
for i := 0; i < minCapacity; i++ {
q.Push(i)
}
for i := 0; i < minCapacity; i++ {
assert.Equal(q.Peek(), i)
assert.Equal(q.Pop(), i)
}
}
func TestQueuePop(t *testing.T) {
q := Queue{}
q.Push(1)
assert.Equal(t, q.Len(), 1)
assert.Equal(t, q.Pop(), 1)
assert.Equal(t, q.Len(), 0)
// TestMaxLen verifies that MaxLen reports the configured bound.
func TestMaxLen(t *testing.T) {
	q := Queue{maxlen: minCapacity}
	assert.Equal(t, minCapacity, q.MaxLen())
}
func TestQueuePeek(t *testing.T) {
q := Queue{}
q.Push(1)
assert.Equal(t, q.Len(), 1)
assert.Equal(t, q.Peek(), 1)
assert.Equal(t, q.Len(), 1)
// TestFull verifies that a bounded queue reports Full once it holds
// maxlen elements.
func TestFull(t *testing.T) {
	q := Queue{maxlen: minCapacity}
	for n := 0; n < minCapacity; n++ {
		q.Push(n)
	}
	assert.True(t, q.Full())
}
func TestQueue(t *testing.T) {
func TestBufferWrap(t *testing.T) {
q := Queue{}
q.Push(1)
q.Push(2)
q.Push(3)
assert.Equal(t, q.Len(), 3)
assert.Equal(t, q.Peek(), 1)
assert.Equal(t, q.Pop(), 1)
assert.Equal(t, q.Len(), 2)
assert.Equal(t, q.Peek(), 2)
for i := 0; i < minCapacity; i++ {
q.Push(i)
}
assert.Equal(t, q.Pop(), 2)
assert.Equal(t, q.Len(), 1)
assert.Equal(t, q.Peek(), 3)
for i := 0; i < 3; i++ {
q.Pop()
q.Push(minCapacity + i)
}
assert.Equal(t, q.Pop(), 3)
assert.Equal(t, q.Len(), 0)
assert.Equal(t, q.Peek(), nil)
for i := 0; i < minCapacity; i++ {
assert.Equal(t, q.Peek().(int), i+3)
q.Pop()
}
}
func BenchmarkQueuePush(b *testing.B) {
// TestLen verifies that Len tracks every push and pop across growth and
// shrinkage of the internal buffer.
func TestLen(t *testing.T) {
	assert := assert.New(t)

	q := Queue{}
	assert.Zero(q.Len())

	for n := 1; n <= 1000; n++ {
		q.Push(n - 1)
		assert.Equal(q.Len(), n)
	}
	for n := 999; n >= 0; n-- {
		q.Pop()
		assert.Equal(q.Len(), n)
	}
}
// BenchmarkPush measures the cost of appending to an unbounded queue.
func BenchmarkPush(b *testing.B) {
	var q Queue
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		q.Push(n)
	}
}
func BenchmarkQueuePeekEmpty(b *testing.B) {
q := Queue{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Peek()
}
}
func BenchmarkQueuePeekNonEmpty(b *testing.B) {
q := Queue{}
q.Push(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Peek()
}
}
func BenchmarkQueuePopEmpty(b *testing.B) {
q := Queue{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Pop()
}
}
func BenchmarkQueuePopNonEmpty(b *testing.B) {
func BenchmarkPushPop(b *testing.B) {
q := Queue{}
for i := 0; i < b.N; i++ {
q.Push(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Pop()
}