6
0
mirror of https://git.mills.io/prologic/msgbus.git synced 2024-06-25 16:28:57 +00:00
prologic-msgbus/msgbus.go

442 lines
8.2 KiB
Go
Raw Normal View History

2017-06-03 15:16:17 +00:00
package msgbus
import (
"encoding/json"
2018-04-06 08:27:25 +00:00
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
2017-06-03 15:16:17 +00:00
"time"
2018-03-26 00:03:56 +00:00
log "github.com/sirupsen/logrus"
"github.com/gorilla/websocket"
2017-06-03 15:16:17 +00:00
)
2017-08-14 07:34:12 +00:00
const (
	// DefaultTTL is the default TTL (time to live) for newly created topics.
	// NewMessageBus uses it when it is called without options.
	DefaultTTL = 60 * time.Second
)
// upgrader turns an incoming HTTP request into a websocket connection; it is
// used by ServeHTTP for GET requests that ask for a websocket upgrade. Both
// the read and the write buffer are set to 1024.
//
// NOTE(review): CheckOrigin returns true for every request, so the Origin
// header is never validated and pages from any origin may open a websocket.
// Confirm this is intended before exposing the bus to browsers.
//
// TODO: Make this configurable?
var upgrader = websocket.Upgrader{
	ReadBufferSize: 1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}
