Add extra counters for fetched, delivered and dropped messages
This commit is contained in:
parent
b1d7a8dad4
commit
4eec89138b
33
msgbus.go
33
msgbus.go
|
@ -159,6 +159,24 @@ func NewMessageBus(options *Options) *MessageBus {
|
|||
"Number of total messages exchanged",
|
||||
)
|
||||
|
||||
// bus dropped counter
|
||||
metrics.NewCounter(
|
||||
"bus", "dropped",
|
||||
"Number of messages dropped to subscribers",
|
||||
)
|
||||
|
||||
// bus delivered counter
|
||||
metrics.NewCounter(
|
||||
"bus", "delivered",
|
||||
"Number of messages delivered to subscribers",
|
||||
)
|
||||
|
||||
// bus fetched counter
|
||||
metrics.NewCounter(
|
||||
"bus", "fetched",
|
||||
"Number of messages fetched from clients",
|
||||
)
|
||||
|
||||
// bus topics gauge
|
||||
metrics.NewCounter(
|
||||
"bus", "topics",
|
||||
|
@ -255,6 +273,11 @@ func (mb *MessageBus) Get(topic *Topic) (Message, bool) {
|
|||
if m == nil {
|
||||
return Message{}, false
|
||||
}
|
||||
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Counter("bus", "fetched").Inc()
|
||||
}
|
||||
|
||||
return m.(Message), true
|
||||
}
|
||||
|
||||
|
@ -430,12 +453,18 @@ func (c *Client) Start() {
|
|||
|
||||
for {
|
||||
msg := <-c.ch
|
||||
c.conn.SetWriteDeadline(time.Now().Add(time.Second * 1))
|
||||
err = c.conn.WriteJSON(msg)
|
||||
if err != nil {
|
||||
// TODO: Retry? Put the message back in the queue?
|
||||
// TODO: Bump a counter (Prometheus)
|
||||
log.Errorf("Error sending msg to %s", c.id)
|
||||
continue
|
||||
if c.bus.metrics != nil {
|
||||
c.bus.metrics.Counter("bus", "dropped").Inc()
|
||||
}
|
||||
} else {
|
||||
if c.bus.metrics != nil {
|
||||
c.bus.metrics.Counter("bus", "delivered").Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue