Simplify a lot of the code and wrap up MessageBus into a Middleware that plays with standard net/http

This commit is contained in:
James Mills 2017-08-09 02:54:11 -07:00
parent ba697f2f97
commit 215e226f69
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
4 changed files with 89 additions and 120 deletions

View File

@ -78,7 +78,7 @@ func (c *Client) Handle(msg *msgbus.Message) {
func (c *Client) Pull(topic string) {
var msg *msgbus.Message
url := fmt.Sprintf("http://%s:%d/pull/%s", c.host, c.port, topic)
url := fmt.Sprintf("http://%s:%d/%s", c.host, c.port, topic)
client := &http.Client{}
for {
@ -168,7 +168,7 @@ func (s *Subscriber) Run() {
origin := "http://localhost/"
url := fmt.Sprintf(
"ws://%s:%d/push/%s",
"ws://%s:%d/%s",
s.client.host, s.client.port, s.topic,
)

View File

@ -3,6 +3,7 @@ package main
import (
"flag"
"log"
"net/http"
"github.com/prologic/msgbus"
)
@ -16,5 +17,6 @@ func init() {
}
func main() {
log.Fatal(msgbus.NewServer(nil).ListenAndServe(bind))
http.Handle("/", msgbus.NewMessageBus())
log.Fatal(http.ListenAndServe(bind, nil))
}

View File

@ -1,8 +1,15 @@
package msgbus
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
"golang.org/x/net/websocket"
)
// Message ...
@ -173,3 +180,80 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
ls.Remove(id)
}
}
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && (r.URL.Path == "/" || r.URL.Path == "") {
for topic, _ := range mb.topics {
w.Write([]byte(fmt.Sprintf("%s\n", topic)))
}
w.WriteHeader(http.StatusOK)
return
}
topic := strings.TrimLeft(r.URL.Path, "/")
topic = strings.TrimRight(topic, "/")
switch r.Method {
case "POST", "PUT":
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
mb.Put(topic, mb.NewMessage(body))
case "GET":
if r.Header.Get("Upgrade") == "websocket" {
NewClient(topic, mb).Handler().ServeHTTP(w, r)
return
}
message, ok := mb.Get(topic)
if !ok {
http.Error(w, "Not Found", http.StatusNotFound)
return
}
out, err := json.Marshal(message)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write(out)
case "DELETE":
http.Error(w, "Not Implemented", http.StatusNotImplemented)
}
}
type Client struct {
topic string
bus *MessageBus
id string
ch chan Message
}
func NewClient(topic string, bus *MessageBus) *Client {
return &Client{topic: topic, bus: bus}
}
func (c *Client) Handler() websocket.Handler {
return func(conn *websocket.Conn) {
c.id = conn.Request().RemoteAddr
c.ch = c.bus.Subscribe(c.id, c.topic)
defer func() {
c.bus.Unsubscribe(c.id, c.topic)
}()
var err error
for {
msg := <-c.ch
err = websocket.JSON.Send(conn, msg)
if err != nil {
log.Printf("Error sending msg to %s", c.id)
continue
}
}
}
}

117
server.go
View File

@ -1,117 +0,0 @@
package msgbus
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"golang.org/x/net/websocket"
"github.com/julienschmidt/httprouter"
)
// Server ...
type Server struct {
bus *MessageBus
router *httprouter.Router
}
// IndexHandler ...
func (s *Server) IndexHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
fmt.Fprint(w, "Welcome!\n")
}
}
// PushHandler ...
func (s *Server) PushHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
topic := p.ByName("topic")
websocket.Handler(s.PushWebSocketHandler(topic)).ServeHTTP(w, r)
}
}
// PullHandler ...
func (s *Server) PullHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
topic := p.ByName("topic")
message, ok := s.bus.Get(topic)
if !ok {
http.Error(w, "Not Found", http.StatusNotFound)
return
}
out, err := json.Marshal(message)
if err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
w.Write(out)
}
}
// PutHandler ...
func (s *Server) PutHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
topic := p.ByName("topic")
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
s.bus.Put(topic, s.bus.NewMessage(body))
}
}
// PushWebSocketHandler ...
func (s *Server) PushWebSocketHandler(topic string) websocket.Handler {
return func(conn *websocket.Conn) {
id := conn.Request().RemoteAddr
ch := s.bus.Subscribe(id, topic)
defer func() {
s.bus.Unsubscribe(id, topic)
}()
var err error
for {
msg := <-ch
err = websocket.JSON.Send(conn, msg)
if err != nil {
log.Printf("Error sending msg to %s", id)
continue
}
}
}
}
func (s *Server) ListenAndServe(bind string) error {
return http.ListenAndServe(bind, s.router)
}
func (s *Server) initRoutes() {
s.router.GET("/", s.IndexHandler())
s.router.GET("/push/:topic", s.PushHandler())
s.router.GET("/pull/:topic", s.PullHandler())
s.router.PUT("/:topic", s.PutHandler())
}
// NewServer ...
func NewServer(bus *MessageBus) *Server {
if bus == nil {
bus = NewMessageBus()
}
server := &Server{
bus: bus,
router: httprouter.New(),
}
server.initRoutes()
return server
}