package saltyim

import (
	"bytes"
	"context"
	"fmt"
	"os"
	"time"

	"git.mills.io/prologic/msgbus"
	"git.mills.io/prologic/msgbus/client"
	"github.com/keys-pub/keys"
	log "github.com/sirupsen/logrus"
	"go.mills.io/salty"
)

const (
	// defaultRunCmdTimeout is the timeout handed to RunCmd when running the
	// pre- and post-hook commands.
	defaultRunCmdTimeout = time.Second * 3
)

// handleMessage returns a msgbus handler that decrypts each incoming message
// with key and sends the plaintext on msgs.
//
// If prehook is non-empty it is run via RunCmd with the raw (still encrypted)
// payload as input before decryption; if posthook is non-empty it is run with
// the decrypted data after the message has been sent on msgs. Hook failures
// are only logged at debug level and never abort message handling.
//
// The send on msgs blocks until a receiver takes the message. Use
// handleMessageContext when the send must be abandoned on cancellation.
func handleMessage(key *keys.EdX25519Key, prehook, posthook string, msgs chan string) msgbus.HandlerFunc {
	// context.Background() is never cancelled, so the send blocks exactly as
	// a plain `msgs <- ...` would.
	return handleMessageContext(context.Background(), key, prehook, posthook, msgs)
}

// handleMessageContext behaves like handleMessage, except that the send on
// msgs is abandoned once ctx is done, in which case the handler returns
// ctx.Err() and the post-hook is not run.
func handleMessageContext(ctx context.Context, key *keys.EdX25519Key, prehook, posthook string, msgs chan<- string) msgbus.HandlerFunc {
	return func(msg *msgbus.Message) error {
		if prehook != "" {
			out, err := RunCmd(defaultRunCmdTimeout, prehook, bytes.NewBuffer(msg.Payload))
			if err != nil {
				log.WithError(err).Debugf("error running pre-hook %s", prehook)
			}
			log.Debugf("pre-hook: %q", out)
		}

		data, _, err := salty.Decrypt(key, msg.Payload)
		if err != nil {
			// Fprintln so consecutive failures don't run together on one line.
			fmt.Fprintln(os.Stderr, "error decrypting message")
			return err
		}

		// Never block past cancellation: once ctx is done nobody may be
		// receiving anymore, and a handler stuck here could not be stopped.
		select {
		case msgs <- string(data):
		case <-ctx.Done():
			return ctx.Err()
		}

		if posthook != "" {
			out, err := RunCmd(defaultRunCmdTimeout, posthook, bytes.NewBuffer(data))
			if err != nil {
				log.WithError(err).Debugf("error running post-hook %s", posthook)
			}
			log.Debugf("post-hook: %q", out)
		}

		return nil
	}
}

// Read subscribes to the inbox named by endpoint (split into a bus URI and an
// inbox by SplitInbox) and returns a channel on which the decrypted messages
// are delivered. prehook and posthook are optional commands run around
// decryption; see handleMessage.
//
// When ctx is done the subscription is stopped and the returned channel is
// closed, so callers can simply range over it.
//
// Messages pass through an internal forwarding goroutine, so a post-hook may
// run slightly before the caller has received the corresponding message.
func Read(ctx context.Context, key *keys.EdX25519Key, endpoint, prehook, posthook string) chan string {
	uri, inbox := SplitInbox(endpoint)
	// Named cli rather than client to avoid shadowing the imported package.
	cli := client.NewClient(uri, nil)

	msgs := make(chan string)
	// decrypted is written by the subscription handler and is never closed,
	// so a handler that is still running during shutdown cannot panic with
	// "send on closed channel".
	decrypted := make(chan string)

	s := cli.Subscribe(inbox, handleMessageContext(ctx, key, prehook, posthook, decrypted))
	s.Start()
	log.Infof("Connected to %s/%s", uri, inbox)

	// This goroutine is the only sender on msgs and therefore the only place
	// where it is safe to close it.
	go func() {
		// Deferred calls run in reverse order: stop the subscription first,
		// then close the output channel.
		defer close(msgs)
		defer s.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case m := <-decrypted:
				select {
				case msgs <- m:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return msgs
}