Add feature to msgbus sub CLI to executre commands on messages for convenient shell scriptiong

This commit is contained in:
James Mills 2018-04-30 23:09:12 -07:00
parent 95e5e36662
commit 07699203ab
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
3 changed files with 59 additions and 11 deletions

View File

@ -33,9 +33,6 @@ var (
)
)
// HandlerFunc ...
type HandlerFunc func(msg *msgbus.Message) error
// Client ...
type Client struct {
url string
@ -177,7 +174,7 @@ func (c *Client) Publish(topic, message string) error {
}
// Subscribe ...
func (c *Client) Subscribe(topic string, handler HandlerFunc) *Subscriber {
func (c *Client) Subscribe(topic string, handler msgbus.HandlerFunc) *Subscriber {
return NewSubscriber(c, topic, handler)
}
@ -187,14 +184,14 @@ type Subscriber struct {
client *Client
topic string
handler HandlerFunc
handler msgbus.HandlerFunc
errch chan error
stopch chan bool
}
// NewSubscriber ...
func NewSubscriber(client *Client, topic string, handler HandlerFunc) *Subscriber {
func NewSubscriber(client *Client, topic string, handler msgbus.HandlerFunc) *Subscriber {
if handler == nil {
handler = client.Handle
}

View File

@ -1,24 +1,29 @@
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
"syscall"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/msgbus"
"github.com/prologic/msgbus/client"
)
// subCmd represents the pub command
var subCmd = &cobra.Command{
Use: "sub [flags] <topic>",
Use: "sub [flags] <topic> [<command>]",
Aliases: []string{"reg"},
Short: "Subscribe to a topic",
Long: `This subscribes to the given topic and for every message published
to the topic, the message is printed to standard output.`,
to the topic, the message is printed to standard output (default) or the
supplied command is executed with the contents of the message as stdin.`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
uri := viper.GetString("uri")
@ -26,7 +31,12 @@ to the topic, the message is printed to standard output.`,
topic := args[0]
subscribe(client, topic)
var command string
if len(args) > 1 {
command = args[1]
}
subscribe(client, topic, command)
},
}
@ -34,12 +44,50 @@ func init() {
RootCmd.AddCommand(subCmd)
}
func subscribe(client *client.Client, topic string) {
func handler(command string) msgbus.HandlerFunc {
return func(msg *msgbus.Message) error {
out, err := json.Marshal(msg)
if err != nil {
log.Printf("error marshalling message: %s", err)
return err
}
if command == "" {
os.Stdout.Write(out)
os.Stdout.Write([]byte{'\r', '\n'})
return nil
}
cmd := exec.Command(command)
stdin, err := cmd.StdinPipe()
if err != nil {
log.Printf("error connecting to stdin of %s: %s", command, err)
return err
}
go func() {
defer stdin.Close()
stdin.Write(out)
stdin.Write([]byte{'\r', '\n'})
}()
stdout, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error running %s: %s", command, err)
return err
}
fmt.Print(string(stdout))
return nil
}
}
func subscribe(client *client.Client, topic, command string) {
if topic == "" {
topic = defaultTopic
}
s := client.Subscribe(topic, nil)
s := client.Subscribe(topic, handler(command))
go s.Run()
sigs := make(chan os.Signal, 1)

View File

@ -19,6 +19,9 @@ const (
DefaultTTL = 60 * time.Second
)
// HandlerFunc ...
type HandlerFunc func(msg *Message) error
// Topic ...
type Topic struct {
Name string `json:"name"`