implement EventLimiter for rate limiting of sending events

This commit is contained in:
Liam Stanley 2016-11-22 13:47:15 -05:00
parent 144843520a
commit 85420256ae
2 changed files with 110 additions and 1 deletions

@ -4,6 +4,8 @@
package girc
import "time"
// Sender is an interface for sending IRC messages.
type Sender interface {
// Send sends the given message and returns any errors.
@ -16,7 +18,81 @@ type serverSender struct {
writer *ircEncoder
}
// Send sends the specified event.
// Send write the specified event.
func (s serverSender) Send(event *Event) error {
return s.writer.Encode(event)
}
// EventLimiter is a custom ticker which lets you rate limit sending events
// to a function (e.g. Client.Send()), with optional burst support. See
// NewEventLimiter() for more information.
type EventLimiter struct {
tick *time.Ticker
throttle chan time.Time
fn func(*Event) error
}
// loop is used to read events from the internal time.Ticker.
func (el *EventLimiter) loop() {
// This should exit itself once el.Stop() is called.
for t := range el.tick.C {
select {
case el.throttle <- t:
default:
}
}
}
// Stop closes the ticker, and prevents re-use of the EventLimiter. Use this
// to prevent EventLimiter from keeping unnecessary pointers in memory.
func (el *EventLimiter) Stop() {
el.tick.Stop()
el.fn = nil
}
// Send is the subtitute function used to send the event the the previously
// specified send function.
//
// This WILL panic if Stop() was already called on the EventLimiter.
func (el *EventLimiter) Send(event *Event) error {
// Ensure nobody is sending to it once it's closed.
if el.fn == nil {
panic("attempted send on closed EventLimiter")
}
<-el.throttle
return el.fn(event)
}
// SendAll sends a list of events to Send(). SendAll will return the first
// error it gets when attempting to Send() to the predefined Send function.
// It will not attempt to continue processing the list of events.
func (el *EventLimiter) SendAll(events ...*Event) error {
for i := 0; i < len(events); i++ {
if err := el.Send(events[i]); err != nil {
return err
}
}
return nil
}
// NewEventLimiter returns a NewEventLimiter which can be used to rate limit
// events being sent to a Send function. This does support bursting a
// certain amount of messages if there are less than burstCount.
//
// Ensure that Stop() is called on the returned EventLimiter, otherwise
// the limiter may keep unwanted pointers to data in memory.
func NewEventLimiter(burstCount int, rate time.Duration, eventFunc func(event *Event) error) *EventLimiter {
limiter := &EventLimiter{
tick: time.NewTicker(rate),
throttle: make(chan time.Time, burstCount),
fn: eventFunc,
}
// Push the ticket into the background. If you want to stop this, simply
// use EventLimiter.Stop().
go limiter.loop()
return limiter
}

@ -7,6 +7,7 @@ package girc
import (
"bytes"
"testing"
"time"
)
func TestSender(t *testing.T) {
@ -21,3 +22,35 @@ func TestSender(t *testing.T) {
t.Errorf("serverSender{writer: newEncoder(bytes.Buffer)} = %v, want %v", bw, e.String()+"\r\n")
}
}
func TestEventLimiter(t *testing.T) {
events := []*Event{}
sendFunc := func(event *Event) error {
events = append(events, event)
return nil
}
limiter := NewEventLimiter(1, 150*time.Millisecond, sendFunc)
var e1, e2 *Event
go func() {
if err := limiter.SendAll(e1, e2); err != nil {
t.Fatalf("SendAll gave: %v", err)
}
}()
// Checking it immediately should yield 1 time.
if len(events) > 1 {
t.Fatalf("limiter has %v events, wanted 0 or 1", len(events))
}
time.Sleep(500 * time.Millisecond)
// It should now show a length of two.
if len(events) != 2 {
t.Fatalf("limiter has %v events, wanted 2", len(events))
}
limiter.Stop()
}