Client wrapper and reconnecting client

See #2
This commit is contained in:
James Mills 2017-08-07 00:31:53 -07:00
rodič b622ab6808
revize c76ef2f61d
V databázi nebyl nalezen žádný známý klíč pro tento podpis
ID GPG klíče: AC4C014F1440EBD6
3 změnil soubory, kde provedl 242 přidání a 78 odebrání

194
client/client.go Normal file
Zobrazit soubor

@ -0,0 +1,194 @@
package client
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"golang.org/x/net/websocket"
"github.com/prologic/msgbus"
)
const (
// DefaultReconnectInterval ...
DefaultReconnectInterval = 5
// DefaultRetryInterval ...
DefaultRetryInterval = 5
)
// Client ...
type Client struct {
host string
port int
retry time.Duration
reconnect time.Duration
ws *websocket.Conn
}
// Options ...
type Options struct {
ReconnectInterval int
RetryInterval int
}
// NewClient ...
func NewClient(host string, port int, options *Options) *Client {
var (
reconnectInterval = DefaultReconnectInterval
retryInterval = DefaultRetryInterval
)
client := &Client{
host: host,
port: port,
}
if options != nil {
if options.ReconnectInterval != 0 {
reconnectInterval = options.ReconnectInterval
}
if options.RetryInterval != 0 {
retryInterval = options.RetryInterval
}
}
client.reconnect = time.Duration(reconnectInterval) * time.Second
client.retry = time.Duration(retryInterval) * time.Second
return client
}
// Handle ...
func (c *Client) Handle(msg *msgbus.Message) {
log.Printf(
"[msgbus] received message: id=%d topic=%s payload=%s",
msg.ID, msg.Topic, msg.Payload,
)
}
// Pull ...
func (c *Client) Pull(topic string) {
var msg *msgbus.Message
url := fmt.Sprintf("http://%s:%d/pull/%s", c.host, c.port, topic)
client := &http.Client{}
for {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf("error constructing pull request to %s: %s", url, err)
time.Sleep(c.retry)
continue
}
res, err := client.Do(req)
if err != nil {
log.Printf("error sending pull request to %s: %s", url, err)
time.Sleep(c.retry)
continue
}
if res.StatusCode == http.StatusNotFound {
break
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(&msg)
if err != nil {
log.Printf(
"error decoding response from %s for %s: %s",
url, topic, err,
)
time.Sleep(c.retry)
break
} else {
c.Handle(msg)
}
}
}
// Publish ...
func (c *Client) Publish(topic, message string) error {
var payload bytes.Buffer
payload.Write([]byte(message))
url := fmt.Sprintf("http://%s:%d/%s", c.host, c.port, topic)
client := &http.Client{}
req, err := http.NewRequest("PUT", url, &payload)
if err != nil {
return err
}
_, err = client.Do(req)
if err != nil {
return err
}
return nil
}
// Subscribe ...
func (c *Client) Subscribe(topic string) *Subscriber {
return &Subscriber{client: c, topic: topic}
}
// Subscriber ...
type Subscriber struct {
client *Client
topic string
conn *websocket.Conn
stopchan chan bool
}
// Stop ...
func (s *Subscriber) Stop() {
s.stopchan <- true
}
// Run ...
func (s *Subscriber) Run() {
var (
err error
msg *msgbus.Message
)
origin := "http://localhost/"
url := fmt.Sprintf(
"ws://%s:%d/push/%s",
s.client.host, s.client.port, s.topic,
)
for {
s.conn, err = websocket.Dial(url, "", origin)
if err != nil {
log.Printf("error connecting to %s: %s", url, err)
time.Sleep(s.client.reconnect)
continue
}
for {
err = websocket.JSON.Receive(s.conn, &msg)
if err != nil {
log.Printf("lost connection to %s: %s", url, err)
time.Sleep(s.client.reconnect)
break
} else {
s.client.Handle(msg)
}
}
}
}

Zobrazit soubor

