Fixed subscribers gauge
This commit is contained in:
parent
6ce5fb7104
commit
8da5d41e65
27
msgbus.go
27
msgbus.go
|
@ -352,13 +352,8 @@ func (mb *MessageBus) NotifyAll(message Message) {
|
|||
|
||||
// Subscribe ...
|
||||
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
|
||||
defer func() {
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Gauge("bus", "subscribers").Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debugf("[msgbus] Subscribe id=%s topic=%s", id, topic)
|
||||
|
||||
t, ok := mb.topics[topic]
|
||||
if !ok {
|
||||
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
|
||||
|
@ -372,22 +367,22 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
|
|||
}
|
||||
|
||||
if ls.Exists(id) {
|
||||
// Already verified th listener exists
|
||||
// Already verified the listener exists
|
||||
ch, _ := ls.Get(id)
|
||||
return ch
|
||||
}
|
||||
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Gauge("bus", "subscribers").Inc()
|
||||
}
|
||||
|
||||
return ls.Add(id)
|
||||
}
|
||||
|
||||
// Unsubscribe ...
|
||||
func (mb *MessageBus) Unsubscribe(id, topic string) {
|
||||
defer func() {
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Gauge("bus", "subscribers").Dec()
|
||||
}
|
||||
}()
|
||||
|
||||
log.Debugf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
|
||||
|
||||
t, ok := mb.topics[topic]
|
||||
if !ok {
|
||||
return
|
||||
|
@ -399,8 +394,12 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
|
|||
}
|
||||
|
||||
if ls.Exists(id) {
|
||||
// Already verified th listener exists
|
||||
// Already verified the listener exists
|
||||
ls.Remove(id)
|
||||
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Gauge("bus", "subscribers").Dec()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue