Refactor read options with functional options for easier usage (#183)

Co-authored-by: James Mills <1290234+prologic@users.noreply.github.com>
Reviewed-on: https://git.mills.io/saltyim/saltyim/pulls/183
This commit is contained in:
James Mills 2023-01-27 23:19:52 +00:00
parent 57970ff67f
commit ba77f9e32c
7 changed files with 81 additions and 23 deletions

View File

@ -138,23 +138,23 @@ func (cli *Client) getAddr(user string) (Addr, error) {
return addr, nil
}
func (cli *Client) processMessage(msg *msgbus.Message, extraenvs, prehook, posthook string) (Message, error) {
func (cli *Client) processMessage(msg *msgbus.Message, opts *ReadOptions) (Message, error) {
var data []byte
defer func() {
if posthook != "" {
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(extraenvs), posthook, bytes.NewBuffer(data))
if opts.PostHook != "" {
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(opts.ExtraEnvs), opts.PostHook, bytes.NewBuffer(data))
if err != nil {
log.WithError(err).Debugf("error running post-hook %s", posthook)
log.WithError(err).Debugf("error running post-hook %s", opts.PostHook)
}
log.Debugf("post-hook: %q", out)
}
}()
if prehook != "" {
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(extraenvs), prehook, bytes.NewBuffer(msg.Payload))
if opts.PreHook != "" {
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(opts.ExtraEnvs), opts.PreHook, bytes.NewBuffer(msg.Payload))
if err != nil {
log.WithError(err).Debugf("error running pre-hook %s", prehook)
log.WithError(err).Debugf("error running pre-hook %s", opts.PreHook)
}
log.Debugf("pre-hook: %q", out)
}
@ -168,9 +168,9 @@ func (cli *Client) processMessage(msg *msgbus.Message, extraenvs, prehook, posth
return Message{Text: string(data), Key: senderKey}, nil
}
func (cli *Client) messageHandler(ctx context.Context, extraenvs, prehook, posthook string, msgs chan Message) msgbus.HandlerFunc {
func (cli *Client) messageHandler(ctx context.Context, opts *ReadOptions, msgs chan Message) msgbus.HandlerFunc {
return func(msg *msgbus.Message) error {
message, err := cli.processMessage(msg, extraenvs, prehook, posthook)
message, err := cli.processMessage(msg, opts)
if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
@ -295,14 +295,58 @@ func (cli *Client) SetSend(send Sender) {
cli.send = send
}
// ReadOptions allows a client to read from its inbox and provide additional options for processing
// messages such as extra environment variables for pre/post hooks.
type ReadOptions struct {
ExtraEnvs string
PreHook string
PostHook string
}
// ReadOption is a function that configures a client
type ReadOption func(opts *ReadOptions) error
// WithExtraEnvs sets extra environment variables for use by pre/post hooks
func WithExtraEnvs(extraenvs string) ReadOption {
return func(opts *ReadOptions) error {
opts.ExtraEnvs = extraenvs
return nil
}
}
// WithPreHook sets the prehook used for processing incoming messages which is the path to a
// script that is passed the encrypted message payload as its standard input.
func WithPreHook(prehook string) ReadOption {
return func(opts *ReadOptions) error {
opts.PreHook = prehook
return nil
}
}
// WithPostHook sets the posthook used for processing incoming messages which is the path to a
// script that is passed the decrypted message as its standard input.
func WithPostHook(posthook string) ReadOption {
return func(opts *ReadOptions) error {
opts.PostHook = posthook
return nil
}
}
// Read reads a single message from this user's inbox
func (cli *Client) Read(extraenvs, prehook, posthook string) (Message, error) {
func (cli *Client) Read(options ...ReadOption) (Message, error) {
if cli.me.Endpoint() == nil {
if err := cli.me.Refresh(); err != nil {
return Message{}, fmt.Errorf("unable to find your endpoint for %s: %w", cli.me.String(), err)
}
}
opts := &ReadOptions{}
for _, option := range options {
if err := option(opts); err != nil {
return Message{}, fmt.Errorf("error configuring read options: %w", err)
}
}
uri, inbox := SplitInbox(cli.me.Endpoint().String())
bus := msgbus_client.NewClient(uri, nil)
@ -314,22 +358,30 @@ func (cli *Client) Read(extraenvs, prehook, posthook string) (Message, error) {
return Message{}, ErrNoMessages
}
return cli.processMessage(msg, extraenvs, prehook, posthook)
return cli.processMessage(msg, opts)
}
// Subscribe subscribers to this user's inbox for new messages
func (cli *Client) Subscribe(ctx context.Context, extraenvs, prehook, posthook string) chan Message {
func (cli *Client) Subscribe(ctx context.Context, options ...ReadOption) chan Message {
if cli.me.Endpoint() == nil {
return nil
}
opts := &ReadOptions{}
for _, option := range options {
if err := option(opts); err != nil {
log.WithError(err).Errorf("error configuring read options")
return nil
}
}
uri, inbox := SplitInbox(cli.me.Endpoint().String())
bus := msgbus_client.NewClient(uri, nil)
msgs := make(chan Message, 1)
index := cli.state.GetIndex(inbox) + 1 // +1 to skip over the last seen message
log.Debugf("streaming inbox %s from %d ...", inbox, index)
s := bus.Subscribe(inbox, index, cli.messageHandler(ctx, extraenvs, prehook, posthook, msgs))
s := bus.Subscribe(inbox, index, cli.messageHandler(ctx, opts, msgs))
go s.Run(ctx)
log.Debugf("Connected to %s/%s", uri, inbox)

View File

@ -132,7 +132,7 @@ func TestClient(t *testing.T) {
message := "Hello Bob"
require.NoError(alice.Send(bob.Me().String(), message))
msg, err := bob.Read("", "", "")
msg, err := bob.Read()
require.NoError(err)
assert.Equal(msg.Key.ID(), alice.Key().ID())
@ -148,7 +148,7 @@ func TestClient(t *testing.T) {
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
ch := bob.Subscribe(ctx, "", "", "")
ch := bob.Subscribe(ctx)
select {
case <-ctx.Done():
break

View File

@ -87,8 +87,14 @@ func read(follow bool, extraenvs, prehook, posthook string, args ...string) {
cancel()
}()
opts := []saltyim.ReadOption{
saltyim.WithExtraEnvs(extraenvs),
saltyim.WithPreHook(prehook),
saltyim.WithPostHook(posthook),
}
if follow {
for msg := range cli.Subscribe(ctx, extraenvs, prehook, posthook) {
for msg := range cli.Subscribe(ctx, opts...) {
if isatty.IsTerminal(os.Stdout.Fd()) {
fmt.Println(saltyim.FormatMessage(msg.Text))
} else {
@ -96,7 +102,7 @@ func read(follow bool, extraenvs, prehook, posthook string, args ...string) {
}
}
} else {
msg, err := cli.Read(extraenvs, prehook, posthook)
msg, err := cli.Read(opts...)
if err != nil {
if err == saltyim.ErrNoMessages {
os.Exit(0)

View File

@ -262,7 +262,7 @@ func (app *App) outputLoop(ctx context.Context) {
}
func (app *App) readLoop(ctx context.Context) {
ch := app.cli.Subscribe(ctx, "", "", "")
ch := app.cli.Subscribe(ctx)
for {
select {
case <-ctx.Done():

View File

@ -120,8 +120,8 @@ func (h *SaltyChat) connect(ctx app.Context) {
ctx.Async(func() {
stateCh := time.NewTicker(time.Second * 20)
inboxCh := client.Subscribe(ctx.Dispatcher().Context(), "", "", "")
outboxCh := client.OutboxClient(nil).Subscribe(ctx.Dispatcher().Context(), "", "", "")
inboxCh := client.Subscribe(ctx.Dispatcher().Context())
outboxCh := client.OutboxClient(nil).Subscribe(ctx.Dispatcher().Context())
for {
select {

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:918a7f3370b037e18874307e00ccb8b1f7a458fe7be68913a09ca9d922ee89fd
size 29760905
oid sha256:a41cea81148f0a7b01c94011223272c9bcbd86ea086f4338d7b6cca2258a2e38
size 29761883

View File

@ -117,7 +117,7 @@ func (svc *Service) Run(ctx context.Context) error {
log.Debugf("listening for service requests as %s", svc.me)
msgch := svc.cli.Subscribe(ctx, "", "", "")
msgch := svc.cli.Subscribe(ctx)
for {
select {