mirror of
https://git.mills.io/saltyim/saltyim.git
synced 2024-06-25 00:08:26 +00:00
515 lines
14 KiB
Go
515 lines
14 KiB
Go
package saltyim
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
|
|
"git.mills.io/prologic/msgbus"
|
|
msgbus_client "git.mills.io/prologic/msgbus/client"
|
|
"github.com/keys-pub/keys"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"go.mills.io/salty"
|
|
"go.salty.im/saltyim/internal/exec"
|
|
)
|
|
|
|
const (
|
|
// DefaultEnvPath is the default PATH for pre and post hooks that are shelled out to
|
|
DefaultEnvPath = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
|
|
)
|
|
|
|
var (
|
|
// ErrNoMessages is an error returned when there are no further messages found for an inbox from the broker
|
|
ErrNoMessages = errors.New("error: no messages found")
|
|
|
|
// ErrNoSender is an error returned when the client is not properly configured with a valid sender
|
|
ErrNoSender = errors.New("error: no sender configured")
|
|
|
|
// ErrMissingIdentity is an error returned when the client is not properly configured with a valid identity
|
|
ErrMissingIdentity = errors.New("error: missing identity")
|
|
)
|
|
|
|
type addrCache map[string]Addr
|
|
|
|
// Message contains the plaintext (decrypted) message and the sender's public key
|
|
type Message struct {
|
|
Text string
|
|
Key *keys.EdX25519PublicKey
|
|
}
|
|
|
|
// TODO: Support shell quoting and escapes?
|
|
func parseExtraEnvs(extraenvs string) map[string]string {
|
|
env := make(map[string]string)
|
|
for _, extraenv := range strings.Split(extraenvs, " ") {
|
|
tokens := strings.SplitN(extraenv, "=", 2)
|
|
switch len(tokens) {
|
|
case 1:
|
|
env[tokens[0]] = ""
|
|
case 2:
|
|
env[tokens[0]] = tokens[1]
|
|
}
|
|
}
|
|
return env
|
|
}
|
|
|
|
// Client is a Salty IM client that handles talking to a Salty IM Broker
|
|
// and Sedngina and Receiving messages to/from Salty IM Users.
|
|
type Client struct {
|
|
me Addr
|
|
id *Identity
|
|
|
|
cache addrCache
|
|
state *State
|
|
|
|
lookup Lookuper
|
|
send Sender
|
|
}
|
|
|
|
// ClientOption is a function that configures a client
|
|
type ClientOption func(cli *Client) error
|
|
|
|
// NewClient returns a new Salty IM client for sending and receiving
|
|
// encrypted messages to other Salty IM users as well as decrypting
|
|
// and displaying messages of the user's own inbox.
|
|
func NewClient(options ...ClientOption) (*Client, error) {
|
|
cli := &Client{
|
|
cache: make(addrCache),
|
|
lookup: &DirectLookup{},
|
|
send: &DirectSend{},
|
|
}
|
|
|
|
for _, option := range options {
|
|
if err := option(cli); err != nil {
|
|
return nil, fmt.Errorf("error configuring client: %w", err)
|
|
}
|
|
}
|
|
|
|
if cli.state == nil {
|
|
log.Debugf("no state loaded, using an empty state")
|
|
cli.state = NewState()
|
|
}
|
|
|
|
if cli.id == nil || cli.id.key == nil {
|
|
return nil, ErrMissingIdentity
|
|
}
|
|
|
|
if cli.me == nil || cli.me.IsZero() {
|
|
cli.me = cli.id.addr
|
|
}
|
|
|
|
if err := cli.me.Refresh(); err != nil {
|
|
log.WithError(err).Debug("error looking up user endpoint")
|
|
}
|
|
|
|
if cli.me == nil || cli.me.IsZero() {
|
|
return nil, fmt.Errorf("unable to find your user address in %s", cli.id.Source())
|
|
}
|
|
|
|
log.Debugf("Using identity %s with public key %s", cli.id.Source(), cli.id.key)
|
|
log.Debugf("Salty Addr is %s", cli.me)
|
|
log.Debugf("Endpoint is %s", cli.me.Endpoint())
|
|
|
|
return cli, nil
|
|
}
|
|
|
|
func (cli *Client) getAddr(user string) (Addr, error) {
|
|
addr, ok := cli.cache[user]
|
|
if ok {
|
|
return addr, nil
|
|
}
|
|
|
|
addr, err := cli.lookup.LookupAddr(user)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error: failed to lookup user %s: %w", user, err)
|
|
}
|
|
|
|
cli.cache[user] = addr
|
|
|
|
return addr, nil
|
|
}
|
|
|
|
func (cli *Client) processMessage(msg *msgbus.Message, opts *ReadOptions) (Message, error) {
|
|
var data []byte
|
|
|
|
defer func() {
|
|
if opts.PostHook != "" {
|
|
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(opts.ExtraEnvs), opts.PostHook, bytes.NewBuffer(data))
|
|
if err != nil {
|
|
log.WithError(err).Debugf("error running post-hook %s", opts.PostHook)
|
|
}
|
|
log.Debugf("post-hook: %q", out)
|
|
}
|
|
}()
|
|
|
|
if opts.PreHook != "" {
|
|
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(opts.ExtraEnvs), opts.PreHook, bytes.NewBuffer(msg.Payload))
|
|
if err != nil {
|
|
log.WithError(err).Debugf("error running pre-hook %s", opts.PreHook)
|
|
}
|
|
log.Debugf("pre-hook: %q", out)
|
|
}
|
|
|
|
unencrypted, senderKey, err := salty.Decrypt(cli.id.key, msg.Payload)
|
|
if err != nil {
|
|
return Message{}, fmt.Errorf("error decrypting message: %w", err)
|
|
}
|
|
data = unencrypted[:]
|
|
|
|
return Message{Text: string(data), Key: senderKey}, nil
|
|
}
|
|
|
|
func (cli *Client) messageHandler(ctx context.Context, opts *ReadOptions, msgs chan Message) msgbus.HandlerFunc {
|
|
return func(msg *msgbus.Message) error {
|
|
message, err := cli.processMessage(msg, opts)
|
|
if err != nil {
|
|
return fmt.Errorf("error processing message: %w", err)
|
|
}
|
|
cli.state.SetIndex(msg.Topic.Name, msg.ID)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
close(msgs)
|
|
return nil
|
|
case msgs <- message:
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Me returns our (self) address
|
|
func (cli *Client) Me() Addr { return cli.me }
|
|
|
|
// Key returns our (self) public key
|
|
func (cli *Client) Key() *keys.EdX25519PublicKey { return cli.id.key.PublicKey() }
|
|
|
|
// State returns the current state of the client
|
|
func (cli *Client) State() *State { return cli.state }
|
|
|
|
// Env sets up a sensible (and hopefully secure) environment for running pre and post hooks
|
|
// Extra environment variables are parsed from extraenvs and some default variables injected
|
|
// into the new environment such as PATH, PWD and HOME as well as the current user's Salty address
|
|
// (SALTY_USER) and their public key (SALTY_IDENTITY).
|
|
func (cli *Client) Env(extraenvs string) []string {
|
|
Path := DefaultEnvPath
|
|
GoPath := os.Getenv("GOPATH")
|
|
if GoPath != "" {
|
|
Path = fmt.Sprintf("%s/bin:%s", GoPath, Path)
|
|
}
|
|
|
|
env := []string{
|
|
fmt.Sprintf("PATH=%s", Path),
|
|
fmt.Sprintf("PWD=%s", os.Getenv("PWD")),
|
|
fmt.Sprintf("HOME=%s", os.Getenv("HOME")),
|
|
|
|
fmt.Sprintf("SALTY_USER=%s", cli.me.String()),
|
|
fmt.Sprintf("SALTY_IDENTITY=%s", cli.id.Source()),
|
|
}
|
|
|
|
for key, val := range parseExtraEnvs(extraenvs) {
|
|
log.Debugf("key: %q", key)
|
|
log.Debugf("val: %q", val)
|
|
val = os.ExpandEnv(val)
|
|
if val == "" {
|
|
val = os.Getenv(key)
|
|
}
|
|
if val != "" {
|
|
env = append(env, fmt.Sprintf("%s=%s", key, val))
|
|
}
|
|
}
|
|
|
|
log.Debugf("env: #%v", env)
|
|
|
|
return env
|
|
}
|
|
|
|
// Outbox returns the URL of our (self) outbox for sending copies of our outgoing messages to
|
|
// which is later used by the client as a way to track messages sent.
|
|
func (cli *Client) Outbox() *url.URL {
|
|
// use url struct copy to avoid modifying cli.me.Endpoint().Path
|
|
// https://github.com/golang/go/issues/38351
|
|
ep := *cli.me.Endpoint()
|
|
ep.Path = path.Join(
|
|
path.Dir(ep.Path),
|
|
fmt.Sprintf("%x", sha256.Sum256(cli.id.key.Private())),
|
|
)
|
|
return &ep
|
|
}
|
|
|
|
// OutboxAddr returns the address of our (self) outbox
|
|
func (cli *Client) OutboxAddr(to Addr) Addr {
|
|
return cli.me.WithEndpoint(cli.Outbox())
|
|
}
|
|
|
|
// OutboxClient returns a modified client for our (self) outbox
|
|
func (cli *Client) OutboxClient(to Addr) *Client {
|
|
if to == nil {
|
|
to = cli.me
|
|
}
|
|
|
|
me := to.WithEndpoint(cli.Outbox())
|
|
|
|
return &Client{
|
|
me: me,
|
|
id: &Identity{
|
|
addr: me,
|
|
key: cli.id.key,
|
|
},
|
|
cache: cli.cache,
|
|
state: cli.state,
|
|
lookup: cli.lookup,
|
|
send: cli.send,
|
|
}
|
|
}
|
|
|
|
// String implements the fmt.Stringer interface and outputs who we (self) are,
|
|
// what our endpoint is we're connected to (broker), our outbox and our public key.
|
|
func (cli *Client) String() string {
|
|
b := &bytes.Buffer{}
|
|
fmt.Fprintln(b, "Me: ", cli.me)
|
|
fmt.Fprintln(b, "Endpoint: ", cli.me.Endpoint())
|
|
fmt.Fprintln(b, "Outbox: ", cli.Outbox())
|
|
fmt.Fprintln(b, "Key: ", cli.id.key)
|
|
return b.String()
|
|
}
|
|
|
|
// SetLookup sets the internal lookup interface to use (Lookuper) for looking
|
|
// up Salty Addresses. By default the DirectLookup implementation is used.
|
|
func (cli *Client) SetLookup(lookup Lookuper) {
|
|
cli.lookup = lookup
|
|
}
|
|
|
|
// SetSend sets the internal send interface to use (Sender) for sending
|
|
// messages to endpoints. By default the DirectSend implementation is used.
|
|
func (cli *Client) SetSend(send Sender) {
|
|
cli.send = send
|
|
}
|
|
|
|
// ReadOptions allows a client to read from its inbox and provide additional options for processing
|
|
// messages such as extra environment variables for pre/post hooks.
|
|
type ReadOptions struct {
|
|
ExtraEnvs string
|
|
PreHook string
|
|
PostHook string
|
|
}
|
|
|
|
// ReadOption is a function that configures a client
|
|
type ReadOption func(opts *ReadOptions) error
|
|
|
|
// WithExtraEnvs sets extra environment variables for use by pre/post hooks
|
|
func WithExtraEnvs(extraenvs string) ReadOption {
|
|
return func(opts *ReadOptions) error {
|
|
opts.ExtraEnvs = extraenvs
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithPreHook sets the prehook used for processing incoming messages which is the path to a
|
|
// script that is passed the encrypted message payload as its standard input.
|
|
func WithPreHook(prehook string) ReadOption {
|
|
return func(opts *ReadOptions) error {
|
|
opts.PreHook = prehook
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithPostHook sets the posthook used for processing incoming messages which is the path to a
|
|
// script that is passed the decrypted message as its standard input.
|
|
func WithPostHook(posthook string) ReadOption {
|
|
return func(opts *ReadOptions) error {
|
|
opts.PostHook = posthook
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Read reads a single message from this user's inbox
|
|
func (cli *Client) Read(options ...ReadOption) (Message, error) {
|
|
if cli.me.Endpoint() == nil {
|
|
if err := cli.me.Refresh(); err != nil {
|
|
return Message{}, fmt.Errorf("unable to find your endpoint for %s: %w", cli.me.String(), err)
|
|
}
|
|
}
|
|
|
|
opts := &ReadOptions{}
|
|
for _, option := range options {
|
|
if err := option(opts); err != nil {
|
|
return Message{}, fmt.Errorf("error configuring read options: %w", err)
|
|
}
|
|
}
|
|
|
|
uri, inbox := SplitInbox(cli.me.Endpoint().String())
|
|
bus := msgbus_client.NewClient(uri, nil)
|
|
|
|
msg, err := bus.Pull(inbox)
|
|
if err != nil {
|
|
return Message{}, fmt.Errorf("error reading inbox: %w", err)
|
|
}
|
|
if msg == nil {
|
|
return Message{}, ErrNoMessages
|
|
}
|
|
|
|
return cli.processMessage(msg, opts)
|
|
}
|
|
|
|
// Subscribe subscribers to this user's inbox for new messages
|
|
func (cli *Client) Subscribe(ctx context.Context, options ...ReadOption) chan Message {
|
|
if cli.me.Endpoint() == nil {
|
|
return nil
|
|
}
|
|
|
|
opts := &ReadOptions{}
|
|
for _, option := range options {
|
|
if err := option(opts); err != nil {
|
|
log.WithError(err).Errorf("error configuring read options")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
uri, inbox := SplitInbox(cli.me.Endpoint().String())
|
|
bus := msgbus_client.NewClient(uri, nil)
|
|
|
|
msgs := make(chan Message, 1)
|
|
index := cli.state.GetIndex(inbox) + 1 // +1 to skip over the last seen message
|
|
log.Debugf("streaming inbox %s from %d ...", inbox, index)
|
|
s := bus.Subscribe(inbox, index, cli.messageHandler(ctx, opts, msgs))
|
|
go s.Run(ctx)
|
|
|
|
log.Debugf("Connected to %s/%s", uri, inbox)
|
|
|
|
return msgs
|
|
}
|
|
|
|
// Lookup performs a lookup for a user's address and config
|
|
// If the user has an address already cached, the cached addr
|
|
// is returned, otherwise a full lookup is done.
|
|
func (cli *Client) Lookup(user string) (Addr, error) {
|
|
return cli.getAddr(user)
|
|
}
|
|
|
|
// Send sends an encrypted message to the specified user
|
|
func (cli *Client) Send(user, msg string) error {
|
|
if cli.me.Endpoint() == nil {
|
|
if err := cli.me.Refresh(); err != nil {
|
|
return fmt.Errorf("unable to find your endpoint for %s: %w", cli.me.String(), err)
|
|
}
|
|
}
|
|
|
|
addr, err := cli.getAddr(user)
|
|
if err != nil {
|
|
return fmt.Errorf("error looking up user %s: %w", user, err)
|
|
}
|
|
|
|
err = cli.SendToAddr(addr, msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return cli.OutboxClient(addr).SendToAddr(cli.OutboxAddr(addr), msg)
|
|
}
|
|
|
|
// SendToAddr encrypts and sends the message to a specified address
|
|
func (cli *Client) SendToAddr(addr Addr, msg string) error {
|
|
if cli.me == nil || cli.me.IsZero() {
|
|
return ErrNoSender
|
|
}
|
|
|
|
b, err := salty.Encrypt(cli.id.key, PackMessage(cli.me, msg), []string{addr.Key().ID().String()})
|
|
if err != nil {
|
|
return fmt.Errorf("error encrypting message to %s: %w", addr, err)
|
|
}
|
|
|
|
endpoint := addr.Endpoint().String()
|
|
log.Debugf("sending message to %s", endpoint)
|
|
if err := cli.send.Send(cli.id.key, endpoint, string(b), addr.Cap()); err != nil {
|
|
return fmt.Errorf("error sending message to %s: %w", addr, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Register sends a registration request to the service user of a Salty Broker
|
|
func (cli *Client) Register(brokerURI string) error {
|
|
if brokerURI == "" {
|
|
log.Debugf("Looking up SRV record for _salty._tcp.%s", cli.Me().Domain())
|
|
target, err := DefaultResolver.LookupSRV("salty", "tcp", cli.Me().Domain())
|
|
if err != nil {
|
|
return fmt.Errorf("unable to find broker for %s: %w", cli.Me(), err)
|
|
}
|
|
brokerURI = fmt.Sprintf("https://%s", target)
|
|
}
|
|
u, err := url.Parse(brokerURI)
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing broker uri %s: %w", brokerURI, err)
|
|
}
|
|
u.Path = "/api/v1/register"
|
|
|
|
req := RegisterRequest{Hash: cli.Me().Hash(), Key: cli.Key().ID().String()}
|
|
data, err := json.Marshal(req)
|
|
if err != nil {
|
|
return fmt.Errorf("error serializing register request: %w", err)
|
|
}
|
|
signed, err := salty.Sign(cli.id.key, data)
|
|
if err != nil {
|
|
return fmt.Errorf("error signing register request: %w", err)
|
|
}
|
|
|
|
_, err = Request(http.MethodPost, u.String(), nil, bytes.NewBuffer(signed))
|
|
if err != nil {
|
|
return fmt.Errorf("error registering address: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SetAvatar creates or updates an avatar for a user with a broker
|
|
func (cli *Client) SetAvatar(content []byte) error {
|
|
// TODO: Verify cli.Me().Domain has valid SRV records
|
|
|
|
req := AvatarRequest{Addr: cli.Me(), Content: content}
|
|
data, err := json.Marshal(req)
|
|
if err != nil {
|
|
return fmt.Errorf("error serializing avatar request: %w", err)
|
|
}
|
|
signed, err := salty.Sign(cli.id.key, data)
|
|
if err != nil {
|
|
return fmt.Errorf("error signing avatar request: %w", err)
|
|
}
|
|
|
|
// TODO: Automatically work out the URI based on SRV lookups of the user's address
|
|
u := cli.Me().Endpoint()
|
|
u.Path = "/api/v1/avatar"
|
|
|
|
_, err = Request(http.MethodPost, u.String(), nil, bytes.NewBuffer(signed))
|
|
if err != nil {
|
|
return fmt.Errorf("error updating avatar: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Request makes a signed request to a broker's API.
|
|
func (cli *Client) Request(method, endpoint string, body []byte) ([]byte, error) {
|
|
u, err := url.Parse(endpoint)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing endpoint address %s: %w", endpoint, err)
|
|
}
|
|
|
|
res, err := SignedRequest(cli.id.key, method, u.String(), nil, bytes.NewBuffer(body))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error making %s request to %s: %w", method, u, err)
|
|
}
|
|
defer res.Body.Close()
|
|
|
|
return io.ReadAll(res.Body)
|
|
}
|