6
0
mirror of https://git.mills.io/prologic/msgbus.git synced 2024-06-25 00:09:08 +00:00
prologic-msgbus/client/client.go

250 lines
4.2 KiB
Go
Raw Normal View History

package client
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
2017-08-20 05:09:36 +00:00
"net/url"
2018-03-26 00:03:56 +00:00
"os"
2017-08-20 05:09:36 +00:00
"strings"
"time"
2018-03-26 00:03:56 +00:00
log "github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
"github.com/prologic/msgbus"
)
const (
// DefaultReconnectInterval ...
DefaultReconnectInterval = 5
// DefaultRetryInterval ...
DefaultRetryInterval = 5
)
// HandlerFunc ...
type HandlerFunc func(msg *msgbus.Message) error
// Client ...
type Client struct {
2017-08-20 05:09:36 +00:00
url string
retry time.Duration
reconnect time.Duration
ws *websocket.Conn
}
// Options ...
type Options struct {
ReconnectInterval int
RetryInterval int
}
// NewClient ...
2017-08-20 05:09:36 +00:00
func NewClient(url string, options *Options) *Client {
var (
reconnectInterval = DefaultReconnectInterval
retryInterval = DefaultRetryInterval
)
2017-08-20 05:09:36 +00:00
url = strings.TrimSuffix(url, "/")
client := &Client{url: url}
if options != nil {
if options.ReconnectInterval != 0 {
reconnectInterval = options.ReconnectInterval
}
if options.RetryInterval != 0 {
retryInterval = options.RetryInterval
}
}
client.reconnect = time.Duration(reconnectInterval) * time.Second
client.retry = time.Duration(retryInterval) * time.Second
return client
}
// Handle ...
2018-03-26 00:03:56 +00:00
func (c *Client) Handle(msg *msgbus.Message) error {
out, err := json.Marshal(msg)
if err != nil {
log.Errorf("error marshalling message: %s", err)
return err
}
os.Stdout.Write(out)
os.Stdout.Write([]byte{'\r', '\n'})
2018-03-26 00:03:56 +00:00
return nil
}
// Pull ...
func (c *Client) Pull(topic string) {
var msg *msgbus.Message
2017-08-20 05:09:36 +00:00
url := fmt.Sprintf("%s/%s", c.url, topic)
client := &http.Client{}
for {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
2017-08-20 05:09:36 +00:00
log.Printf(
"error constructing pull request to %s: %s",
url, err,
)
time.Sleep(c.retry)
continue
}
res, err := client.Do(req)
if err != nil {
log.Printf("error sending pull request to %s: %s", url, err)
time.Sleep(c.retry)
continue
}
if res.StatusCode == http.StatusNotFound {
// Empty queue
break
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(&msg)
if err != nil {
log.Errorf(
"error decoding response from %s for %s: %s",
url, topic, err,
)
break
} else {
err := c.Handle(msg)
if err != nil {
log.Errorf(
"error handling message from %s for %s: %s",
url, topic, err,
)
}
break
}
}
}
// Publish ...
func (c *Client) Publish(topic, message string) error {
var payload bytes.Buffer
payload.Write([]byte(message))
2017-08-20 05:09:36 +00:00
url := fmt.Sprintf("%s/%s", c.url, topic)
client := &http.Client{}
req, err := http.NewRequest("PUT", url, &payload)
if err != nil {
return err
}
_, err = client.Do(req)
if err != nil {
return err
}
return nil
}
// Subscribe ...
func (c *Client) Subscribe(topic string, handler HandlerFunc) *Subscriber {
return NewSubscriber(c, topic, handler)
}
// Subscriber ...
type Subscriber struct {
conn *websocket.Conn
client *Client
topic string
handler HandlerFunc
errch chan error
stopch chan bool
}
// NewSubscriber ...
func NewSubscriber(client *Client, topic string, handler HandlerFunc) *Subscriber {
if handler == nil {
handler = client.Handle
}
return &Subscriber{
client: client,
topic: topic,
handler: handler,
errch: make(chan error),
stopch: make(chan bool),
}
}
// Stop ...
func (s *Subscriber) Stop() {
close(s.errch)
s.stopch <- true
}
// Run ...
func (s *Subscriber) Run() {
var err error
origin := "http://localhost/"
2017-08-20 05:09:36 +00:00
u, err := url.Parse(s.client.url)
if err != nil {
log.Fatal("invalid url: %s", s.client.url)
}
u.Scheme = "ws"
u.Path += fmt.Sprintf("/%s", s.topic)
url := u.String()
for {
s.conn, err = websocket.Dial(url, "", origin)
if err != nil {
2018-03-26 00:03:56 +00:00
log.Warnf("error connecting to %s: %s", url, err)
time.Sleep(s.client.reconnect)
continue
}
go s.Reader()
select {
case err = <-s.errch:
if err != nil {
2018-03-26 00:03:56 +00:00
log.Warnf("lost connection to %s: %s", url, err)
time.Sleep(s.client.reconnect)
}
case <-s.stopch:
2018-03-26 00:03:56 +00:00
log.Infof("shutting down ...")
s.conn.Close()
break
}
}
}
// Reader ...
func (s *Subscriber) Reader() {
var msg *msgbus.Message
for {
err := websocket.JSON.Receive(s.conn, &msg)
if err != nil {
s.errch <- err
break
}
s.handler(msg)
}
}