Guard against concurrent map access to topics
This commit is contained in:
parent
f1ed5bd63c
commit
85f61cdddd
12
msgbus.go
12
msgbus.go
|
@ -5,6 +5,7 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
@ -91,6 +92,8 @@ type Options struct {
|
|||
|
||||
// MessageBus ...
|
||||
type MessageBus struct {
|
||||
sync.Mutex
|
||||
|
||||
ttl time.Duration
|
||||
topics map[string]*Topic
|
||||
queues map[*Topic]*Queue
|
||||
|
@ -122,6 +125,9 @@ func (mb *MessageBus) Len() int {
|
|||
|
||||
// NewTopic ...
|
||||
func (mb *MessageBus) NewTopic(topic string) *Topic {
|
||||
mb.Lock()
|
||||
defer mb.Unlock()
|
||||
|
||||
t, ok := mb.topics[topic]
|
||||
if !ok {
|
||||
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
|
||||
|
@ -248,11 +254,7 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
topic := strings.TrimLeft(r.URL.Path, "/")
|
||||
topic = strings.TrimRight(topic, "/")
|
||||
|
||||
t, ok := mb.topics[topic]
|
||||
if !ok {
|
||||
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
|
||||
mb.topics[topic] = t
|
||||
}
|
||||
t := mb.NewTopic(topic)
|
||||
|
||||
switch r.Method {
|
||||
case "POST", "PUT":
|
||||
|
|
Loading…
Reference in New Issue