Add support for control messages and various client improvements

This commit is contained in:
James Mills 2018-05-03 00:57:52 -07:00
parent 4eec89138b
commit 32f92b201f
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
2 changed files with 112 additions and 22 deletions

View File

@ -204,8 +204,21 @@ func NewSubscriber(client *Client, topic string, handler msgbus.HandlerFunc) *Su
// Stop ...
func (s *Subscriber) Stop() {
log.Infof("shutting down ...")
close(s.errch)
s.errch = nil
s.stopch <- true
err := s.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Warnf("error sending close message: %s", err)
}
err = s.conn.Close()
if err != nil {
log.Warnf("error closing connection: %s", err)
}
}
// Run ...
@ -236,7 +249,6 @@ func (s *Subscriber) Run() {
case <-time.After(s.client.reconnect):
continue
case <-s.stopch:
log.Infof("shutting down ...")
s.conn.Close()
break
}
@ -251,8 +263,6 @@ func (s *Subscriber) Run() {
time.Sleep(s.client.reconnect)
}
case <-s.stopch:
log.Infof("shutting down ...")
s.conn.Close()
break
}
}
@ -265,7 +275,9 @@ func (s *Subscriber) Reader() {
for {
err := s.conn.ReadJSON(&msg)
if err != nil {
s.errch <- err
if s.errch != nil {
s.errch <- err
}
break
}
err = s.handler(msg)

114
msgbus.go
View File

@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"time"
@ -17,12 +18,24 @@ import (
const (
// DefaultTTL is the default TTL (time to live) for newly created topics
DefaultTTL = 60 * time.Second
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 2048
)
// TODO: Make this configurable?
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool {
return true
},
@ -441,30 +454,95 @@ func NewClient(conn *websocket.Conn, topic *Topic, bus *MessageBus) *Client {
return &Client{conn: conn, topic: topic, bus: bus}
}
// Start ...
func (c *Client) Start() {
c.id = c.conn.RemoteAddr().String()
c.ch = c.bus.Subscribe(c.id, c.topic.Name)
func (c *Client) readPump() {
defer func() {
c.bus.Unsubscribe(c.id, c.topic.Name)
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(message string) error {
log.Debugf("recieved pong from %s: %s", c.id, message)
t, err := strconv.ParseInt(message, 10, 64)
d := time.Duration(time.Now().UnixNano() - t)
if err != nil {
log.Warnf("garbage pong reply from %s: %s", c.id, err)
} else {
log.Debugf("pong latency of %s: %s", c.id, d)
}
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("unexpected close error from %s: %v", c.id, err)
}
break
}
log.Debugf("recieved message from %s: %s", c.id, message)
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
var err error
for {
msg := <-c.ch
c.conn.SetWriteDeadline(time.Now().Add(time.Second * 1))
err = c.conn.WriteJSON(msg)
if err != nil {
// TODO: Retry? Put the message back in the queue?
log.Errorf("Error sending msg to %s", c.id)
if c.bus.metrics != nil {
c.bus.metrics.Counter("bus", "dropped").Inc()
select {
case msg, ok := <-c.ch:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The bus closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
} else {
if c.bus.metrics != nil {
c.bus.metrics.Counter("bus", "delivered").Inc()
err = c.conn.WriteJSON(msg)
if err != nil {
// TODO: Retry? Put the message back in the queue?
log.Errorf("Error sending msg to %s", c.id)
if c.bus.metrics != nil {
c.bus.metrics.Counter("bus", "dropped").Inc()
}
} else {
if c.bus.metrics != nil {
c.bus.metrics.Counter("bus", "delivered").Inc()
}
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
t := time.Now()
message := []byte(fmt.Sprintf("%d", t.UnixNano()))
if err := c.conn.WriteMessage(websocket.PingMessage, message); err != nil {
log.Errorf("error sending ping to %s: %s", c.id, err)
return
}
}
}
}
// Start ...
func (c *Client) Start() {
c.id = c.conn.RemoteAddr().String()
c.ch = c.bus.Subscribe(c.id, c.topic.Name)
c.conn.SetCloseHandler(func(code int, text string) error {
log.Debugf("recieved close from client %s", c.id)
c.bus.Unsubscribe(c.id, c.topic.Name)
message := websocket.FormatCloseMessage(code, "")
c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second*1))
return nil
})
go c.writePump()
go c.readPump()
}