This commit is contained in:
James Mills 2017-08-06 16:31:04 -07:00
джерело cee831d24a
коміт d4e65d0c45
Не вдалося знайти GPG ключ що відповідає даному підпису
Ідентифікатор GPG ключа: AC4C014F1440EBD6
3 змінених файлів з 21 додано та 15 видалено

@ -2,6 +2,7 @@ package main
import (
"flag"
"log"
"github.com/prologic/msgbus"
)
@ -15,5 +16,5 @@ func init() {
}
func main() {
msgbus.NewServer(bind).ListenAndServe()
log.Fatal(msgbus.NewServer().ListenAndServe(bind))
}

@ -34,7 +34,6 @@ func NewListeners() *Listeners {
// Add ...
func (ls *Listeners) Add(id string) chan *Message {
log.Printf("Listeners.Add(%s)\n", id)
ls.ids[id] = true
ls.chs[id] = make(chan *Message)
return ls.chs[id]
@ -42,7 +41,6 @@ func (ls *Listeners) Add(id string) chan *Message {
// Remove ...
func (ls *Listeners) Remove(id string) {
log.Printf("Listeners.Remove(%s)\n", id)
delete(ls.ids, id)
close(ls.chs[id])
@ -66,7 +64,6 @@ func (ls *Listeners) Get(id string) (chan *Message, bool) {
// NotifyAll ...
func (ls *Listeners) NotifyAll(message *Message) {
log.Printf("Listeners.NotifyAll(%v)\n", message)
for _, ch := range ls.chs {
ch <- message
}
@ -107,7 +104,10 @@ func (mb *MessageBus) NewMessage(payload []byte) *Message {
// Put ...
func (mb *MessageBus) Put(topic string, message *Message) {
log.Printf("MessageBus.Put(%s, %v)\n", topic, message)
log.Printf(
"[msgbus] PUT id=%d topic=%s payload=%s",
message.ID, topic, message.Payload,
)
q, ok := mb.topics[topic]
if !ok {
q = &Queue{}
@ -120,7 +120,7 @@ func (mb *MessageBus) Put(topic string, message *Message) {
// Get ...
func (mb *MessageBus) Get(topic string) (*Message, bool) {
log.Printf("MessageBus.Get(%s)\n", topic)
log.Printf("[msgbus] GET topic=%s", topic)
q, ok := mb.topics[topic]
if !ok {
return &Message{}, false
@ -135,7 +135,10 @@ func (mb *MessageBus) Get(topic string) (*Message, bool) {
// NotifyAll ...
func (mb *MessageBus) NotifyAll(topic string, message *Message) {
log.Printf("MessageBus.NotifyAll(%s, %v)\n", topic, message)
log.Printf(
"[msgbus] NotifyAll id=%d topic=%s payload=%s",
message.ID, topic, message.Payload,
)
ls, ok := mb.listeners[topic]
if !ok {
return
@ -145,7 +148,7 @@ func (mb *MessageBus) NotifyAll(topic string, message *Message) {
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan *Message {
log.Printf("MessageBus.Subscribe(%s, %s)\n", id, topic)
log.Printf("[msgbus] Subscribe id=%s topic=%s", id, topic)
ls, ok := mb.listeners[topic]
if !ok {
ls = NewListeners()
@ -162,7 +165,7 @@ func (mb *MessageBus) Subscribe(id, topic string) chan *Message {
// Unsubscribe ...
func (mb *MessageBus) Unsubscribe(id, topic string) {
log.Printf("MessageBus.Unsubscribe(%s, %s)\n", id, topic)
log.Printf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
ls, ok := mb.listeners[topic]
if !ok {
return

@ -14,7 +14,6 @@ import (
// Server ...
type Server struct {
bind string
bus *MessageBus
router *httprouter.Router
}
@ -101,8 +100,8 @@ func (s *Server) PushWebSocketHandler(topic string) websocket.Handler {
}
}
func (s *Server) ListenAndServe() {
log.Fatal(http.ListenAndServe(s.bind, s.router))
func (s *Server) ListenAndServe(bind string) error {
return http.ListenAndServe(bind, s.router)
}
func (s *Server) initRoutes() {
@ -113,10 +112,13 @@ func (s *Server) initRoutes() {
}
// NewServer ...
func NewServer(bind string) *Server {
func NewServer(bus *MessageBus) *Server {
if bus == nil {
bus = NewMessageBus()
}
server := &Server{
bind: bind,
bus: NewMessageBus(),
bus: bus,
router: httprouter.New(),
}