Refactored client

This commit is contained in:
James Mills 2018-05-05 14:08:39 -07:00
orang tua 29053c2314
melakukan 760e61ca91
Tidak diketahui kunci yang ditemukan di database signature
GPG Key ID: AC4C014F1440EBD6
2 mengubah file dengan 141 tambahan dan 130 penghapusan

Melihat File

@ -10,9 +10,11 @@ import (
"os"
"regexp"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"
"github.com/prologic/msgbus"
@ -20,10 +22,10 @@ import (
const (
// DefaultReconnectInterval ...
DefaultReconnectInterval = 5
DefaultReconnectInterval = 2
// DefaultRetryInterval ...
DefaultRetryInterval = 5
// DefaultMaxReconnectInterval ...
DefaultMaxReconnectInterval = 64
)
var (
@ -37,21 +39,21 @@ var (
type Client struct {
url string
retry time.Duration
reconnect time.Duration
reconnectInterval time.Duration
maxReconnectInterval time.Duration
}
// Options ...
type Options struct {
ReconnectInterval int
RetryInterval int
ReconnectInterval int
MaxReconnectInterval int
}
// NewClient ...
func NewClient(url string, options *Options) *Client {
var (
reconnectInterval = DefaultReconnectInterval
retryInterval = DefaultRetryInterval
reconnectInterval = DefaultReconnectInterval
maxReconnectInterval = DefaultMaxReconnectInterval
)
url = strings.TrimSuffix(url, "/")
@ -63,13 +65,13 @@ func NewClient(url string, options *Options) *Client {
reconnectInterval = options.ReconnectInterval
}
if options.RetryInterval != 0 {
retryInterval = options.RetryInterval
if options.MaxReconnectInterval != 0 {
maxReconnectInterval = options.MaxReconnectInterval
}
}
client.reconnect = time.Duration(reconnectInterval) * time.Second
client.retry = time.Duration(retryInterval) * time.Second
client.reconnectInterval = time.Duration(reconnectInterval) * time.Second
client.maxReconnectInterval = time.Duration(maxReconnectInterval) * time.Second
return client
}
@ -88,51 +90,46 @@ func (c *Client) Handle(msg *msgbus.Message) error {
}
// Pull ...
func (c *Client) Pull(topic string) {
var msg *msgbus.Message
func (c *Client) Pull(topic string) (msg *msgbus.Message, err error) {
url := fmt.Sprintf("%s/%s", c.url, topic)
client := &http.Client{}
for {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf("error constructing request to %s: %s", url, err)
time.Sleep(c.retry)
continue
}
res, err := client.Do(req)
if err != nil {
log.Printf("error sending request to %s: %s", url, err)
time.Sleep(c.retry)
continue
}
if res.StatusCode == http.StatusNotFound {
// Empty queue
break
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(&msg)
if err != nil {
log.Errorf(
"error decoding response from %s for %s: %s",
url, topic, err,
)
break
} else {
err := c.Handle(msg)
if err != nil {
log.Errorf(
"error handling message from %s for %s: %s",
url, topic, err,
)
}
break
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Errorf("error constructing request to %s: %s", url, err)
return
}
res, err := client.Do(req)
if err != nil {
log.Errorf("error sending request to %s: %s", url, err)
return
}
if res.StatusCode == http.StatusNotFound {
// Empty queue
return
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(&msg)
if err != nil {
log.Errorf(
"error decoding response from %s for %s: %s",
url, topic, err,
)
return
}
err = c.Handle(msg)
if err != nil {
log.Errorf(
"error handling message from %s for %s: %s",
url, topic, err,
)
return
}
return
}
// Publish ...
@ -178,14 +175,18 @@ func (c *Client) Subscribe(topic string, handler msgbus.HandlerFunc) *Subscriber
// Subscriber ...
type Subscriber struct {
conn *websocket.Conn
sync.RWMutex
conn *websocket.Conn
client *Client
topic string
handler msgbus.HandlerFunc
errch chan error
stopch chan bool
url string
reconnectInterval time.Duration
maxReconnectInterval time.Duration
}
// NewSubscriber ...
@ -193,23 +194,97 @@ func NewSubscriber(client *Client, topic string, handler msgbus.HandlerFunc) *Su
if handler == nil {
handler = client.Handle
}
u, err := url.Parse(client.url)
if err != nil {
log.Fatal("invalid url: %s", client.url)
}
if strings.HasPrefix(client.url, "https") {
u.Scheme = "wss"
} else {
u.Scheme = "ws"
}
u.Path += fmt.Sprintf("/%s", topic)
url := u.String()
return &Subscriber{
client: client,
topic: topic,
handler: handler,
errch: make(chan error),
stopch: make(chan bool),
url: url,
reconnectInterval: client.reconnectInterval,
maxReconnectInterval: client.maxReconnectInterval,
}
}
func (s *Subscriber) closeAndReconnect() {
s.conn.Close()
go s.connect()
}
func (s *Subscriber) connect() {
b := &backoff.Backoff{
Min: s.reconnectInterval,
Max: s.maxReconnectInterval,
Factor: 2,
Jitter: false,
}
for {
d := b.Duration()
s.Lock()
conn, _, err := websocket.DefaultDialer.Dial(s.url, nil)
s.Unlock()
if err != nil {
log.Warnf("error connecting to %s: %s", s.url, err)
log.Infof("reconnecting in %s", d)
time.Sleep(d)
continue
}
log.Infof("successfully connected to %s", s.url)
s.conn = conn
go s.readLoop()
break
}
}
func (s *Subscriber) readLoop() {
var msg *msgbus.Message
for {
err := s.conn.ReadJSON(&msg)
if err != nil {
log.Errorf("error reading from %s: %s", s.url, err)
s.closeAndReconnect()
return
}
err = s.handler(msg)
if err != nil {
log.Warnf("error handling message: %s", err)
}
}
}
// Start ...
func (s *Subscriber) Start() {
go s.connect()
}
// Stop ...
func (s *Subscriber) Stop() {
log.Infof("shutting down ...")
close(s.errch)
s.errch = nil
s.stopch <- true
err := s.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Warnf("error sending close message: %s", err)
@ -219,70 +294,6 @@ func (s *Subscriber) Stop() {
if err != nil {
log.Warnf("error closing connection: %s", err)
}
}
// Run ...
func (s *Subscriber) Run() {
var err error
u, err := url.Parse(s.client.url)
if err != nil {
log.Fatal("invalid url: %s", s.client.url)
}
if strings.HasPrefix(s.client.url, "https") {
u.Scheme = "wss"
} else {
u.Scheme = "ws"
}
u.Path += fmt.Sprintf("/%s", s.topic)
url := u.String()
for {
s.conn, _, err = websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Warnf("error connecting to %s: %s", url, err)
select {
case <-time.After(s.client.reconnect):
continue
case <-s.stopch:
s.conn.Close()
break
}
}
go s.Reader()
select {
case err = <-s.errch:
if err != nil {
log.Warnf("lost connection to %s: %s", url, err)
time.Sleep(s.client.reconnect)
}
case <-s.stopch:
break
}
}
}
// Reader ...
func (s *Subscriber) Reader() {
var msg *msgbus.Message
for {
err := s.conn.ReadJSON(&msg)
if err != nil {
if s.errch != nil {
s.errch <- err
}
break
}
err = s.handler(msg)
if err != nil {
log.Errorf("error handling message: %s", err)
}
}
s.conn = nil
}

Melihat File

@ -88,7 +88,7 @@ func subscribe(client *client.Client, topic, command string) {
}
s := client.Subscribe(topic, handler(command))
go s.Run()
s.Start()
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)