This commit is contained in:
James Mills 2022-04-02 22:16:57 +10:00
джерело ecfc0cc61e
коміт 76fad092bf
Не вдалося знайти GPG ключ що відповідає даному підпису
Ідентифікатор GPG ключа: AC4C014F1440EBD6
3 змінених файлів з 18 додано та 12 видалено

@ -35,7 +35,7 @@ cli:
-ldflags "-w \
-X $(shell go list).Version=$(VERSION) \
-X $(shell go list).Commit=$(COMMIT)" \
./cmd/msgbus/
./cmd/msgbus/...
server: generate
@$(GOCMD) build $(FLAGS) -tags "netgo static_build" -installsuffix netgo \

@ -42,7 +42,7 @@ func init() {
flag.PrintDefaults()
}
flag.BoolVarP(&debug, "debug", "D", false, "enable debug logging")
flag.BoolVarP(&debug, "debug", "d", false, "enable debug logging")
flag.StringVarP(&bind, "bind", "b", "0.0.0.0:8000", "[int]:<port> to bind to")
flag.BoolVarP(&version, "version", "v", false, "display version information")

@ -426,15 +426,17 @@ func (mb *MessageBus) Get(t *Topic) (Message, bool) {
// publish ...
func (mb *MessageBus) publish(message Message) {
ls, ok := mb.subscribers[message.Topic]
subs, ok := mb.subscribers[message.Topic]
if !ok {
log.Debugf("no subscribers for %s", message.Topic.Name)
return
}
n := ls.NotifyAll(message)
if n != ls.Len() && mb.metrics != nil {
log.Warnf("%d/%d subscribers notified", n, ls.Len())
mb.metrics.Counter("bus", "dropped").Add(float64(ls.Len() - n))
log.Debug("notifying subscribers")
n := subs.NotifyAll(message)
if n != subs.Len() && mb.metrics != nil {
log.Warnf("%d/%d subscribers notified", n, subs.Len())
mb.metrics.Counter("bus", "dropped").Add(float64(subs.Len() - n))
}
}
@ -506,14 +508,14 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
return
}
ls, ok := mb.subscribers[t]
subs, ok := mb.subscribers[t]
if !ok {
return
}
if ls.HasSubscriber(id) {
if subs.HasSubscriber(id) {
// Already verified the listener exists
ls.RemoveSubscriber(id)
subs.RemoveSubscriber(id)
if mb.metrics != nil {
mb.metrics.Gauge("bus", "subscribers").Dec()
@ -546,10 +548,12 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
topic := strings.TrimLeft(r.URL.Path, "/")
topic = strings.TrimRight(topic, "/")
log.Debugf("r.URL.Path: %s", r.URL.Path)
topic := strings.Trim(r.URL.Path, "/")
t := mb.NewTopic(topic)
log.Debugf("request for topic %#v", t.Name)
switch r.Method {
case "POST", "PUT":
@ -610,6 +614,8 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
i := SafeParseInt(r.URL.Query().Get("index"), -1)
log.Debugf("new subscriber for %s from %s", t.Name, r.RemoteAddr)
NewClient(conn, t, i, mb).Start()
return
}