Refactor client reconnect and add graceful shutdown

This commit is contained in:
James Mills 2017-08-19 00:15:45 -07:00
rodič dc7243ffa9
revize 8cb2806793
V databázi nebyl nalezen žádný známý klíč pro tento podpis
ID GPG klíče: AC4C014F1440EBD6
2 změnil soubory, kde provedl 52 přidání a 15 odebrání

Zobrazit soubor

@ -140,30 +140,34 @@ func (c *Client) Publish(topic, message string) error {
// Subscribe ...
func (c *Client) Subscribe(topic string) *Subscriber {
return &Subscriber{client: c, topic: topic}
return &Subscriber{
client: c,
topic: topic,
errch: make(chan error),
stopch: make(chan bool),
}
}
// Subscriber ...
type Subscriber struct {
conn *websocket.Conn
client *Client
topic string
conn *websocket.Conn
topic string
stopchan chan bool
errch chan error
stopch chan bool
}
// Stop ...
func (s *Subscriber) Stop() {
s.stopchan <- true
close(s.errch)
s.stopch <- true
}
// Run ...
func (s *Subscriber) Run() {
var (
err error
msg *msgbus.Message
)
var err error
origin := "http://localhost/"
@ -180,15 +184,32 @@ func (s *Subscriber) Run() {
continue
}
for {
err = websocket.JSON.Receive(s.conn, &msg)
go s.Reader()
select {
case err = <-s.errch:
if err != nil {
log.Printf("lost connection to %s: %s", url, err)
time.Sleep(s.client.reconnect)
break
} else {
s.client.Handle(msg)
}
case <-s.stopch:
log.Printf("shutting down ...")
s.conn.Close()
break
}
}
}
// Reader ...
func (s *Subscriber) Reader() {
var msg *msgbus.Message
for {
err := websocket.JSON.Receive(s.conn, &msg)
if err != nil {
s.errch <- err
break
}
s.client.Handle(msg)
}
}

Zobrazit soubor

@ -5,6 +5,8 @@ import (
"io/ioutil"
"log"
"os"
"os/signal"
"syscall"
"github.com/prologic/msgbus/client"
)
@ -68,5 +70,19 @@ func subscribe(client *client.Client, topic string) {
}
s := client.Subscribe(topic)
s.Run()
go s.Run()
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
log.Printf("caught signal %s: ", sig)
s.Stop()
done <- true
}()
<-done
}