Add metrics for queue depths

This commit is contained in:
James Mills 2018-05-07 21:51:54 -07:00
parent 0d6386f98d
commit f169926a0c
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6

View File

@ -224,6 +224,13 @@ func NewMessageBus(options *Options) *MessageBus {
"Number of active topics registered",
)
// bus queues gauge vec
metrics.NewGaugeVec(
"bus", "queues",
"Queue depths of each topic",
[]string{"topic"},
)
// bus subscribers gauge
metrics.NewGauge(
"bus", "subscribers",
@ -291,21 +298,26 @@ func (mb *MessageBus) Put(message Message) {
message.ID, message.Topic.Name, message.Payload,
)
q, ok := mb.queues[message.Topic]
t := message.Topic
q, ok := mb.queues[t]
if !ok {
q = &Queue{}
mb.queues[message.Topic] = q
}
q.Push(message)
if mb.metrics != nil {
mb.metrics.GaugeVec("bus", "queues").WithLabelValues(t.Name).Inc()
}
mb.NotifyAll(message)
}
// Get ...
func (mb *MessageBus) Get(topic *Topic) (Message, bool) {
log.Debugf("[msgbus] GET topic=%s", topic)
func (mb *MessageBus) Get(t *Topic) (Message, bool) {
log.Debugf("[msgbus] GET topic=%s", t)
q, ok := mb.queues[topic]
q, ok := mb.queues[t]
if !ok {
return Message{}, false
}
@ -317,6 +329,7 @@ func (mb *MessageBus) Get(topic *Topic) (Message, bool) {
if mb.metrics != nil {
mb.metrics.Counter("bus", "fetched").Inc()
mb.metrics.GaugeVec("bus", "queues").WithLabelValues(t.Name).Dec()
}
return m.(Message), true