Fix more data races

This commit is contained in:
James Mills 2022-03-21 01:10:02 +10:00
parent 06a193b0cc
commit c42b2c5462
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
2 changed files with 17 additions and 16 deletions

View File

@ -286,7 +286,9 @@ func (s *Subscriber) readLoop() {
for {
err := s.conn.ReadJSON(&msg)
if err != nil {
log.Errorf("error reading from %s: %s", s.url, err)
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("error reading from %s: %s", s.url, err)
}
s.closeAndReconnect()
return
}
@ -302,6 +304,8 @@ func (s *Subscriber) writeLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
s.RLock()
defer s.RUnlock()
if s.conn != nil {
s.conn.Close()
}
@ -331,19 +335,16 @@ func (s *Subscriber) Start() {
// Stop ...
func (s *Subscriber) Stop() {
log.Infof("shutting down ...")
s.Lock()
defer s.Unlock()
err := s.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
if err := s.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
log.Warnf("error sending close message: %s", err)
}
err = s.conn.Close()
if err != nil {
if err := s.conn.Close(); err != nil {
log.Warnf("error closing connection: %s", err)
}
s.Lock()
s.conn = nil
s.Unlock()
}

View File

@ -598,12 +598,11 @@ func (c *Client) readPump() {
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("unexpected close error from %s: %s", c.id, err)
c.bus.Unsubscribe(c.id, c.topic.Name)
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("error reading from %s: %s", c.id, err)
}
log.Errorf("error reading from %s: %s", c.id, err)
break
c.bus.Unsubscribe(c.id, c.topic.Name)
return
}
log.Debugf("recieved message from %s: %s", c.id, message)
}
@ -624,7 +623,8 @@ func (c *Client) writePump() {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The bus closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "bus closed")
c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(writeWait))
return
}
@ -659,8 +659,8 @@ func (c *Client) Start() {
c.conn.SetCloseHandler(func(code int, text string) error {
c.bus.Unsubscribe(c.id, c.topic.Name)
message := websocket.FormatCloseMessage(code, "")
c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second*1))
message := websocket.FormatCloseMessage(code, text)
c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(writeWait))
return nil
})