This commit is contained in:
James Mills 2018-05-02 01:24:30 -07:00
rodič b5c1b9aa07
revize 3dbba632c7
V databázi nebyl nalezen žádný známý klíč pro tento podpis
ID GPG klíče: AC4C014F1440EBD6
3 změnil soubory, kde provedl 78 přidání a 48 odebrání

Zobrazit soubor

@ -39,46 +39,14 @@ func main() {
os.Exit(0)
}
metrics := msgbus.NewMetrics("msgbus")
opts := msgbus.Options{
DefaultTTL: ttl,
WithMetrics: true,
}
mb := msgbus.NewMessageBus(&opts)
ctime := time.Now()
// server uptime counter
metrics.NewCounterFunc(
"server", "uptime",
"Number of nanoseconds the server has been running",
func() float64 {
return float64(time.Since(ctime).Nanoseconds())
},
)
// server requests counter
metrics.NewCounter(
"http", "requests",
"Number of total HTTP requests processed",
)
// bus messages counter
metrics.NewCounter(
"bus", "messages",
"Number of total messages exchanged",
)
// bus topics gauge
metrics.NewCounter(
"bus", "topics",
"Number of active topics registered",
)
// bus subscribers gauge
metrics.NewGauge(
"bus", "subscribers",
"Number of active subscribers",
)
options := msgbus.Options{DefaultTTL: ttl}
http.Handle("/", msgbus.NewMessageBus(metrics, &options))
http.Handle("/metrics", metrics.Handler())
http.Handle("/", mb)
http.Handle("/metrics", mb.Metrics().Handler())
log.Infof("msgbusd %s listening on %s", msgbus.FullVersion(), bind)
log.Fatal(http.ListenAndServe(bind, nil))
}

Zobrazit soubor

@ -7,7 +7,7 @@ import (
)
func main() {
m := msgbus.NewMessageBus(msgbus.NewMetrics("msgbus"), nil)
m := msgbus.NewMessageBus(nil)
t := m.NewTopic("foo")
m.Put(m.NewMessage(t, []byte("Hello World!")))

Zobrazit soubor

@ -100,7 +100,8 @@ func (ls *Listeners) NotifyAll(message Message) {
// Options ...
type Options struct {
DefaultTTL time.Duration
DefaultTTL time.Duration
WithMetrics bool
}
// MessageBus ...
@ -116,13 +117,59 @@ type MessageBus struct {
}
// NewMessageBus ...
func NewMessageBus(metrics *Metrics, options *Options) *MessageBus {
var ttl time.Duration
func NewMessageBus(options *Options) *MessageBus {
var (
ttl time.Duration
withMetrics bool
)
if options != nil {
ttl = options.DefaultTTL
withMetrics = options.WithMetrics
} else {
ttl = DefaultTTL
withMetrics = false
}
var metrics *Metrics
if withMetrics {
metrics = NewMetrics("msgbus")
ctime := time.Now()
// server uptime counter
metrics.NewCounterFunc(
"server", "uptime",
"Number of nanoseconds the server has been running",
func() float64 {
return float64(time.Since(ctime).Nanoseconds())
},
)
// server requests counter
metrics.NewCounter(
"http", "requests",
"Number of total HTTP requests processed",
)
// bus messages counter
metrics.NewCounter(
"bus", "messages",
"Number of total messages exchanged",
)
// bus topics gauge
metrics.NewCounter(
"bus", "topics",
"Number of active topics registered",
)
// bus subscribers gauge
metrics.NewGauge(
"bus", "subscribers",
"Number of active subscribers",
)
}
return &MessageBus{
@ -140,6 +187,11 @@ func (mb *MessageBus) Len() int {
return len(mb.topics)
}
// Metrics ...
func (mb *MessageBus) Metrics() *Metrics {
return mb.metrics
}
// NewTopic ...
func (mb *MessageBus) NewTopic(topic string) *Topic {
mb.Lock()
@ -149,7 +201,9 @@ func (mb *MessageBus) NewTopic(topic string) *Topic {
if !ok {
t = &Topic{Name: topic, TTL: mb.ttl, Created: time.Now()}
mb.topics[topic] = t
mb.metrics.Counter("bus", "topics").Inc()
if mb.metrics != nil {
mb.metrics.Counter("bus", "topics").Inc()
}
}
return t
}
@ -158,7 +212,9 @@ func (mb *MessageBus) NewTopic(topic string) *Topic {
func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message {
defer func() {
topic.Sequence++
mb.metrics.Counter("bus", "messages").Inc()
if mb.metrics != nil {
mb.metrics.Counter("bus", "messages").Inc()
}
}()
return Message{
@ -218,7 +274,9 @@ func (mb *MessageBus) NotifyAll(message Message) {
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
defer func() {
mb.metrics.Gauge("bus", "subscribers").Inc()
if mb.metrics != nil {
mb.metrics.Gauge("bus", "subscribers").Inc()
}
}()
log.Debugf("[msgbus] Subscribe id=%s topic=%s", id, topic)
@ -245,7 +303,9 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
// Unsubscribe ...
func (mb *MessageBus) Unsubscribe(id, topic string) {
defer func() {
mb.metrics.Gauge("bus", "subscribers").Dec()
if mb.metrics != nil {
mb.metrics.Gauge("bus", "subscribers").Dec()
}
}()
log.Debugf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
@ -267,7 +327,9 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer func() {
mb.metrics.Counter("http", "requests").Inc()
if mb.metrics != nil {
mb.metrics.Counter("http", "requests").Inc()
}
}()
if r.Method == "GET" && (r.URL.Path == "/" || r.URL.Path == "") {