Improved metrics
This commit is contained in:
parent
850ebb6f5f
commit
29053c2314
47
msgbus.go
47
msgbus.go
|
@ -74,6 +74,11 @@ func NewListeners() *Listeners {
|
|||
}
|
||||
}
|
||||
|
||||
// Length ...
|
||||
func (ls *Listeners) Length() int {
|
||||
return len(ls.ids)
|
||||
}
|
||||
|
||||
// Add ...
|
||||
func (ls *Listeners) Add(id string) chan Message {
|
||||
ls.ids[id] = true
|
||||
|
@ -105,18 +110,21 @@ func (ls *Listeners) Get(id string) (chan Message, bool) {
|
|||
}
|
||||
|
||||
// NotifyAll ...
|
||||
func (ls *Listeners) NotifyAll(message Message) {
|
||||
func (ls *Listeners) NotifyAll(message Message) int {
|
||||
i := 0
|
||||
for id, ch := range ls.chs {
|
||||
select {
|
||||
case ch <- message:
|
||||
log.Debugf("successfully published message to %s", message, id)
|
||||
i++
|
||||
default:
|
||||
// TODO: Bump a counter?
|
||||
// TODO: Drop this client?
|
||||
// TODO: Retry later?
|
||||
log.Warnf("cannot publish message to %s", message, id)
|
||||
log.Warnf("cannot publish message to %q", message, id)
|
||||
}
|
||||
}
|
||||
|
||||
return i
|
||||
}
|
||||
|
||||
// Options ...
|
||||
|
@ -170,8 +178,20 @@ func NewMessageBus(options *Options) *MessageBus {
|
|||
|
||||
// server requests counter
|
||||
metrics.NewCounter(
|
||||
"http", "requests",
|
||||
"Number of total HTTP requests processed",
|
||||
"server", "requests",
|
||||
"Number of total requests processed",
|
||||
)
|
||||
|
||||
// client latency summary
|
||||
metrics.NewSummary(
|
||||
"client", "latency_seconds",
|
||||
"Client latency in seconds",
|
||||
)
|
||||
|
||||
// client errors counter
|
||||
metrics.NewCounter(
|
||||
"client", "errors",
|
||||
"Number of errors publishing messages to clients",
|
||||
)
|
||||
|
||||
// bus messages counter
|
||||
|
@ -312,7 +332,12 @@ func (mb *MessageBus) NotifyAll(message Message) {
|
|||
if !ok {
|
||||
return
|
||||
}
|
||||
ls.NotifyAll(message)
|
||||
|
||||
n := ls.NotifyAll(message)
|
||||
if n != ls.Length() && mb.metrics != nil {
|
||||
log.Warnf("%d/%d subscribers notified", n, ls.Length())
|
||||
mb.metrics.Counter("bus", "dropped").Inc()
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe ...
|
||||
|
@ -372,7 +397,7 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
|
|||
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() {
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Counter("http", "requests").Inc()
|
||||
mb.metrics.Counter("server", "requests").Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -480,6 +505,12 @@ func (c *Client) readPump() {
|
|||
log.Debugf("pong latency of %s: %s", c.id, d)
|
||||
}
|
||||
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
|
||||
if c.bus.metrics != nil {
|
||||
v := c.bus.metrics.Summary("client", "latency_seconds")
|
||||
v.Observe(d.Seconds())
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
|
@ -519,7 +550,7 @@ func (c *Client) writePump() {
|
|||
// TODO: Retry? Put the message back in the queue?
|
||||
log.Errorf("Error sending msg to %s", c.id)
|
||||
if c.bus.metrics != nil {
|
||||
c.bus.metrics.Counter("bus", "dropped").Inc()
|
||||
c.bus.metrics.Counter("client", "errors").Inc()
|
||||
}
|
||||
} else {
|
||||
if c.bus.metrics != nil {
|
||||
|
|
Loading…
Reference in New Issue