mirror of
https://git.mills.io/saltyim/saltyim.git
synced 2024-06-16 11:58:24 +00:00
801d6b93bb
Co-authored-by: James Mills <prologic@shortcircuit.net.au> Reviewed-on: https://git.mills.io/prologic/saltyim/pulls/43
204 lines
5.0 KiB
Go
204 lines
5.0 KiB
Go
package saltyim
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"time"
|
|
|
|
"git.mills.io/prologic/msgbus"
|
|
msgbus_client "git.mills.io/prologic/msgbus/client"
|
|
"github.com/keys-pub/keys"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"go.mills.io/salty"
|
|
"go.mills.io/saltyim/internal/exec"
|
|
)
|
|
|
|
type configCache map[string]Config
|
|
|
|
// PackMessage formts an outoing message in the Message Format
|
|
// <timestamp>\t(<sender>) <message>
|
|
func PackMessage(me Addr, msg string) []byte {
|
|
return []byte(fmt.Sprint(time.Now().UTC().Format(time.RFC3339), "\t", me.Formatted(), "\t", msg))
|
|
}
|
|
|
|
// Send sends the encrypted message `msg` to the Endpoint `endpoint` using a
|
|
// `POST` request and returns nil on success or an error on failure.
|
|
func Send(endpoint, msg string) error {
|
|
res, err := Request(http.MethodPost, endpoint, nil, bytes.NewBufferString(msg))
|
|
if err != nil {
|
|
return fmt.Errorf("error publishing message to %s: %w", endpoint, err)
|
|
}
|
|
defer res.Body.Close()
|
|
return nil
|
|
}
|
|
|
|
// Client is a Salty IM client that handles talking to a Salty IM Broker
|
|
// and Sedngina and Receiving messages to/from Salty IM Users.
|
|
type Client struct {
|
|
key *keys.EdX25519Key
|
|
|
|
endpoint string
|
|
me Addr
|
|
|
|
cache configCache
|
|
}
|
|
|
|
// NewClient reeturns a new Salty IM client for sending and receiving
|
|
// encrypted messages to other Salty IM users as well as decrypting
|
|
// and displaying messages of the user's own inbox.
|
|
func NewClient(me Addr, identity, endpoint string) (*Client, error) {
|
|
key, m, err := GetIdentity(identity)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error opening identity %s: %w", identity, err)
|
|
}
|
|
if me.IsZero() {
|
|
me = m
|
|
}
|
|
|
|
if me.IsZero() {
|
|
return nil, fmt.Errorf("unable to find your user addressn in %s", identity)
|
|
}
|
|
|
|
log.Debugf("Using identity %s with public key %s", identity, key)
|
|
log.Debugf("Salty Addr is %s", me)
|
|
log.Debugf("Endpoint is %s", endpoint)
|
|
|
|
return &Client{
|
|
key: key,
|
|
|
|
endpoint: endpoint,
|
|
me: me,
|
|
|
|
cache: make(configCache),
|
|
}, nil
|
|
}
|
|
|
|
func (cli *Client) getConfig(user string) (Config, error) {
|
|
config, ok := cli.cache[user]
|
|
if ok {
|
|
return config, nil
|
|
}
|
|
|
|
config, err := Lookup(user)
|
|
if err != nil {
|
|
return Config{}, fmt.Errorf("error: failed to lookup user %s: %w", user, err)
|
|
}
|
|
|
|
cli.cache[user] = config
|
|
|
|
return config, nil
|
|
}
|
|
|
|
func (cli *Client) handleMessage(prehook, posthook string, msgs chan string) msgbus.HandlerFunc {
|
|
return func(msg *msgbus.Message) error {
|
|
if prehook != "" {
|
|
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, prehook, bytes.NewBuffer(msg.Payload))
|
|
if err != nil {
|
|
log.WithError(err).Debugf("error running pre-hook %s", prehook)
|
|
}
|
|
log.Debugf("pre-hook: %q", out)
|
|
}
|
|
|
|
data, _, err := salty.Decrypt(cli.key, msg.Payload)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "error decrypting message")
|
|
return err
|
|
}
|
|
|
|
msgs <- string(data)
|
|
|
|
if posthook != "" {
|
|
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, posthook, bytes.NewBuffer(data))
|
|
if err != nil {
|
|
log.WithError(err).Debugf("error running post-hook %s", posthook)
|
|
}
|
|
log.Debugf("post-hook: %q", out)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (cli *Client) Me() Addr { return cli.me }
|
|
func (cli *Client) Endpoint() string { return cli.endpoint }
|
|
|
|
// Read subscribers to this user's inbox for new messages
|
|
func (cli *Client) Read(ctx context.Context, endpoint, prehook, posthook string) chan string {
|
|
if endpoint == "" {
|
|
endpoint = cli.endpoint
|
|
}
|
|
|
|
uri, inbox := SplitInbox(endpoint)
|
|
bus := msgbus_client.NewClient(uri, nil)
|
|
|
|
msgs := make(chan string)
|
|
s := bus.Subscribe(inbox, cli.handleMessage(prehook, posthook, msgs))
|
|
s.Start()
|
|
|
|
log.Debugf("Connected to %s/%s", uri, inbox)
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
s.Stop()
|
|
close(msgs)
|
|
}()
|
|
|
|
return msgs
|
|
}
|
|
|
|
// Send sends an encrypted message to the specified user
|
|
func (cli *Client) Send(user, msg string) error {
|
|
cfg, err := cli.getConfig(user)
|
|
if err != nil {
|
|
return fmt.Errorf("error looking up user %s: %w", user, err)
|
|
}
|
|
|
|
b, err := salty.Encrypt(cli.key, PackMessage(cli.me, msg), []string{cfg.Key})
|
|
if err != nil {
|
|
return fmt.Errorf("error encrypting message to %s: %w", user, err)
|
|
}
|
|
|
|
if err := Send(cfg.Endpoint, string(b)); err != nil {
|
|
return fmt.Errorf("error sending message to %s: %w", user, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Register sends a registration requestn to a broker
|
|
func (cli *Client) Register() error {
|
|
req := RegisterRequest{
|
|
Addr: cli.me,
|
|
Key: cli.key.ID().String(),
|
|
}
|
|
data, err := json.Marshal(req)
|
|
if err != nil {
|
|
return fmt.Errorf("error serializing register request: %w", err)
|
|
}
|
|
signed, err := salty.Sign(cli.key, data)
|
|
if err != nil {
|
|
return fmt.Errorf("error signing registration request: %w", err)
|
|
}
|
|
body := bytes.NewBuffer(signed)
|
|
|
|
endpointURL, err := url.Parse(cli.endpoint)
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing endpoint %s: %w", cli.endpoint, err)
|
|
}
|
|
endpointURL.Path = "/api/v1/register"
|
|
|
|
res, err := Request(http.MethodPost, endpointURL.String(), nil, body)
|
|
if err != nil {
|
|
return fmt.Errorf("error registering to broker %s: %w", endpointURL, err)
|
|
}
|
|
defer res.Body.Close()
|
|
|
|
return nil
|
|
}
|