mirror of
https://git.mills.io/prologic/msgbus.git
synced 2024-06-28 09:41:43 +00:00
![James Mills](/assets/img/avatar_default.png)
Closes #20 Co-authored-by: James Mills <prologic@shortcircuit.net.au> Co-authored-by: xuu <xuu@noreply@mills.io> Reviewed-on: https://git.mills.io/prologic/msgbus/pulls/26
173 lines
3.4 KiB
Go
173 lines
3.4 KiB
Go
package msgbus
|
|
|
|
import (
|
|
sync "github.com/sasha-s/go-deadlock"
|
|
)
|
|
|
|
// minCapacity is the smallest capacity that queue may have.
|
|
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
|
|
const minCapacity = 2
|
|
|
|
// Queue represents a single instance of a bounded queue data structure
|
|
// with access to both side. If maxlen is non-zero the queue is bounded
|
|
// otherwise unbounded.
|
|
type Queue struct {
|
|
sync.RWMutex
|
|
|
|
buf []interface{}
|
|
head int
|
|
tail int
|
|
count int
|
|
maxlen int
|
|
}
|
|
|
|
// NewQueue creates a new instance of Queue with the provided maxlen
|
|
func NewQueue(maxlen ...int) *Queue {
|
|
if len(maxlen) > 0 {
|
|
return &Queue{maxlen: maxlen[0]}
|
|
}
|
|
return &Queue{}
|
|
}
|
|
|
|
// Len returns the number of elements currently stored in the queue.
|
|
func (q *Queue) Len() int {
|
|
q.RLock()
|
|
defer q.RUnlock()
|
|
|
|
return q.count
|
|
}
|
|
|
|
// MaxLen returns the maxlen of the queue
|
|
func (q *Queue) MaxLen() int {
|
|
q.RLock()
|
|
defer q.RUnlock()
|
|
|
|
return q.maxlen
|
|
}
|
|
|
|
// Size returns the current size of the queue
|
|
func (q *Queue) Size() int {
|
|
q.RLock()
|
|
defer q.RUnlock()
|
|
|
|
return len(q.buf)
|
|
}
|
|
|
|
// Empty returns true if the queue is empty false otherwise
|
|
func (q *Queue) Empty() bool {
|
|
q.RLock()
|
|
defer q.RUnlock()
|
|
|
|
return q.count == 0
|
|
}
|
|
|
|
// Full returns true if the queue is full false otherwise
|
|
func (q *Queue) Full() bool {
|
|
q.RLock()
|
|
defer q.RUnlock()
|
|
|
|
return q.count == q.maxlen
|
|
}
|
|
|
|
// Push appends an element to the back of the queue.
|
|
func (q *Queue) Push(elem interface{}) {
|
|
q.Lock()
|
|
defer q.Unlock()
|
|
|
|
q.growIfFull()
|
|
|
|
q.buf[q.tail] = elem
|
|
// Calculate new tail position.
|
|
q.tail = q.next(q.tail)
|
|
q.count++
|
|
}
|
|
|
|
// Pop removes and returns the element from the front of the queue.
|
|
func (q *Queue) Pop() interface{} {
|
|
q.Lock()
|
|
defer q.Unlock()
|
|
|
|
if q.count <= 0 {
|
|
return nil
|
|
}
|
|
|
|
ret := q.buf[q.head]
|
|
q.buf[q.head] = nil
|
|
// Calculate new head position.
|
|
q.head = q.next(q.head)
|
|
q.count--
|
|
|
|
q.shrinkIfExcess()
|
|
return ret
|
|
}
|
|
|
|
// Peek returns the element at the front of the queue.
|
|
func (q *Queue) Peek() interface{} {
|
|
q.RLock()
|
|
defer q.RUnlock()
|
|
|
|
if q.count <= 0 {
|
|
return nil
|
|
}
|
|
return q.buf[q.head]
|
|
}
|
|
|
|
// ForEach applys the function `f` over each item in the queue for read-only
|
|
// access into the queue in O(n) time for indexining into the queue.
|
|
func (q *Queue) ForEach(f func(elem interface{}) error) error {
|
|
q.Lock()
|
|
defer q.Unlock()
|
|
|
|
if q.count <= 0 {
|
|
return nil
|
|
}
|
|
|
|
for i := 0; i < q.count; i++ {
|
|
if err := f(q.buf[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// next returns the next buffer position wrapping around buffer.
|
|
func (q *Queue) next(i int) int {
|
|
return (i + 1) & (len(q.buf) - 1) // bitwise modulus
|
|
}
|
|
|
|
// growIfFull resizes up if the buffer is full.
|
|
func (q *Queue) growIfFull() {
|
|
if len(q.buf) == 0 {
|
|
q.buf = make([]interface{}, minCapacity)
|
|
return
|
|
}
|
|
if q.count == len(q.buf) && (q.maxlen == 0 || q.count < q.maxlen) {
|
|
q.resize()
|
|
}
|
|
}
|
|
|
|
// shrinkIfExcess resize down if the buffer 1/4 full.
|
|
func (q *Queue) shrinkIfExcess() {
|
|
if len(q.buf) > minCapacity && (q.count<<2) == len(q.buf) {
|
|
q.resize()
|
|
}
|
|
}
|
|
|
|
// resize resizes the queue to fit exactly twice its current contents.
|
|
// This results in shrinking if the queue is less than half-full, or growing
|
|
// the queue when it is full.
|
|
func (q *Queue) resize() {
|
|
newBuf := make([]interface{}, q.count<<1)
|
|
if q.tail > q.head {
|
|
copy(newBuf, q.buf[q.head:q.tail])
|
|
} else {
|
|
n := copy(newBuf, q.buf[q.head:])
|
|
copy(newBuf[n:], q.buf[:q.tail])
|
|
}
|
|
|
|
q.head = 0
|
|
q.tail = q.count
|
|
q.buf = newBuf
|
|
}
|