6
1
mirror of https://git.mills.io/saltyim/saltyim.git synced 2024-06-16 03:48:24 +00:00
prologic-saltyim/client.go
xuu 754fcc7323 feat: add compression negotiation for sent messages (#91)
feat: add compression negotiation for sent messages
fix: unix homedir handling

the service will negotiate a compression algo for sending messages
when a user chats someone during the auto discovery, the service returns an `Accept-Encoding: br, gzip, deflate`

the client saves that response and so when it makes POSTs of messages adds the best `Content-Encoding` and compresses the message

example:
```
>> GET /.well-known/salty/c765c69040d98f3af2181237f47ec01398d80f8ab2690fe929e4311ab05dec01.json

<< Accept-Encoding: br, gzip, deflate
<<
<< {"endpoint":"https://salty.home.arpa/inbox/01FZBR8Y2E6TH949JA3925WF71","key":"kex1wurry09ftqjuxgjl0jxmqypv4axqvzqljkgeadxjcpwtfuhcedcslck52d"}

>> POST /inbox/01FZBR8Y2E6TH949JA3925WF71
>> Content-Encoding: br
>>
>> [Brotli Compressed data]
```

this PR depends on https://git.mills.io/prologic/msgbus/pulls/24

Co-authored-by: Jon Lundy <jon@xuu.cc>
Reviewed-on: https://git.mills.io/saltyim/saltyim/pulls/91
Co-authored-by: xuu <xuu@noreply@mills.io>
Co-committed-by: xuu <xuu@noreply@mills.io>
2022-03-29 22:23:16 +00:00

373 lines
9.3 KiB
Go

package saltyim
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"os"
"strings"
"time"
"git.mills.io/prologic/msgbus"
msgbus_client "git.mills.io/prologic/msgbus/client"
"github.com/avast/retry-go"
"github.com/keys-pub/keys"
log "github.com/sirupsen/logrus"
"github.com/timewasted/go-accept-headers"
"go.mills.io/salty"
"go.mills.io/saltyim/internal/exec"
)
const (
DefaultEnvPath = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
ServiceUser = "salty"
)
var (
ErrNoMessages = errors.New("error: no messages found")
ErrClientNotConnected = errors.New("error: client not connected")
acceptEncodings = []string{"br", "gzip", ""}
)
type addrCache map[string]*Addr
type Message struct {
Text string
Key *keys.EdX25519PublicKey
}
// TODO: Support shell quoting and escapes?
func parseExtraEnvs(extraenvs string) map[string]string {
env := make(map[string]string)
for _, extraenv := range strings.Split(extraenvs, " ") {
tokens := strings.SplitN(extraenv, "=", 2)
switch len(tokens) {
case 1:
env[tokens[0]] = ""
case 2:
env[tokens[0]] = tokens[1]
}
}
return env
}
// PackMessage formts an outoing message in the Message Format
// <timestamp>\t(<sender>) <message>
func PackMessage(me *Addr, msg string) []byte {
return []byte(
fmt.Sprint(
time.Now().UTC().Format(time.RFC3339), "\t",
me.Formatted(), "\t",
strings.TrimSpace(msg), "\n",
),
)
}
// Send sends the encrypted message `msg` to the Endpoint `endpoint` using a
// `POST` request and returns nil on success or an error on failure.
func Send(endpoint, msg string, cap Capabilities) error {
headers := make(http.Header)
if cap.AcceptEncoding != "" {
ae, err := accept.Negotiate(cap.AcceptEncoding, acceptEncodings...)
if err != nil {
return fmt.Errorf("error publishing message to %s: %w", endpoint, err)
}
headers.Set("Content-Encoding", ae)
}
res, err := Request(http.MethodPost, endpoint, headers, bytes.NewBufferString(msg))
if err != nil {
return fmt.Errorf("error publishing message to %s: %w", endpoint, err)
}
defer res.Body.Close()
return nil
}
// Client is a Salty IM client that handles talking to a Salty IM Broker
// and Sedngina and Receiving messages to/from Salty IM Users.
type Client struct {
me *Addr
id *Identity
key *keys.EdX25519Key
cache addrCache
}
// NewClient reeturns a new Salty IM client for sending and receiving
// encrypted messages to other Salty IM users as well as decrypting
// and displaying messages of the user's own inbox.
func NewClient(me *Addr, options ...IdentityOption) (*Client, error) {
id, err := GetIdentity(options...)
if err != nil {
return nil, fmt.Errorf("error opening identity %s: %w", id.Source(), err)
}
if me == nil || me.IsZero() {
me = id.addr
}
if me == nil || me.IsZero() {
return nil, fmt.Errorf("unable to find your user addressn in %s", id.Source())
}
if err := me.Refresh(); err != nil {
log.WithError(err).Warn("error looking up user endpoint")
}
log.Debugf("Using identity %s with public key %s", id.Source(), id.key)
log.Debugf("Salty Addr is %s", me)
log.Debugf("Endpoint is %s", me.Endpoint())
return &Client{
me: me,
id: id,
key: id.key,
cache: make(addrCache),
}, nil
}
func (cli *Client) getAddr(user string) (*Addr, error) {
addr, ok := cli.cache[user]
if ok {
return addr, nil
}
addr, err := LookupAddr(user)
if err != nil {
return nil, fmt.Errorf("error: failed to lookup user %s: %w", user, err)
}
cli.cache[user] = addr
return addr, nil
}
func (cli *Client) processMessage(msg *msgbus.Message, extraenvs, prehook, posthook string) (Message, error) {
var data []byte
defer func() {
if posthook != "" {
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(extraenvs), posthook, bytes.NewBuffer(data))
if err != nil {
log.WithError(err).Debugf("error running post-hook %s", posthook)
}
log.Debugf("post-hook: %q", out)
}
}()
if prehook != "" {
out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, cli.Env(extraenvs), prehook, bytes.NewBuffer(msg.Payload))
if err != nil {
log.WithError(err).Debugf("error running pre-hook %s", prehook)
}
log.Debugf("pre-hook: %q", out)
}
unencrypted, senderKey, err := salty.Decrypt(cli.key, msg.Payload)
if err != nil {
return Message{}, fmt.Errorf("error decrypting message: %w", err)
}
data = unencrypted[:]
return Message{Text: string(data), Key: senderKey}, nil
}
func (cli *Client) messageHandler(extraenvs, prehook, posthook string, msgs chan Message) msgbus.HandlerFunc {
return func(msg *msgbus.Message) error {
message, err := cli.processMessage(msg, extraenvs, prehook, posthook)
if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
msgs <- message
return nil
}
}
func (cli *Client) Me() *Addr { return cli.me }
func (cli *Client) Key() *keys.EdX25519PublicKey { return cli.key.PublicKey() }
func (cli *Client) Env(extraenvs string) []string {
Path := DefaultEnvPath
GoPath := os.Getenv("GOPATH")
if GoPath != "" {
Path = fmt.Sprintf("%s/bin:%s", GoPath, Path)
}
env := []string{
fmt.Sprintf("PATH=%s", Path),
fmt.Sprintf("PWD=%s", os.Getenv("PWD")),
fmt.Sprintf("HOME=%s", os.Getenv("HOME")),
fmt.Sprintf("SALTY_USER=%s", cli.me.String()),
fmt.Sprintf("SALTY_IDENTITY=%s", cli.id.Source()),
}
for key, val := range parseExtraEnvs(extraenvs) {
log.Debugf("key: %q", key)
log.Debugf("val: %q", val)
val = os.ExpandEnv(val)
if val == "" {
val = os.Getenv(key)
}
if val != "" {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
}
log.Debugf("env: #%v", env)
return env
}
func (cli *Client) String() string {
b := &bytes.Buffer{}
fmt.Fprintln(b, "Me: ", cli.me)
fmt.Fprintln(b, "Endpoint: ", cli.me.Endpoint())
fmt.Fprintln(b, "Key: ", cli.key)
return b.String()
}
// Drain drains this user's inbox by simulteneiously reading until empty anda
// subscribing to the inbox for new messages.
func (cli *Client) Drain(ctx context.Context, extraenvs, prehook, posthook string) chan Message {
msgs := make(chan Message)
go func() {
for {
msg, err := cli.Read(extraenvs, prehook, posthook)
if err != nil {
log.Debugf("err: #%v", err)
switch err {
case ErrNoMessages:
log.Debug("no more messages, existing readloop...")
return
case ErrClientNotConnected:
log.Debug("client not connected, existing readloop...")
return
default:
log.WithError(err).Warn("error reading inbox")
}
} else {
msgs <- msg
}
time.Sleep(time.Millisecond * 100)
}
}()
go func() {
msgCh := cli.Subscribe(ctx, extraenvs, prehook, posthook)
for {
select {
case msg := <-msgCh:
msgs <- msg
case <-ctx.Done():
close(msgs)
return
}
}
}()
return msgs
}
// Read reads a single message from this user's inbox
func (cli *Client) Read(extraenvs, prehook, posthook string) (Message, error) {
if cli.me.Endpoint() == nil {
return Message{}, ErrClientNotConnected
}
uri, inbox := SplitInbox(cli.me.Endpoint().String())
bus := msgbus_client.NewClient(uri, nil)
msg, err := bus.Pull(inbox)
if err != nil {
return Message{}, fmt.Errorf("error reading inbox: %w", err)
}
if msg == nil {
return Message{}, ErrNoMessages
}
return cli.processMessage(msg, extraenvs, prehook, posthook)
}
// Subscribe subscribers to this user's inbox for new messages
func (cli *Client) Subscribe(ctx context.Context, extraenvs, prehook, posthook string) chan Message {
if cli.me.Endpoint() == nil {
return nil
}
uri, inbox := SplitInbox(cli.me.Endpoint().String())
bus := msgbus_client.NewClient(uri, nil)
msgs := make(chan Message)
s := bus.Subscribe(inbox, cli.messageHandler(extraenvs, prehook, posthook, msgs))
s.Start()
log.Debugf("Connected to %s/%s", uri, inbox)
go func() {
<-ctx.Done()
s.Stop()
close(msgs)
}()
return msgs
}
// Lookup performs a lookup for a user's address and config
// If the user has an address already cached, the cached addr
// is returned, otherwise a full lookup is done.
func (cli *Client) Lookup(user string) (*Addr, error) {
return cli.getAddr(user)
}
// Send sends an encrypted message to the specified user
func (cli *Client) Send(user, msg string) error {
addr, err := cli.getAddr(user)
if err != nil {
return fmt.Errorf("error looking up user %s: %w", user, err)
}
return cli.SendToAddr(addr, msg)
}
func (cli *Client) SendToAddr(addr *Addr, msg string) error {
b, err := salty.Encrypt(cli.key, PackMessage(cli.me, msg), []string{addr.key.ID().String()})
if err != nil {
return fmt.Errorf("error encrypting message to %s: %w", addr, err)
}
if err := Send(addr.Endpoint().String(), string(b), addr.Cap()); err != nil {
return fmt.Errorf("error sending message to %s: %w", addr, err)
}
return nil
}
// Register sends a registration request to the service user of a Salty Broker
func (cli *Client) Register() error {
svc, err := LookupAddr(fmt.Sprintf("%s@%s", ServiceUser, cli.Me().Domain))
if err != nil {
return fmt.Errorf("error looking up service user on %s: %w", cli.Me().Domain, err)
}
if err := cli.SendToAddr(svc, "REGISTER"); err != nil {
return fmt.Errorf("error sending REGISTER to %s: %w", svc, err)
}
if err := retry.Do(
func() error {
return cli.Me().Refresh()
},
); err != nil {
return fmt.Errorf("error registrating account: %w", err)
}
return nil
}