6
0
mirror of https://git.mills.io/prologic/msgbus.git synced 2024-06-25 16:28:57 +00:00
prologic-msgbus/msgbus.go

328 lines
6.0 KiB
Go
Raw Normal View History

2017-06-03 15:16:17 +00:00
package msgbus
import (
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"sync"
2017-06-03 15:16:17 +00:00
"time"
2018-03-26 00:03:56 +00:00
log "github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
2017-06-03 15:16:17 +00:00
)
2017-08-14 07:34:12 +00:00
// DefaultTTL is the default TTL (time to live) for newly created topics.
// NewMessageBus falls back to it when it is called with nil Options.
const DefaultTTL = 60 * time.Second
// Topic describes a named stream of messages on the bus. Topics are created
// on demand by MessageBus and are shared by pointer, so every Message of a
// topic refers to the same Topic value.
type Topic struct {
	// Name is the key under which the bus stores the topic.
	Name string `json:"name"`
	// TTL is the time to live copied from the bus default at creation.
	TTL time.Duration `json:"ttl"`
	// Sequence is the ID that will be given to the topic's next message.
	Sequence uint64 `json:"seq"`
	// Created is the time at which the topic was created.
	Created time.Time `json:"created"`
}
2017-06-03 15:16:17 +00:00
// Message ...
type Message struct {
ID uint64 `json:"id"`
2017-08-14 07:34:12 +00:00
Topic *Topic `json:"topic"`
2017-06-03 15:16:17 +00:00
Payload []byte `json:"payload"`
Created time.Time `json:"created"`
}
// Listeners ...
type Listeners struct {
ids map[string]bool
chs map[string]chan Message
2017-06-03 15:16:17 +00:00
}
// NewListeners ...
func NewListeners() *Listeners {
return &Listeners{
ids: make(map[string]bool),
chs: make(map[string]chan Message),
2017-06-03 15:16:17 +00:00
}
}
// Add ...
func (ls *Listeners) Add(id string) chan Message {
2017-06-03 15:16:17 +00:00
ls.ids[id] = true
ls.chs[id] = make(chan Message)
2017-06-03 15:16:17 +00:00
return ls.chs[id]
}
// Remove unregisters the listener id and closes its delivery channel so that
// a receiver ranging over it terminates. Removing an id that was never added
// (or was already removed) is a no-op.
func (ls *Listeners) Remove(id string) {
	ch, ok := ls.chs[id]
	delete(ls.ids, id)
	if !ok {
		// A missing key yields a nil channel, and closing a nil channel
		// panics, so there is nothing further to do for unknown ids.
		return
	}
	close(ch)
	delete(ls.chs, id)
}
// Exists reports whether a listener with the given id is registered.
func (ls *Listeners) Exists(id string) bool {
	if _, present := ls.ids[id]; present {
		return true
	}
	return false
}
// Get ...
func (ls *Listeners) Get(id string) (chan Message, bool) {
2017-06-03 15:16:17 +00:00
ch, ok := ls.chs[id]
if !ok {
return nil, false
}
return ch, true
}
// NotifyAll ...
func (ls *Listeners) NotifyAll(message Message) {
2017-06-03 15:16:17 +00:00
for _, ch := range ls.chs {
ch <- message
}
}
2017-08-14 07:34:12 +00:00
// Options carries the optional settings accepted by NewMessageBus.
type Options struct {
	// DefaultTTL is the TTL the bus assigns to topics it creates.
	DefaultTTL time.Duration
}
2017-06-03 15:16:17 +00:00
// MessageBus ...
type MessageBus struct {
sync.Mutex
2017-08-14 07:34:12 +00:00
ttl time.Duration
topics map[string]*Topic
queues map[*Topic]*Queue
listeners map[*Topic]*Listeners
2017-06-03 15:16:17 +00:00
}
// NewMessageBus ...
2017-08-14 07:34:12 +00:00
func NewMessageBus(options *Options) *MessageBus {
var ttl time.Duration
if options != nil {
ttl = options.DefaultTTL
} else {
ttl = DefaultTTL
}
2017-06-03 15:16:17 +00:00
return &MessageBus{
2017-08-14 07:34:12 +00:00
ttl: ttl,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
listeners: make(map[*Topic]*Listeners),
2017-06-03 15:16:17 +00:00
}
}
// Len returns the number of topics currently known to the bus.
//
// The bus mutex is held while the topics map is read because NewTopic writes
// to that map under the same mutex, possibly from another goroutine.
func (mb *MessageBus) Len() int {
	mb.Lock()
	defer mb.Unlock()

	return len(mb.topics)
}
2017-08-14 07:34:12 +00:00
// NewTopic ...
func (mb *MessageBus) NewTopic(topic string) *Topic {
mb.Lock()
defer mb.Unlock()
2017-08-14 07:34:12 +00:00
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
}
return t
}
2017-06-03 15:16:17 +00:00
// NewMessage ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message {
defer func() {
topic.Sequence++
}()
return Message{
ID: topic.Sequence,
Topic: topic,
2017-06-03 15:16:17 +00:00
Payload: payload,
Created: time.Now(),
}
}
// Put ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) Put(message Message) {
2018-03-26 00:03:56 +00:00
log.Debugf(
"[msgbus] PUT id=%d topic=%s payload=%s",
message.ID, message.Topic.Name, message.Payload,
)
2017-08-14 07:34:12 +00:00
q, ok := mb.queues[message.Topic]
2017-06-03 15:16:17 +00:00
if !ok {
q = &Queue{}
2017-08-14 07:34:12 +00:00
mb.queues[message.Topic] = q
2017-06-03 15:16:17 +00:00
}
q.Push(message)
2017-08-14 07:34:12 +00:00
mb.NotifyAll(message)
2017-06-03 15:16:17 +00:00
}
// Get ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) Get(topic *Topic) (Message, bool) {
2018-03-26 00:03:56 +00:00
log.Debugf("[msgbus] GET topic=%s", topic)
2017-08-14 07:34:12 +00:00
q, ok := mb.queues[topic]
2017-06-03 15:16:17 +00:00
if !ok {
return Message{}, false
2017-06-03 15:16:17 +00:00
}
m := q.Pop()
if m == nil {
return Message{}, false
2017-06-03 15:16:17 +00:00
}
return m.(Message), true
2017-06-03 15:16:17 +00:00
}
// NotifyAll ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) NotifyAll(message Message) {
2018-03-26 00:03:56 +00:00
log.Debugf(
"[msgbus] NotifyAll id=%d topic=%s payload=%s",
message.ID, message.Topic.Name, message.Payload,
)
2017-08-14 07:34:12 +00:00
ls, ok := mb.listeners[message.Topic]
2017-06-03 15:16:17 +00:00
if !ok {
return
}
ls.NotifyAll(message)
}
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
2018-03-26 00:03:56 +00:00
log.Debugf("[msgbus] Subscribe id=%s topic=%s", id, topic)
2017-08-14 07:34:12 +00:00
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
}
ls, ok := mb.listeners[t]
2017-06-03 15:16:17 +00:00
if !ok {
ls = NewListeners()
2017-08-14 07:34:12 +00:00
mb.listeners[t] = ls
2017-06-03 15:16:17 +00:00
}
if ls.Exists(id) {
// Already verified th listener exists
ch, _ := ls.Get(id)
return ch
}
return ls.Add(id)
}
// Unsubscribe ...
func (mb *MessageBus) Unsubscribe(id, topic string) {
2018-03-26 00:03:56 +00:00
log.Debugf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
2017-08-14 07:34:12 +00:00
t, ok := mb.topics[topic]
if !ok {
return
}
ls, ok := mb.listeners[t]
2017-06-03 15:16:17 +00:00
if !ok {
return
}
if ls.Exists(id) {
// Already verified th listener exists
ls.Remove(id)
}
}
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && (r.URL.Path == "/" || r.URL.Path == "") {
out, err := json.Marshal(mb.topics)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(out)
return
}
topic := strings.TrimLeft(r.URL.Path, "/")
topic = strings.TrimRight(topic, "/")
t := mb.NewTopic(topic)
2017-08-14 07:34:12 +00:00
switch r.Method {
case "POST", "PUT":
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
2017-08-14 07:34:12 +00:00
mb.Put(mb.NewMessage(t, body))
case "GET":
if r.Header.Get("Upgrade") == "websocket" {
2017-08-14 07:34:12 +00:00
NewClient(t, mb).Handler().ServeHTTP(w, r)
return
}
2017-08-14 07:34:12 +00:00
message, ok := mb.Get(t)
if !ok {
http.Error(w, "Not Found", http.StatusNotFound)
return
}
out, err := json.Marshal(message)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(out)
case "DELETE":
http.Error(w, "Not Implemented", http.StatusNotImplemented)
}
}
2017-08-14 07:34:12 +00:00
// Client ...
type Client struct {
2017-08-14 07:34:12 +00:00
topic *Topic
bus *MessageBus
id string
ch chan Message
}
2017-08-14 07:34:12 +00:00
// NewClient returns a client bound to topic on bus. The client does not
// subscribe until the handler returned by its Handler method serves a
// connection.
func NewClient(topic *Topic, bus *MessageBus) *Client {
	c := new(Client)
	c.topic = topic
	c.bus = bus
	return c
}
2017-08-14 07:34:12 +00:00
// Handler ...
func (c *Client) Handler() websocket.Handler {
return func(conn *websocket.Conn) {
c.id = conn.Request().RemoteAddr
2017-08-14 07:34:12 +00:00
c.ch = c.bus.Subscribe(c.id, c.topic.Name)
defer func() {
2017-08-14 07:34:12 +00:00
c.bus.Unsubscribe(c.id, c.topic.Name)
}()
var err error
for {
msg := <-c.ch
err = websocket.JSON.Send(conn, msg)
if err != nil {
2018-03-03 19:58:58 +00:00
// TODO: Retry? Put the message back in the queue?
2018-03-26 00:03:56 +00:00
log.Errorf("Error sending msg to %s", c.id)
continue
}
}
}
}