Add support for two-way ping/pong in both directions server<->client to avoid weird Docker Swarm IPVS/Overlay networking issues

See: gorilla/websocket#378
This commit is contained in:
James Mills 2018-05-07 16:13:57 -07:00
parent f316d5b3cd
commit fd4fec78bc
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6

View File

@ -9,6 +9,7 @@ import (
"net/url"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
@ -26,6 +27,18 @@ const (
// DefaultMaxReconnectInterval ...
DefaultMaxReconnectInterval = 64
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 2048
)
var (
@ -248,11 +261,18 @@ func (s *Subscriber) connect() {
log.Infof("successfully connected to %s", s.url)
conn.SetCloseHandler(func(code int, text string) error {
log.Debugf("recieved close from server %s: (%d) %s", s.url, code, text)
s.closeAndReconnect()
return nil
})
s.Lock()
s.conn = conn
s.Unlock()
go s.readLoop()
go s.writeLoop()
break
}
@ -261,6 +281,23 @@ func (s *Subscriber) connect() {
func (s *Subscriber) readLoop() {
var msg *msgbus.Message
s.conn.SetReadLimit(maxMessageSize)
s.conn.SetReadDeadline(time.Now().Add(pongWait))
s.conn.SetPongHandler(func(message string) error {
log.Debugf("recieved pong from %s: %s", s.url, message)
t, err := strconv.ParseInt(message, 10, 64)
d := time.Duration(time.Now().UnixNano() - t)
if err != nil {
log.Warnf("garbage pong reply from %s: %s", s.url, err)
} else {
log.Debugf("pong latency of %s: %s", s.url, d)
}
s.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
err := s.conn.ReadJSON(&msg)
if err != nil {
@ -276,6 +313,28 @@ func (s *Subscriber) readLoop() {
}
}
func (s *Subscriber) writeLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
if s.conn != nil {
s.conn.Close()
}
}()
for {
<-ticker.C
s.conn.SetWriteDeadline(time.Now().Add(writeWait))
t := time.Now()
message := []byte(fmt.Sprintf("%d", t.UnixNano()))
if err := s.conn.WriteMessage(websocket.PingMessage, message); err != nil {
log.Errorf("error sending ping to %s: %s", s.url, err)
s.closeAndReconnect()
return
}
}
}
// Start ...
func (s *Subscriber) Start() {
go s.connect()