Added basic metrics

This commit is contained in:
James Mills 2018-05-02 00:41:14 -07:00
parent 9f10fb1754
commit f20b6cd8b9
Non sono state trovate chiavi note per questa firma nel database
ID Chiave GPG: AC4C014F1440EBD6
4 ha cambiato i file con 271 aggiunte e 4 eliminazioni

Vedi File

@ -13,7 +13,6 @@ import (
)
func main() {
var (
version bool
debug bool
@ -40,8 +39,46 @@ func main() {
os.Exit(0)
}
metrics := msgbus.NewMetrics("msgbus")
ctime := time.Now()
// server uptime counter
metrics.NewCounterFunc(
"server", "uptime",
"Number of nanoseconds the server has been running",
func() float64 {
return float64(time.Since(ctime).Nanoseconds())
},
)
// server requests counter
metrics.NewCounter(
"http", "requests",
"Number of total HTTP requests processed",
)
// bus messages counter
metrics.NewCounter(
"bus", "messages",
"Number of total messages exchanged",
)
// bus topics gauge
metrics.NewCounter(
"bus", "topics",
"Number of active topics registered",
)
// bus subscribers gauge
metrics.NewGauge(
"bus", "subscribers",
"Number of active subscribers",
)
options := msgbus.Options{DefaultTTL: ttl}
http.Handle("/", msgbus.NewMessageBus(&options))
http.Handle("/", msgbus.NewMessageBus(metrics, &options))
http.Handle("/metrics", metrics.Handler())
log.Infof("msgbusd %s listening on %s", msgbus.FullVersion(), bind)
log.Fatal(http.ListenAndServe(bind, nil))
}

Vedi File

@ -7,7 +7,7 @@ import (
)
func main() {
m := msgbus.NewMessageBus(nil)
m := msgbus.NewMessageBus(msgbus.NewMetrics("msgbus"), nil)
t := m.NewTopic("foo")
m.Put(m.NewMessage(t, []byte("Hello World!")))

211
metrics.go Normal file
Vedi File

@ -0,0 +1,211 @@
package msgbus
import (
"fmt"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// DefObjectives ...
var DefObjectives = map[float64]float64{
0.50: 0.05,
0.90: 0.01,
0.95: 0.005,
0.99: 0.001,
}
// Metrics ...
type Metrics struct {
namespace string
metrics map[string]prometheus.Metric
gaugevecs map[string]*prometheus.GaugeVec
sumvecs map[string]*prometheus.SummaryVec
}
// NewMetrics ...
func NewMetrics(namespace string) *Metrics {
return &Metrics{
namespace: namespace,
metrics: make(map[string]prometheus.Metric),
gaugevecs: make(map[string]*prometheus.GaugeVec),
sumvecs: make(map[string]*prometheus.SummaryVec),
}
}
// NewCounter ...
func (m *Metrics) NewCounter(subsystem, name, help string) prometheus.Counter {
counter := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.namespace,
Subsystem: subsystem,
Name: name,
Help: help,
},
)
key := fmt.Sprintf("%s_%s", subsystem, name)
m.metrics[key] = counter
prometheus.MustRegister(counter)
return counter
}
// NewCounterFunc ...
func (m *Metrics) NewCounterFunc(subsystem, name, help string, f func() float64) prometheus.CounterFunc {
counter := prometheus.NewCounterFunc(
prometheus.CounterOpts{
Namespace: m.namespace,
Subsystem: subsystem,
Name: name,
Help: help,
},
f,
)
key := fmt.Sprintf("%s_%s", subsystem, name)
m.metrics[key] = counter
prometheus.MustRegister(counter)
return counter
}
// NewGauge ...
func (m *Metrics) NewGauge(subsystem, name, help string) prometheus.Gauge {
guage := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.namespace,
Subsystem: subsystem,
Name: name,
Help: help,
},
)
key := fmt.Sprintf("%s_%s", subsystem, name)
m.metrics[key] = guage
prometheus.MustRegister(guage)
return guage
}
// NewGaugeFunc ...
func (m *Metrics) NewGaugeFunc(subsystem, name, help string, f func() float64) prometheus.GaugeFunc {
guage := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: m.namespace,
Subsystem: subsystem,
Name: name,
Help: help,
},
f,
)
key := fmt.Sprintf("%s_%s", subsystem, name)
m.metrics[key] = guage
prometheus.MustRegister(guage)
return guage
}
// NewGaugeVec ...
func (m *Metrics) NewGaugeVec(subsystem, name, help string, labels []string) *prometheus.GaugeVec {
gauagevec := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: m.namespace,
Subsystem: subsystem,
Name: name,
Help: help,
},
labels,
)
key := fmt.Sprintf("%s_%s", subsystem, name)
m.gaugevecs[key] = gauagevec
prometheus.MustRegister(gauagevec)
return gauagevec
}
// NewSummary ...
func (m *Metrics) NewSummary(subsystem, name, help string) prometheus.Summary {
summary := prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: m.namespace,
Subsystem: subsystem,
Name: name,
Help: help,
Objectives: DefObjectives,
},
)
key := fmt.Sprintf("%s_%s", subsystem, name)
m.metrics[key] = summary
prometheus.MustRegister(summary)
return summary
}
// NewSummaryVec ...
func (m *Metrics) NewSummaryVec(subsystem, name, help string, labels []string) *prometheus.SummaryVec {
sumvec := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: m.namespace,
Subsystem: subsystem,
Name: name,
Help: help,
Objectives: DefObjectives,
},
labels,
)
key := fmt.Sprintf("%s_%s", subsystem, name)
m.sumvecs[key] = sumvec
prometheus.MustRegister(sumvec)
return sumvec
}
// Counter ...
func (m *Metrics) Counter(subsystem, name string) prometheus.Counter {
key := fmt.Sprintf("%s_%s", subsystem, name)
return m.metrics[key].(prometheus.Counter)
}
// Gauge ...
func (m *Metrics) Gauge(subsystem, name string) prometheus.Gauge {
key := fmt.Sprintf("%s_%s", subsystem, name)
return m.metrics[key].(prometheus.Gauge)
}
// GaugeVec ...
func (m *Metrics) GaugeVec(subsystem, name string) *prometheus.GaugeVec {
key := fmt.Sprintf("%s_%s", subsystem, name)
return m.gaugevecs[key]
}
// Summary ...
func (m *Metrics) Summary(subsystem, name string) prometheus.Summary {
key := fmt.Sprintf("%s_%s", subsystem, name)
return m.metrics[key].(prometheus.Summary)
}
// SummaryVec ...
func (m *Metrics) SummaryVec(subsystem, name string) *prometheus.SummaryVec {
key := fmt.Sprintf("%s_%s", subsystem, name)
return m.sumvecs[key]
}
// Handler ...
func (m *Metrics) Handler() http.Handler {
return promhttp.Handler()
}
// Run ...
func (m *Metrics) Run(addr string) {
http.Handle("/", m.Handler())
log.Infof("metrics endpoint listening on %s", addr)
log.Fatal(http.ListenAndServe(addr, nil))
}