// HandlerFunc is the signature of a callback that is handed a single message
// and reports failure through its error result.
// NOTE(review): nothing in this part of the file calls it; presumably it is
// used by a consumer defined elsewhere — confirm.
type HandlerFunc func(msg *Message) error
2017-08-14 07:34:12 +00:00
// Topic is a named stream of messages on the bus.
type Topic struct {
	// Name is the key under which the bus stores the topic.
	Name string `json:"name"`
	// TTL is the topic's time to live; the bus copies its configured TTL
	// into this field when it creates the topic.
	TTL time.Duration `json:"ttl"`
	// Sequence is the ID the next message created for this topic will get;
	// NewMessage increments it.
	Sequence uint64 `json:"seq"`
	// Created is the time at which the topic was created.
	Created time.Time `json:"created"`
}
2017-06-03 15:16:17 +00:00
// Message ...
type Message struct {
ID uint64 `json:"id"`
2017-08-14 07:34:12 +00:00
Topic *Topic `json:"topic"`
2017-06-03 15:16:17 +00:00
Payload []byte `json:"payload"`
Created time.Time `json:"created"`
}
// Listeners ...
type Listeners struct {
ids map[string]bool
chs map[string]chan Message
2017-06-03 15:16:17 +00:00
}
// NewListeners ...
func NewListeners() *Listeners {
return &Listeners{
ids: make(map[string]bool),
chs: make(map[string]chan Message),
2017-06-03 15:16:17 +00:00
}
}
// Add ...
func (ls *Listeners) Add(id string) chan Message {
2017-06-03 15:16:17 +00:00
ls.ids[id] = true
ls.chs[id] = make(chan Message)
2017-06-03 15:16:17 +00:00
return ls.chs[id]
}
// Remove unregisters id and closes its channel, which tells the receiver
// that no more messages will arrive. Removing an id that is not registered
// is a no-op.
func (ls *Listeners) Remove(id string) {
	delete(ls.ids, id)

	ch, ok := ls.chs[id]
	if !ok {
		// Closing the nil channel a missing key yields would panic.
		return
	}
	if ch != nil {
		close(ch)
	}
	delete(ls.chs, id)
}
// Exists reports whether id is currently registered, i.e. whether it is a
// key of the id set (the stored value is not consulted).
func (ls *Listeners) Exists(id string) bool {
	if _, found := ls.ids[id]; found {
		return true
	}
	return false
}
// Get ...
func (ls *Listeners) Get(id string) (chan Message, bool) {
2017-06-03 15:16:17 +00:00
ch, ok := ls.chs[id]
if !ok {
return nil, false
}
return ch, true
}
// NotifyAll ...
func (ls *Listeners) NotifyAll(message Message) {
2017-06-03 15:16:17 +00:00
for _, ch := range ls.chs {
ch <- message
}
}
2017-08-14 07:34:12 +00:00
// Options configures a MessageBus created by NewMessageBus.
type Options struct {
	// DefaultTTL is the TTL given to topics created by the bus.
	DefaultTTL time.Duration
	// WithMetrics makes NewMessageBus create and register the bus metrics.
	WithMetrics bool
}
2017-06-03 15:16:17 +00:00
// MessageBus ...
type MessageBus struct {
sync.Mutex
2018-05-02 07:41:14 +00:00
metrics *Metrics
2017-08-14 07:34:12 +00:00
ttl time.Duration
topics map[string]*Topic
queues map[*Topic]*Queue
listeners map[*Topic]*Listeners
2017-06-03 15:16:17 +00:00
}
// NewMessageBus ...
2018-05-02 08:24:30 +00:00
func NewMessageBus(options *Options) *MessageBus {
var (
ttl time.Duration
withMetrics bool
)
2017-08-14 07:34:12 +00:00
if options != nil {
ttl = options.DefaultTTL
2018-05-02 08:24:30 +00:00
withMetrics = options.WithMetrics
2017-08-14 07:34:12 +00:00
} else {
ttl = DefaultTTL
2018-05-02 08:24:30 +00:00
withMetrics = false
}
var metrics *Metrics
if withMetrics {
metrics = NewMetrics("msgbus")
ctime := time.Now()
// server uptime counter
metrics.NewCounterFunc(
"server", "uptime",
"Number of nanoseconds the server has been running",
func() float64 {
return float64(time.Since(ctime).Nanoseconds())
},
)
// server requests counter
metrics.NewCounter(
"http", "requests",
"Number of total HTTP requests processed",
)
// bus messages counter
metrics.NewCounter(
"bus", "messages",
"Number of total messages exchanged",
)
// bus topics gauge
metrics.NewCounter(
"bus", "topics",
"Number of active topics registered",
)
// bus subscribers gauge
metrics.NewGauge(
"bus", "subscribers",
"Number of active subscribers",
)
2017-08-14 07:34:12 +00:00
}
2017-06-03 15:16:17 +00:00
return &MessageBus{
2018-05-02 07:41:14 +00:00
metrics: metrics,
2017-08-14 07:34:12 +00:00
ttl: ttl,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
listeners: make(map[*Topic]*Listeners),
2017-06-03 15:16:17 +00:00
}
}
// Len returns the number of topics currently known to the bus.
//
// It takes the bus mutex, the same one NewTopic holds while adding topics,
// so it must not be called with that mutex already held.
func (mb *MessageBus) Len() int {
	mb.Lock()
	defer mb.Unlock()
	return len(mb.topics)
}
2018-05-02 08:24:30 +00:00
// Metrics returns the *Metrics held by the bus. The result is nil when the
// bus was created without metrics enabled.
func (mb *MessageBus) Metrics() *Metrics {
	return mb.metrics
}
2017-08-14 07:34:12 +00:00
// NewTopic ...
func (mb *MessageBus) NewTopic(topic string) *Topic {
mb.Lock()
defer mb.Unlock()
2017-08-14 07:34:12 +00:00
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
2018-05-02 08:24:30 +00:00
if mb.metrics != nil {
mb.metrics.Counter("bus", "topics").Inc()
}
2017-08-14 07:34:12 +00:00
}
return t
}
2017-06-03 15:16:17 +00:00
// NewMessage ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message {
defer func() {
topic.Sequence++
2018-05-02 08:24:30 +00:00
if mb.metrics != nil {
mb.metrics.Counter("bus", "messages").Inc()
}
2017-08-14 07:34:12 +00:00
}()
return Message{
ID: topic.Sequence,
Topic: topic,
2017-06-03 15:16:17 +00:00
Payload: payload,
Created: time.Now(),
}
}
// Put ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) Put(message Message) {
2018-03-26 00:03:56 +00:00
log.Debugf(
"[msgbus] PUT id=%d topic=%s payload=%s",
message.ID, message.Topic.Name, message.Payload,
)
2017-08-14 07:34:12 +00:00
q, ok := mb.queues[message.Topic]
2017-06-03 15:16:17 +00:00
if !ok {
q = &Queue{}
2017-08-14 07:34:12 +00:00
mb.queues[message.Topic] = q
2017-06-03 15:16:17 +00:00
}
q.Push(message)
2017-08-14 07:34:12 +00:00
mb.NotifyAll(message)
2017-06-03 15:16:17 +00:00
}
// Get ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) Get(topic *Topic) (Message, bool) {
2018-03-26 00:03:56 +00:00
log.Debugf("[msgbus] GET topic=%s", topic)
2017-08-14 07:34:12 +00:00
q, ok := mb.queues[topic]
2017-06-03 15:16:17 +00:00
if !ok {
return Message{}, false
2017-06-03 15:16:17 +00:00
}
m := q.Pop()
if m == nil {
return Message{}, false
2017-06-03 15:16:17 +00:00
}
return m.(Message), true
2017-06-03 15:16:17 +00:00
}
// NotifyAll ...
2017-08-14 07:34:12 +00:00
func (mb *MessageBus) NotifyAll(message Message) {
2018-03-26 00:03:56 +00:00
log.Debugf(
"[msgbus] NotifyAll id=%d topic=%s payload=%s",
message.ID, message.Topic.Name, message.Payload,
)
2017-08-14 07:34:12 +00:00
ls, ok := mb.listeners[message.Topic]
2017-06-03 15:16:17 +00:00
if !ok {
return
}
ls.NotifyAll(message)
}
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
2018-05-02 07:41:14 +00:00
defer func() {
2018-05-02 08:24:30 +00:00
if mb.metrics != nil {
mb.metrics.Gauge("bus", "subscribers").Inc()
}
2018-05-02 07:41:14 +00:00
}()
2018-03-26 00:03:56 +00:00
log.Debugf("[msgbus] Subscribe id=%s topic=%s", id, topic)
2017-08-14 07:34:12 +00:00
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
}
ls, ok := mb.listeners[t]
2017-06-03 15:16:17 +00:00
if !ok {
ls = NewListeners()
2017-08-14 07:34:12 +00:00
mb.listeners[t] = ls
2017-06-03 15:16:17 +00:00
}
if ls.Exists(id) {
// Already verified th listener exists
ch, _ := ls.Get(id)
return ch
}
return ls.Add(id)
}
// Unsubscribe ...
func (mb *MessageBus) Unsubscribe(id, topic string) {
2018-05-02 07:41:14 +00:00
defer func() {
2018-05-02 08:24:30 +00:00
if mb.metrics != nil {
mb.metrics.Gauge("bus", "subscribers").Dec()
}
2018-05-02 07:41:14 +00:00
}()
2018-03-26 00:03:56 +00:00
log.Debugf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
2017-08-14 07:34:12 +00:00
t, ok := mb.topics[topic]
if !ok {
return
}
ls, ok := mb.listeners[t]
2017-06-03 15:16:17 +00:00
if !ok {
return
}
if ls.Exists(id) {
// Already verified th listener exists
ls.Remove(id)
}
}
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2018-05-02 07:41:14 +00:00
defer func() {
2018-05-02 08:24:30 +00:00
if mb.metrics != nil {
mb.metrics.Counter("http", "requests").Inc()
}
2018-05-02 07:41:14 +00:00
}()
if r.Method == "GET" && (r.URL.Path == "/" || r.URL.Path == "") {
2018-04-06 08:27:25 +00:00
// XXX: guard with a mutex?
out, err := json.Marshal(mb.topics)
if err != nil {
2018-04-06 08:27:25 +00:00
msg := fmt.Sprintf("error serializing topics: %s", err)
http.Error(w, msg, http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(out)
return
}
topic := strings.TrimLeft(r.URL.Path, "/")
topic = strings.TrimRight(topic, "/")
t := mb.NewTopic(topic)
2017-08-14 07:34:12 +00:00
switch r.Method {
case "POST", "PUT":
body, err := ioutil.ReadAll(r.Body)
if err != nil {
2018-04-06 08:27:25 +00:00
msg := fmt.Sprintf("error reading payload: %s", err)
http.Error(w, msg, http.StatusBadRequest)
return
}
2018-04-06 08:27:25 +00:00
message := mb.NewMessage(t, body)
mb.Put(message)
msg := fmt.Sprintf(
"message successfully published to %s with sequence %d",
t.Name, t.Sequence,
)
w.Write([]byte(msg))
case "GET":
if r.Header.Get("Upgrade") == "websocket" {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Errorf("error creating websocket client: %s", err)
return
}
NewClient(conn, t, mb).Start()
return
}
2017-08-14 07:34:12 +00:00
message, ok := mb.Get(t)
if !ok {
2018-04-06 08:27:25 +00:00
msg := fmt.Sprintf("no messages enqueued for topic: %s", topic)
http.Error(w, msg, http.StatusNotFound)
return
}
out, err := json.Marshal(message)
if err != nil {
2018-04-06 08:27:25 +00:00
msg := fmt.Sprintf("error serializing message: %s", err)
http.Error(w, msg, http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(out)
case "DELETE":
http.Error(w, "Not Implemented", http.StatusNotImplemented)
2018-05-02 07:41:14 +00:00
// TODO: Implement deleting topics
}
}
2017-08-14 07:34:12 +00:00
// Client ...
type Client struct {
conn *websocket.Conn
2017-08-14 07:34:12 +00:00
topic *Topic
bus *MessageBus
id string
ch chan Message
}
2017-08-14 07:34:12 +00:00
// NewClient builds a Client that will relay messages of topic on bus to
// conn. The client stays idle until Start is called.
func NewClient(conn *websocket.Conn, topic *Topic, bus *MessageBus) *Client {
	client := new(Client)
	client.conn = conn
	client.topic = topic
	client.bus = bus
	return client
}
// Start subscribes the client to its topic, using the connection's remote
// address as listener id, and writes every message it receives to the
// websocket as JSON. It blocks until the subscription channel is closed or a
// write fails, then unsubscribes and closes the connection.
//
// A failed write ends the loop rather than retrying: the connection is
// treated as unusable from that point on, and looping again would only keep
// a dead client subscribed.
//
// NOTE(review): Listeners.Remove closes the subscription channel while
// Listeners.NotifyAll sends on it without any synchronisation, so a publisher
// that is delivering to this client at the moment it unsubscribes can hit a
// closed channel. Fixing that requires changes in Listeners.
func (c *Client) Start() {
	c.id = c.conn.RemoteAddr().String()
	c.ch = c.bus.Subscribe(c.id, c.topic.Name)
	defer func() {
		c.bus.Unsubscribe(c.id, c.topic.Name)
		c.conn.Close()
	}()

	// Ranging ends cleanly once the channel is closed, instead of receiving
	// zero-value messages forever.
	for msg := range c.ch {
		if err := c.conn.WriteJSON(msg); err != nil {
			// TODO: Put the message back in the queue?
			// TODO: Bump a counter (Prometheus)
			log.Errorf("Error sending msg to %s: %s", c.id, err)
			return
		}
	}
}