Add extra logging and fix a concurrency bug

This commit is contained in:
James Mills 2018-05-06 09:34:52 -07:00
rodič b5862245e1
revize 1e0200788a
V databázi nebyl nalezen žádný známý klíč pro tento podpis
ID GPG klíče: AC4C014F1440EBD6
2 změnil soubory, kde provedl 7 přidání a 6 odebrání

Zobrazit soubor

@ -237,9 +237,7 @@ func (s *Subscriber) connect() {
for {
d := b.Duration()
s.Lock()
conn, _, err := websocket.DefaultDialer.Dial(s.url, nil)
s.Unlock()
if err != nil {
log.Warnf("error connecting to %s: %s", s.url, err)
@ -250,7 +248,9 @@ func (s *Subscriber) connect() {
log.Infof("successfully connected to %s", s.url)
s.Lock()
s.conn = conn
s.Unlock()
go s.readLoop()

Zobrazit soubor

@ -115,12 +115,12 @@ func (ls *Listeners) NotifyAll(message Message) int {
for id, ch := range ls.chs {
select {
case ch <- message:
log.Debugf("successfully published message to %s", message, id)
log.Debugf("successfully published message to %s: %#v", id, message)
i++
default:
// TODO: Drop this client?
// TODO: Retry later?
log.Warnf("cannot publish message to %q", message, id)
log.Warnf("cannot publish message to %s: %#v", id, message)
}
}
@ -518,9 +518,10 @@ func (c *Client) readPump() {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
log.Errorf("unexpected close error from %s: %v", c.id, err)
log.Errorf("unexpected close error from %s: %s", c.id, err)
c.bus.Unsubscribe(c.id, c.topic.Name)
}
log.Errorf("error reading from %s: %s", c.id, err)
break
}
log.Debugf("recieved message from %s: %s", c.id, message)
@ -549,7 +550,7 @@ func (c *Client) writePump() {
err = c.conn.WriteJSON(msg)
if err != nil {
// TODO: Retry? Put the message back in the queue?
log.Errorf("Error sending msg to %s", c.id)
log.Errorf("Error sending msg to %s: %s", c.id, err)
if c.bus.metrics != nil {
c.bus.metrics.Counter("client", "errors").Inc()
}