Fixed concurrent websocket conn access from writeLoop

This commit is contained in:
James Mills 2018-05-12 11:14:18 -07:00
rodič 17a205e7f7
revize c785a8d710
V databázi nebyl nalezen žádný známý klíč pro tento podpis
ID GPG klíče: AC4C014F1440EBD6

Zobrazit soubor

@ -197,6 +197,8 @@ type Subscriber struct {
url string
reconnectInterval time.Duration
maxReconnectInterval time.Duration
closeWriteChan chan bool
}
// NewSubscriber ...
@ -228,10 +230,13 @@ func NewSubscriber(client *Client, topic string, handler msgbus.HandlerFunc) *Su
url: url,
reconnectInterval: client.reconnectInterval,
maxReconnectInterval: client.maxReconnectInterval,
closeWriteChan: make(chan bool, 1),
}
}
func (s *Subscriber) closeAndReconnect() {
s.closeWriteChan <- true
s.conn.Close()
go s.connect()
}
@ -260,6 +265,7 @@ func (s *Subscriber) connect() {
s.Lock()
s.conn = conn
s.closeWriteChan = make(chan bool, 1)
s.Unlock()
go s.readLoop()
@ -313,13 +319,17 @@ func (s *Subscriber) writeLoop() {
}()
for {
<-ticker.C
s.conn.SetWriteDeadline(time.Now().Add(writeWait))
t := time.Now()
message := []byte(fmt.Sprintf("%d", t.UnixNano()))
if err := s.conn.WriteMessage(websocket.PingMessage, message); err != nil {
log.Errorf("error sending ping to %s: %s", s.url, err)
s.closeAndReconnect()
select {
case <-ticker.C:
s.conn.SetWriteDeadline(time.Now().Add(writeWait))
t := time.Now()
message := []byte(fmt.Sprintf("%d", t.UnixNano()))
if err := s.conn.WriteMessage(websocket.PingMessage, message); err != nil {
log.Errorf("error sending ping to %s: %s", s.url, err)
s.closeAndReconnect()
return
}
case <-s.closeWriteChan:
return
}
}