Per topic sequences with ttl

This commit is contained in:
James Mills 2017-08-14 00:34:12 -07:00
parent 017aa45218
commit dc7243ffa9
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
4 changed files with 113 additions and 47 deletions

View File

@ -70,7 +70,7 @@ func NewClient(host string, port int, options *Options) *Client {
func (c *Client) Handle(msg *msgbus.Message) {
log.Printf(
"[msgbus] received message: id=%d topic=%s payload=%s",
msg.ID, msg.Topic, msg.Payload,
msg.ID, msg.Topic.Name, msg.Payload,
)
}

View File

@ -4,19 +4,23 @@ import (
"flag"
"log"
"net/http"
"time"
"github.com/prologic/msgbus"
)
var (
bind string
ttl time.Duration
)
func init() {
flag.StringVar(&bind, "bind", ":8000", "interface and port to bind to")
flag.DurationVar(&ttl, "ttl", 60*time.Second, "default ttl")
}
func main() {
http.Handle("/", msgbus.NewMessageBus())
options := msgbus.Options{DefaultTTL: ttl}
http.Handle("/", msgbus.NewMessageBus(&options))
log.Fatal(http.ListenAndServe(bind, nil))
}

View File

@ -7,16 +7,18 @@ import (
)
func main() {
m := msgbus.NewMessageBus()
m.Put("foo", m.NewMessage([]byte("Hello World!")))
m := msgbus.NewMessageBus(nil)
t := m.NewTopic("foo")
m.Put(m.NewMessage(t, []byte("Hello World!")))
msg, ok := m.Get("foo")
msg, ok := m.Get(t)
if !ok {
log.Printf("No more messages in queue: foo")
} else {
log.Printf(
"Received message: id=%s topic=%s payload=%s",
msg.ID, msg.Topic, msg.Payload,
)
return
}
log.Printf(
"Received message: id=%s topic=%s payload=%s",
msg.ID, msg.Topic, msg.Payload,
)
}

134
msgbus.go
View File

@ -12,11 +12,24 @@ import (
"golang.org/x/net/websocket"
)
const (
DefaultTTL = 60 * time.Second
)
// Topic ...
type Topic struct {
Name string `json:"name"`
TTL time.Duration `json:"ttl"`
Sequence uint64 `json:"seq"`
Created time.Time `json:"created"`
}
// Message ...
type Message struct {
ID uint64 `json:"id"`
Topic string `json:"topic"`
Topic *Topic `json:"topic"`
Payload []byte `json:"payload"`
Expires time.Time `json:"expires"`
Created time.Time `json:"created"`
}
@ -71,18 +84,34 @@ func (ls *Listeners) NotifyAll(message Message) {
}
}
// Options ...
type Options struct {
DefaultTTL time.Duration
}
// MessageBus ...
type MessageBus struct {
seqid uint64
topics map[string]*Queue
listeners map[string]*Listeners
ttl time.Duration
topics map[string]*Topic
queues map[*Topic]*Queue
listeners map[*Topic]*Listeners
}
// NewMessageBus ...
func NewMessageBus() *MessageBus {
func NewMessageBus(options *Options) *MessageBus {
var ttl time.Duration
if options != nil {
ttl = options.DefaultTTL
} else {
ttl = DefaultTTL
}
return &MessageBus{
topics: make(map[string]*Queue),
listeners: make(map[string]*Listeners),
ttl: ttl,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
listeners: make(map[*Topic]*Listeners),
}
}
@ -91,41 +120,52 @@ func (mb *MessageBus) Len() int {
return len(mb.topics)
}
// NewTopic ...
func (mb *MessageBus) NewTopic(topic string) *Topic {
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
}
return t
}
// NewMessage ...
func (mb *MessageBus) NewMessage(payload []byte) Message {
message := Message{
ID: mb.seqid,
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message {
defer func() {
topic.Sequence++
}()
return Message{
ID: topic.Sequence,
Topic: topic,
Payload: payload,
Created: time.Now(),
}
mb.seqid++
return message
}
// Put ...
func (mb *MessageBus) Put(topic string, message Message) {
message.Topic = topic
func (mb *MessageBus) Put(message Message) {
log.Printf(
"[msgbus] PUT id=%d topic=%s payload=%s",
message.ID, message.Topic, message.Payload,
message.ID, message.Topic.Name, message.Payload,
)
q, ok := mb.topics[topic]
q, ok := mb.queues[message.Topic]
if !ok {
q = &Queue{}
mb.topics[topic] = q
mb.queues[message.Topic] = q
}
q.Push(message)
mb.NotifyAll(topic, message)
mb.NotifyAll(message)
}
// Get ...
func (mb *MessageBus) Get(topic string) (Message, bool) {
func (mb *MessageBus) Get(topic *Topic) (Message, bool) {
log.Printf("[msgbus] GET topic=%s", topic)
q, ok := mb.topics[topic]
q, ok := mb.queues[topic]
if !ok {
return Message{}, false
}
@ -138,12 +178,12 @@ func (mb *MessageBus) Get(topic string) (Message, bool) {
}
// NotifyAll ...
func (mb *MessageBus) NotifyAll(topic string, message Message) {
func (mb *MessageBus) NotifyAll(message Message) {
log.Printf(
"[msgbus] NotifyAll id=%d topic=%s payload=%s",
message.ID, message.Topic, message.Payload,
message.ID, message.Topic.Name, message.Payload,
)
ls, ok := mb.listeners[topic]
ls, ok := mb.listeners[message.Topic]
if !ok {
return
}
@ -153,10 +193,16 @@ func (mb *MessageBus) NotifyAll(topic string, message Message) {
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
log.Printf("[msgbus] Subscribe id=%s topic=%s", id, topic)
ls, ok := mb.listeners[topic]
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
}
ls, ok := mb.listeners[t]
if !ok {
ls = NewListeners()
mb.listeners[topic] = ls
mb.listeners[t] = ls
}
if ls.Exists(id) {
@ -170,7 +216,12 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
// Unsubscribe ...
func (mb *MessageBus) Unsubscribe(id, topic string) {
log.Printf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
ls, ok := mb.listeners[topic]
t, ok := mb.topics[topic]
if !ok {
return
}
ls, ok := mb.listeners[t]
if !ok {
return
}
@ -183,7 +234,7 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && (r.URL.Path == "/" || r.URL.Path == "") {
for topic, _ := range mb.topics {
for topic := range mb.topics {
w.Write([]byte(fmt.Sprintf("%s\n", topic)))
}
w.WriteHeader(http.StatusOK)
@ -193,6 +244,12 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
topic := strings.TrimLeft(r.URL.Path, "/")
topic = strings.TrimRight(topic, "/")
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
}
switch r.Method {
case "POST", "PUT":
body, err := ioutil.ReadAll(r.Body)
@ -200,14 +257,14 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
mb.Put(topic, mb.NewMessage(body))
mb.Put(mb.NewMessage(t, body))
case "GET":
if r.Header.Get("Upgrade") == "websocket" {
NewClient(topic, mb).Handler().ServeHTTP(w, r)
NewClient(t, mb).Handler().ServeHTTP(w, r)
return
}
message, ok := mb.Get(topic)
message, ok := mb.Get(t)
if !ok {
http.Error(w, "Not Found", http.StatusNotFound)
@ -226,23 +283,26 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
// Client ...
type Client struct {
topic string
topic *Topic
bus *MessageBus
id string
ch chan Message
}
func NewClient(topic string, bus *MessageBus) *Client {
// NewClient ...
func NewClient(topic *Topic, bus *MessageBus) *Client {
return &Client{topic: topic, bus: bus}
}
// Handler ...
func (c *Client) Handler() websocket.Handler {
return func(conn *websocket.Conn) {
c.id = conn.Request().RemoteAddr
c.ch = c.bus.Subscribe(c.id, c.topic)
c.ch = c.bus.Subscribe(c.id, c.topic.Name)
defer func() {
c.bus.Unsubscribe(c.id, c.topic)
c.bus.Unsubscribe(c.id, c.topic.Name)
}()
var err error