Vedi File

@ -107,6 +107,8 @@ type Options struct {
type MessageBus struct {
sync.Mutex
metrics *Metrics
ttl time.Duration
topics map[string]*Topic
queues map[*Topic]*Queue
@ -114,7 +116,7 @@ type MessageBus struct {
}
// NewMessageBus ...
func NewMessageBus(options *Options) *MessageBus {
func NewMessageBus(metrics *Metrics, options *Options) *MessageBus {
var ttl time.Duration
if options != nil {
@ -124,6 +126,8 @@ func NewMessageBus(options *Options) *MessageBus {
}
return &MessageBus{
metrics: metrics,
ttl: ttl,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
@ -145,6 +149,7 @@ func (mb *MessageBus) NewTopic(topic string) *Topic {
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
mb.metrics.Counter("bus", "topics").Inc()
}
return t
}
@ -153,6 +158,7 @@ func (mb *MessageBus) NewTopic(topic string) *Topic {
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message {
defer func() {
topic.Sequence++
mb.metrics.Counter("bus", "messages").Inc()
}()
return Message{
@ -211,6 +217,10 @@ func (mb *MessageBus) NotifyAll(message Message) {
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
defer func() {
mb.metrics.Gauge("bus", "subscribers").Inc()
}()
log.Debugf("[msgbus] Subscribe id=%s topic=%s", id, topic)
t, ok := mb.topics[topic]
if !ok {
@ -234,6 +244,10 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
// Unsubscribe ...
func (mb *MessageBus) Unsubscribe(id, topic string) {
defer func() {
mb.metrics.Gauge("bus", "subscribers").Dec()
}()
log.Debugf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
t, ok := mb.topics[topic]
if !ok {
@ -252,6 +266,10 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
}
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer func() {
mb.metrics.Counter("http", "requests").Inc()
}()
if r.Method == "GET" && (r.URL.Path == "/" || r.URL.Path == "") {
// XXX: guard with a mutex?
out, err := json.Marshal(mb.topics)
@ -319,6 +337,7 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(out)
case "DELETE":
http.Error(w, "Not Implemented", http.StatusNotImplemented)
// TODO: Implement deleting topics
}
}