Various error handling improvements

This commit is contained in:
James Mills 2018-04-06 01:27:25 -07:00
parent 0b0cbd0c7a
commit 931120defd
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
5 changed files with 55 additions and 19 deletions

View File

@ -4,9 +4,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"regexp"
"strings"
"time"
@ -24,6 +26,13 @@ const (
DefaultRetryInterval = 5
)
var (
// PublishedRegexp ...
PublishedRegexp = regexp.MustCompile(
"message successfully published to \\w+ with sequence \\d",
)
)
// HandlerFunc ...
type HandlerFunc func(msg *msgbus.Message) error
@ -93,17 +102,14 @@ func (c *Client) Pull(topic string) {
for {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Printf(
"error constructing pull request to %s: %s",
url, err,
)
log.Printf("error constructing request to %s: %s", url, err)
time.Sleep(c.retry)
continue
}
res, err := client.Do(req)
if err != nil {
log.Printf("error sending pull request to %s: %s", url, err)
log.Printf("error sending request to %s: %s", url, err)
time.Sleep(c.retry)
continue
}
@ -146,12 +152,25 @@ func (c *Client) Publish(topic, message string) error {
req, err := http.NewRequest("PUT", url, &payload)
if err != nil {
return err
return fmt.Errorf("error constructing request: %s", err)
}
_, err = client.Do(req)
res, err := client.Do(req)
if err != nil {
return err
return fmt.Errorf("error publishing message: %s", err)
}
if res.StatusCode != 200 {
return fmt.Errorf("unexpected non-200 response: %s", res.Status)
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("error reading response: %s", err)
}
if !PublishedRegexp.Match(body) {
return fmt.Errorf("unexpected non-matching response: %s", body)
}
return nil

View File

@ -13,8 +13,9 @@ import (
// pubCmd represents the pub command
var pubCmd = &cobra.Command{
Use: "pub [flags] <topic> [<message>|-]",
Short: "Publish a new message",
Use: "pub [flags] <topic> [<message>|-]",
Aliases: []string{"put"},
Short: "Publish a new message",
Long: `This publishes a new message either from positional command-line
arguments or from standard input if - is used as the first and only argument.

View File

@ -9,8 +9,9 @@ import (
// pullCmd represents the pub command
var pullCmd = &cobra.Command{
Use: "pull [flags] <topic>",
Short: "Pulls a message from a given topic",
Use: "pull [flags] <topic>",
Aliases: []string{"get"},
Short: "Pulls a message from a given topic",
Long: `This pulls a message from the given topic if there are any messages
and prints the message to standard output. Otherwise if the queue for the
given topic is empty, this does nothing.

View File

@ -14,8 +14,9 @@ import (
// subCmd represents the pub command
var subCmd = &cobra.Command{
Use: "sub [flags] <topic>",
Short: "Subscribe to a topic",
Use: "sub [flags] <topic>",
Aliases: []string{"reg"},
Short: "Subscribe to a topic",
Long: `This subscribes to the given topic and for every message published
to the topic, the message is printed to standard output.`,
Args: cobra.MinimumNArgs(1),

View File

@ -2,6 +2,7 @@ package msgbus
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
@ -240,9 +241,11 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && (r.URL.Path == "/" || r.URL.Path == "") {
// XXX: guard with a mutex?
out, err := json.Marshal(mb.topics)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
msg := fmt.Sprintf("error serializing topics: %s", err)
http.Error(w, msg, http.StatusInternalServerError)
return
}
@ -260,10 +263,19 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case "POST", "PUT":
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
msg := fmt.Sprintf("error reading payload: %s", err)
http.Error(w, msg, http.StatusBadRequest)
return
}
mb.Put(mb.NewMessage(t, body))
message := mb.NewMessage(t, body)
mb.Put(message)
msg := fmt.Sprintf(
"message successfully published to %s with sequence %d",
t.Name, t.Sequence,
)
w.Write([]byte(msg))
case "GET":
if r.Header.Get("Upgrade") == "websocket" {
NewClient(t, mb).Handler().ServeHTTP(w, r)
@ -273,13 +285,15 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
message, ok := mb.Get(t)
if !ok {
http.Error(w, "Not Found", http.StatusNotFound)
msg := fmt.Sprintf("no messages enqueued for topic: %s", topic)
http.Error(w, msg, http.StatusNotFound)
return
}
out, err := json.Marshal(message)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
msg := fmt.Sprintf("error serializing message: %s", err)
http.Error(w, msg, http.StatusInternalServerError)
return
}