6
0
mirror of https://git.mills.io/prologic/msgbus.git synced 2024-06-25 00:09:08 +00:00
prologic-msgbus/client/client.go

341 lines
6.5 KiB
Go
Raw Normal View History

package client
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
2017-08-20 05:09:36 +00:00
"net/url"
2018-03-26 00:03:56 +00:00
"os"
"strconv"
2017-08-20 05:09:36 +00:00
"strings"
2018-05-05 21:08:39 +00:00
"sync"
"time"
"github.com/gorilla/websocket"
2018-05-05 21:08:39 +00:00
"github.com/jpillora/backoff"
2018-03-26 00:03:56 +00:00
log "github.com/sirupsen/logrus"
"github.com/prologic/msgbus"
)
const (
// DefaultReconnectInterval ...
2018-05-05 21:08:39 +00:00
DefaultReconnectInterval = 2
2018-05-05 21:08:39 +00:00
// DefaultMaxReconnectInterval ...
DefaultMaxReconnectInterval = 64
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
// Client ...
type Client struct {
2017-08-20 05:09:36 +00:00
url string
2018-05-05 21:08:39 +00:00
reconnectInterval time.Duration
maxReconnectInterval time.Duration
}
// Options ...
type Options struct {
2018-05-05 21:08:39 +00:00
ReconnectInterval int
MaxReconnectInterval int
}
// NewClient ...
2017-08-20 05:09:36 +00:00
func NewClient(url string, options *Options) *Client {
var (
2018-05-05 21:08:39 +00:00
reconnectInterval = DefaultReconnectInterval
maxReconnectInterval = DefaultMaxReconnectInterval
)
2017-08-20 05:09:36 +00:00
url = strings.TrimSuffix(url, "/")
client := &Client{url: url}
if options != nil {
if options.ReconnectInterval != 0 {
reconnectInterval = options.ReconnectInterval
}
2018-05-05 21:08:39 +00:00
if options.MaxReconnectInterval != 0 {
maxReconnectInterval = options.MaxReconnectInterval
}
}
2018-05-05 21:08:39 +00:00
client.reconnectInterval = time.Duration(reconnectInterval) * time.Second
client.maxReconnectInterval = time.Duration(maxReconnectInterval) * time.Second
return client
}
// Handle ...
2018-03-26 00:03:56 +00:00
func (c *Client) Handle(msg *msgbus.Message) error {
out, err := json.Marshal(msg)
if err != nil {
log.Errorf("error marshalling message: %s", err)
return err
}
os.Stdout.Write(out)
os.Stdout.Write([]byte{'\r', '\n'})
2018-03-26 00:03:56 +00:00
return nil
}
// Pull ...
2018-05-05 21:08:39 +00:00
func (c *Client) Pull(topic string) (msg *msgbus.Message, err error) {
2017-08-20 05:09:36 +00:00
url := fmt.Sprintf("%s/%s", c.url, topic)
client := &http.Client{}
2018-05-05 21:08:39 +00:00
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Errorf("error constructing request to %s: %s", url, err)
return
}
2018-05-05 21:08:39 +00:00
res, err := client.Do(req)
if err != nil {
log.Errorf("error sending request to %s: %s", url, err)
return
}
2018-05-05 21:08:39 +00:00
if res.StatusCode == http.StatusNotFound {
// Empty queue
return
}
2018-05-05 21:08:39 +00:00
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(&msg)
if err != nil {
log.Errorf(
"error decoding response from %s for %s: %s",
url, topic, err,
)
return
}
err = c.Handle(msg)
if err != nil {
log.Errorf(
"error handling message from %s for %s: %s",
url, topic, err,
)
return
}
2018-05-05 21:08:39 +00:00
return
}
// Publish ...
func (c *Client) Publish(topic, message string) error {
var payload bytes.Buffer
payload.Write([]byte(message))
2017-08-20 05:09:36 +00:00
url := fmt.Sprintf("%s/%s", c.url, topic)
client := &http.Client{}
req, err := http.NewRequest("PUT", url, &payload)
if err != nil {
2018-04-06 08:27:25 +00:00
return fmt.Errorf("error constructing request: %s", err)
}
2018-04-06 08:27:25 +00:00
res, err := client.Do(req)
if err != nil {
2018-04-06 08:27:25 +00:00
return fmt.Errorf("error publishing message: %s", err)
}
2018-05-14 10:04:45 +00:00
if res.StatusCode != 201 {
return fmt.Errorf("unexpected response: %s", res.Status)
}
return nil
}
// Subscribe ...
func (c *Client) Subscribe(topic string, handler msgbus.HandlerFunc) *Subscriber {
return NewSubscriber(c, topic, handler)
}
// Subscriber ...
type Subscriber struct {
2018-05-05 21:08:39 +00:00
sync.RWMutex
conn *websocket.Conn
client *Client
topic string
handler msgbus.HandlerFunc
2018-05-05 21:08:39 +00:00
url string
reconnectInterval time.Duration
maxReconnectInterval time.Duration
closeWriteChan chan bool
}
// NewSubscriber ...
func NewSubscriber(client *Client, topic string, handler msgbus.HandlerFunc) *Subscriber {
if handler == nil {
handler = client.Handle
}
2018-05-05 21:08:39 +00:00
u, err := url.Parse(client.url)
2017-08-20 05:09:36 +00:00
if err != nil {
2018-05-05 21:08:39 +00:00
log.Fatal("invalid url: %s", client.url)
2017-08-20 05:09:36 +00:00
}
2018-05-05 21:08:39 +00:00
if strings.HasPrefix(client.url, "https") {
u.Scheme = "wss"
} else {
u.Scheme = "ws"
}
2018-05-05 21:08:39 +00:00
u.Path += fmt.Sprintf("/%s", topic)
2017-08-20 05:09:36 +00:00
url := u.String()
2018-05-05 21:08:39 +00:00
return &Subscriber{
client: client,
topic: topic,
handler: handler,
url: url,
reconnectInterval: client.reconnectInterval,
maxReconnectInterval: client.maxReconnectInterval,
closeWriteChan: make(chan bool, 1),
2018-05-05 21:08:39 +00:00
}
}
func (s *Subscriber) closeAndReconnect() {
s.closeWriteChan <- true
2018-05-05 21:08:39 +00:00
s.conn.Close()
go s.connect()
}
func (s *Subscriber) connect() {
b := &backoff.Backoff{
Min: s.reconnectInterval,
Max: s.maxReconnectInterval,
Factor: 2,
Jitter: false,
}
for {
2018-05-05 21:08:39 +00:00
d := b.Duration()
conn, _, err := websocket.DefaultDialer.Dial(s.url, nil)
if err != nil {
2018-05-05 21:08:39 +00:00
log.Warnf("error connecting to %s: %s", s.url, err)
log.Infof("reconnecting in %s", d)
time.Sleep(d)
continue
}
2018-05-05 21:08:39 +00:00
log.Infof("successfully connected to %s", s.url)
s.Lock()
2018-05-05 21:08:39 +00:00
s.conn = conn
s.closeWriteChan = make(chan bool, 1)
s.Unlock()
2018-05-05 21:08:39 +00:00
go s.readLoop()
go s.writeLoop()
2018-05-05 21:08:39 +00:00
break
}
}
2018-05-05 21:08:39 +00:00
func (s *Subscriber) readLoop() {
var msg *msgbus.Message
s.conn.SetReadDeadline(time.Now().Add(pongWait))
s.conn.SetPongHandler(func(message string) error {
log.Debugf("recieved pong from %s: %s", s.url, message)
t, err := strconv.ParseInt(message, 10, 64)
d := time.Duration(time.Now().UnixNano() - t)
if err != nil {
log.Warnf("garbage pong reply from %s: %s", s.url, err)
} else {
log.Debugf("pong latency of %s: %s", s.url, d)
}
s.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
err := s.conn.ReadJSON(&msg)
if err != nil {
2018-05-05 21:08:39 +00:00
log.Errorf("error reading from %s: %s", s.url, err)
s.closeAndReconnect()
return
}
2018-05-05 21:08:39 +00:00
err = s.handler(msg)
if err != nil {
2018-05-05 21:08:39 +00:00
log.Warnf("error handling message: %s", err)
}
}
}
2018-05-05 21:08:39 +00:00
func (s *Subscriber) writeLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
if s.conn != nil {
s.conn.Close()
}
}()
for {
select {
case <-ticker.C:
s.conn.SetWriteDeadline(time.Now().Add(writeWait))
t := time.Now()
message := []byte(fmt.Sprintf("%d", t.UnixNano()))
if err := s.conn.WriteMessage(websocket.PingMessage, message); err != nil {
log.Errorf("error sending ping to %s: %s", s.url, err)
s.closeAndReconnect()
return
}
case <-s.closeWriteChan:
return
}
}
}
2018-05-05 21:08:39 +00:00
// Start ...
func (s *Subscriber) Start() {
go s.connect()
}
// Stop ...
func (s *Subscriber) Stop() {
log.Infof("shutting down ...")
err := s.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Warnf("error sending close message: %s", err)
}
err = s.conn.Close()
if err != nil {
log.Warnf("error closing connection: %s", err)
}
s.conn = nil
}