@ -1,49 +1,40 @@
package main
import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"golang.org/x/net/websocket"
"github.com/prologic/msgbus"
"github.com/prologic/msgbus/client"
)
const defaultTopic = "hello"
var (
host string
port int
err error
msg msgbus.Message
ws *websocket.Conn
)
func main() {
var (
host string
port int
)
func init() {
flag.StringVar(&host, "host", "localhost", "host to connect to")
flag.IntVar(&port, "port", 8000, "port to connect to")
}
func main() {
flag.Parse()
client := client.NewClient(host, port, nil)
if flag.Arg(0) == "sub" {
subscribe(flag.Arg(1))
subscribe(client, flag.Arg(1))
} else if flag.Arg(0) == "pub" {
publish(flag.Arg(1), flag.Arg(2))
publish(client, flag.Arg(1), flag.Arg(2))
} else if flag.Arg(0) == "pull" {
pull(client, flag.Arg(1))
} else {
log.Fatalf("invalid command %s", flag.Arg(0))
}
}
func publish(topic string, message string) {
var payload bytes.Buffer
func publish(client *client.Client, topic, message string) {
if topic == "" {
topic = defaultTopic
}
@ -52,54 +43,30 @@ func publish(topic string, message string) {
log.Printf("Reading message from stdin...\n")
buf, err := ioutil.ReadAll(os.Stdin)
if err != nil {
log.Fatal(err)
log.Fatalf("error reading message from stdin: %s", err)
}
payload.Write(buf)
} else {
payload.Write([]byte(message))
message = string(buf[:])
}
url := fmt.Sprintf("http://%s:%d/%s", host, port, topic)
client := &http.Client{}
req, err := http.NewRequest("PUT", url, &payload)
err := client.Publish(topic, message)
if err != nil {
log.Fatal(err)
}
_, err = client.Do(req)
if err != nil {
log.Fatal(err)
log.Fatalf("error publishing message: %s", err)
}
}
func subscribe(topic string) {
func pull(client *client.Client, topic string) {
if topic == "" {
topic = defaultTopic
}
origin := "http://localhost/"
url := fmt.Sprintf("ws://%s:%d/push/%s", host, port, topic)
ws, err = websocket.Dial(url, "", origin)
if err != nil {
log.Fatal(err)
}
log.Printf("Listening for messages from %s", url)
for {
err = websocket.JSON.Receive(ws, &msg)
if err != nil {
log.Fatal(err)
}
ack := msgbus.Ack{Ack: msg.ID}
err = websocket.JSON.Send(ws, &ack)
if err != nil {
log.Fatal(err)
}
log.Printf("Received: %s\n", msg.Payload)
}
client.Pull(topic)
}
func subscribe(client *client.Client, topic string) {
if topic == "" {
topic = defaultTopic
}
s := client.Subscribe(topic)
s.Run()
}

Zobrazit soubor

@ -8,6 +8,7 @@ import (
// Message ...
type Message struct {
ID uint64 `json:"id"`
Topic string `json:"topic"`
Payload []byte `json:"payload"`
Created time.Time `json:"created"`
Acked time.Time `json:"acked"`
@ -21,21 +22,21 @@ type Ack struct {
// Listeners ...
type Listeners struct {
ids map[string]bool
chs map[string]chan *Message
chs map[string]chan Message
}
// NewListeners ...
func NewListeners() *Listeners {
return &Listeners{
ids: make(map[string]bool),
chs: make(map[string]chan *Message),
chs: make(map[string]chan Message),
}
}
// Add ...
func (ls *Listeners) Add(id string) chan *Message {
func (ls *Listeners) Add(id string) chan Message {
ls.ids[id] = true
ls.chs[id] = make(chan *Message)
ls.chs[id] = make(chan Message)
return ls.chs[id]
}
@ -54,7 +55,7 @@ func (ls *Listeners) Exists(id string) bool {
}
// Get ...
func (ls *Listeners) Get(id string) (chan *Message, bool) {
func (ls *Listeners) Get(id string) (chan Message, bool) {
ch, ok := ls.chs[id]
if !ok {
return nil, false
@ -63,7 +64,7 @@ func (ls *Listeners) Get(id string) (chan *Message, bool) {
}
// NotifyAll ...
func (ls *Listeners) NotifyAll(message *Message) {
func (ls *Listeners) NotifyAll(message Message) {
for _, ch := range ls.chs {
ch <- message
}
@ -90,8 +91,8 @@ func (mb *MessageBus) Len() int {
}
// NewMessage ...
func (mb *MessageBus) NewMessage(payload []byte) *Message {
message := &Message{
func (mb *MessageBus) NewMessage(payload []byte) Message {
message := Message{
ID: mb.seqid,
Payload: payload,
Created: time.Now(),
@ -103,10 +104,12 @@ func (mb *MessageBus) NewMessage(payload []byte) *Message {
}
// Put ...
func (mb *MessageBus) Put(topic string, message *Message) {
func (mb *MessageBus) Put(topic string, message Message) {
message.Topic = topic
log.Printf(
"[msgbus] PUT id=%d topic=%s payload=%s",
message.ID, topic, message.Payload,
message.ID, message.Topic, message.Payload,
)
q, ok := mb.topics[topic]
if !ok {
@ -119,25 +122,25 @@ func (mb *MessageBus) Put(topic string, message *Message) {
}
// Get ...
func (mb *MessageBus) Get(topic string) (*Message, bool) {
func (mb *MessageBus) Get(topic string) (Message, bool) {
log.Printf("[msgbus] GET topic=%s", topic)
q, ok := mb.topics[topic]
if !ok {
return &Message{}, false
return Message{}, false
}
m := q.Pop()
if m == nil {
return &Message{}, false
return Message{}, false
}
return m.(*Message), true
return m.(Message), true
}
// NotifyAll ...
func (mb *MessageBus) NotifyAll(topic string, message *Message) {
func (mb *MessageBus) NotifyAll(topic string, message Message) {
log.Printf(
"[msgbus] NotifyAll id=%d topic=%s payload=%s",
message.ID, topic, message.Payload,
message.ID, message.Topic, message.Payload,
)
ls, ok := mb.listeners[topic]
if !ok {
@ -147,7 +150,7 @@ func (mb *MessageBus) NotifyAll(topic string, message *Message) {
}
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan *Message {
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
log.Printf("[msgbus] Subscribe id=%s topic=%s", id, topic)
ls, ok := mb.listeners[topic]
if !ok {