6
1
mirror of https://git.mills.io/saltyim/saltyim.git synced 2024-06-16 11:58:24 +00:00

Add support for indexing and persisting inbox index state (#121)

TODO:

- [ ] Remove `replace` directive after testing...

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Reviewed-on: https://git.mills.io/saltyim/saltyim/pulls/121
This commit is contained in:
James Mills 2022-04-02 14:15:46 +00:00
parent 3fccb3ae5f
commit 6a92c9d30a
15 changed files with 222 additions and 89 deletions

107
client.go

@ -30,8 +30,9 @@ const (
)
var (
ErrNoMessages = errors.New("error: no messages found")
ErrClientNotConnected = errors.New("error: client not connected")
ErrNoMessages = errors.New("error: no messages found")
ErrNotConnected = errors.New("error: client not connected")
ErrMissingIdentity = errors.New("error: missing identity")
)
type addrCache map[string]*Addr
@ -75,45 +76,58 @@ type Client struct {
id *Identity
key *keys.EdX25519Key
cache addrCache
state *State
lookup Lookuper
send Sender
}
// ClientOption is a function that configures a client
type ClientOption func(cli *Client) error
// NewClient reeturns a new Salty IM client for sending and receiving
// encrypted messages to other Salty IM users as well as decrypting
// and displaying messages of the user's own inbox.
func NewClient(me *Addr, options ...IdentityOption) (*Client, error) {
id, err := GetIdentity(options...)
if err != nil {
return nil, fmt.Errorf("error opening identity %s: %w", id.Source(), err)
func NewClient(me *Addr, options ...ClientOption) (*Client, error) {
cli := &Client{
me: me,
cache: make(addrCache),
lookup: &DirectLookup{},
send: &DirectSend{},
}
if me == nil || me.IsZero() {
me = id.addr
for _, option := range options {
if err := option(cli); err != nil {
return nil, fmt.Errorf("error configuring client: %w", err)
}
}
if cli.state == nil {
log.Warn("no state loaded, using an empty state")
cli.state = NewState()
}
if cli.id == nil || cli.id.key == nil {
return nil, ErrMissingIdentity
}
if me == nil || me.IsZero() {
return nil, fmt.Errorf("unable to find your user addressn in %s", id.Source())
me = cli.id.addr
}
if me == nil || me.IsZero() {
return nil, fmt.Errorf("unable to find your user addressn in %s", cli.id.Source())
}
if err := me.Refresh(); err != nil {
log.WithError(err).Warn("error looking up user endpoint")
}
log.Debugf("Using identity %s with public key %s", id.Source(), id.key)
log.Debugf("Salty Addr is %s", me)
log.Debugf("Endpoint is %s", me.Endpoint())
log.Debugf("Using identity %s with public key %s", cli.id.Source(), cli.id.key)
log.Debugf("Salty Addr is %s", cli.me)
log.Debugf("Endpoint is %s", cli.me.Endpoint())
return &Client{
me: me,
id: id,
key: id.key,
cache: make(addrCache),
lookup: &DirectLookup{},
send: &DirectSend{},
}, nil
return cli, nil
}
func (cli *Client) getAddr(user string) (*Addr, error) {
@ -168,6 +182,7 @@ func (cli *Client) messageHandler(extraenvs, prehook, posthook string, msgs chan
if err != nil {
return fmt.Errorf("error processing message: %w", err)
}
cli.state.SetIndex(msg.Topic.Name, msg.ID)
msgs <- message
@ -177,6 +192,7 @@ func (cli *Client) messageHandler(extraenvs, prehook, posthook string, msgs chan
func (cli *Client) Me() *Addr { return cli.me }
func (cli *Client) Key() *keys.EdX25519PublicKey { return cli.key.PublicKey() }
func (cli *Client) State() *State { return cli.state }
func (cli *Client) Env(extraenvs string) []string {
Path := DefaultEnvPath
@ -252,53 +268,10 @@ func (cli *Client) SetSend(send Sender) {
cli.send = send
}
// Drain drains this user's inbox by simulteneiously reading until empty anda
// subscribing to the inbox for new messages.
func (cli *Client) Drain(ctx context.Context, extraenvs, prehook, posthook string) chan Message {
msgs := make(chan Message)
go func() {
for {
msg, err := cli.Read(extraenvs, prehook, posthook)
if err != nil {
log.Debugf("err: #%v", err)
switch err {
case ErrNoMessages:
log.Debug("no more messages, existing readloop...")
return
case ErrClientNotConnected:
log.Debug("client not connected, existing readloop...")
return
default:
log.WithError(err).Warn("error reading inbox")
}
} else {
msgs <- msg
}
time.Sleep(time.Millisecond * 100)
}
}()
go func() {
msgCh := cli.Subscribe(ctx, extraenvs, prehook, posthook)
for {
select {
case msg := <-msgCh:
msgs <- msg
case <-ctx.Done():
close(msgs)
return
}
}
}()
return msgs
}
// Read reads a single message from this user's inbox
func (cli *Client) Read(extraenvs, prehook, posthook string) (Message, error) {
if cli.me.Endpoint() == nil {
return Message{}, ErrClientNotConnected
return Message{}, ErrNotConnected
}
uri, inbox := SplitInbox(cli.me.Endpoint().String())
@ -325,7 +298,7 @@ func (cli *Client) Subscribe(ctx context.Context, extraenvs, prehook, posthook s
bus := msgbus_client.NewClient(uri, nil)
msgs := make(chan Message)
s := bus.Subscribe(inbox, cli.messageHandler(extraenvs, prehook, posthook, msgs))
s := bus.Subscribe(inbox, cli.state.GetIndex(inbox), cli.messageHandler(extraenvs, prehook, posthook, msgs))
s.Start()
log.Debugf("Connected to %s/%s", uri, inbox)

@ -46,7 +46,7 @@ func init() {
}
func setavatar(me *saltyim.Addr, identity, fn string) {
cli, err := saltyim.NewClient(me, saltyim.WithIdentityPath(identity))
cli, err := saltyim.NewClient(me, saltyim.WithClientIdentity(saltyim.WithIdentityPath(identity)))
if err != nil {
fmt.Fprintf(os.Stderr, "error initializing client: %s\n", err)
os.Exit(2)

@ -5,6 +5,7 @@ import (
"os"
"strings"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -23,6 +24,7 @@ messages to the user via their discovered endpoint.`,
Run: func(cmd *cobra.Command, args []string) {
user := viper.GetString("user")
identity := viper.GetString("identity")
state := viper.GetString("state")
var profiles []profile
viper.UnmarshalKey("profiles", &profiles)
@ -39,7 +41,7 @@ messages to the user via their discovered endpoint.`,
}
// XXX: What if me.IsZero()
chat(me, identity, args[0])
chat(me, identity, state, args[0])
},
}
@ -47,12 +49,20 @@ func init() {
rootCmd.AddCommand(chatCmd)
}
func chat(me *saltyim.Addr, identity, user string) {
cli, err := saltyim.NewClient(me, saltyim.WithIdentityPath(identity))
func chat(me *saltyim.Addr, identity, state, user string) {
cli, err := saltyim.NewClient(me,
saltyim.WithStateFromFile(state),
saltyim.WithClientIdentity(saltyim.WithIdentityPath(identity)),
)
if err != nil {
fmt.Fprintf(os.Stderr, "error initializing client: %s\n", err)
os.Exit(2)
}
defer func() {
if err := cli.State().Save(state); err != nil {
log.WithError(err).Warnf("error saving state: %s", state)
}
}()
// Set terminal title
tui.SetTerminalTitle("Salty IM with %s", user)
@ -62,13 +72,13 @@ func chat(me *saltyim.Addr, identity, user string) {
outCh := make(chan string)
// Initialize ui.
c, err := tui.NewChatTUI(cli, user)
ui, err := tui.NewChatTUI(cli, user)
if err != nil {
fmt.Fprintf(os.Stderr, "error creating chat: %s\n", err)
os.Exit(2)
}
// Run the ui loop
go c.RunChat(inCh, outCh)
c.SetScreen(inCh, outCh)
go ui.RunChat(inCh, outCh)
ui.SetScreen(inCh, outCh)
}

@ -24,6 +24,7 @@ not specified defaults to the local user ($USER)`,
Run: func(cmd *cobra.Command, args []string) {
user := viper.GetString("user")
identity := viper.GetString("identity")
state := viper.GetString("state")
var profiles []profile
viper.UnmarshalKey("profiles", &profiles)
@ -60,7 +61,7 @@ not specified defaults to the local user ($USER)`,
log.Fatal("error getting --post-hook flag")
}
read(me, identity, follow, extraenvs, prehook, posthook, args...)
read(me, identity, state, follow, extraenvs, prehook, posthook, args...)
},
}
@ -93,8 +94,11 @@ func init() {
)
}
func read(me *saltyim.Addr, identity string, follow bool, extraenvs, prehook, posthook string, args ...string) {
cli, err := saltyim.NewClient(me, saltyim.WithIdentityPath(identity))
func read(me *saltyim.Addr, identity, state string, follow bool, extraenvs, prehook, posthook string, args ...string) {
cli, err := saltyim.NewClient(me,
saltyim.WithStateFromFile(state),
saltyim.WithClientIdentity(saltyim.WithIdentityPath(identity)),
)
if err != nil {
fmt.Fprintf(os.Stderr, "error initializing client: %s\n", err)
os.Exit(2)
@ -112,7 +116,7 @@ func read(me *saltyim.Addr, identity string, follow bool, extraenvs, prehook, po
}()
if follow {
for msg := range cli.Drain(ctx, extraenvs, prehook, posthook) {
for msg := range cli.Subscribe(ctx, extraenvs, prehook, posthook) {
if isatty.IsTerminal(os.Stdout.Fd()) {
fmt.Println(saltyim.FormatMessage(msg.Text))
} else {

@ -57,7 +57,7 @@ func register(me *saltyim.Addr, identity string) {
os.Exit(2)
}
cli, err := saltyim.NewClient(me, saltyim.WithIdentity(id))
cli, err := saltyim.NewClient(me, saltyim.WithClientIdentity(saltyim.WithIdentity(id)))
if err != nil {
fmt.Fprintf(os.Stderr, "error initializing client: %s\n", err)
os.Exit(2)

@ -77,6 +77,11 @@ func init() {
"Use the identity file at PATH",
)
rootCmd.PersistentFlags().StringP(
"state", "s", saltyim.DefaultState(),
"Use the state file at PATH",
)
viper.BindPFlag("debug", rootCmd.PersistentFlags().Lookup("debug"))
viper.SetDefault("debug", false)
@ -86,6 +91,9 @@ func init() {
viper.BindPFlag("identity", rootCmd.PersistentFlags().Lookup("identity"))
viper.SetDefault("identity", saltyim.DefaultIdentity())
viper.BindPFlag("state", rootCmd.PersistentFlags().Lookup("state"))
viper.SetDefault("state", saltyim.DefaultState())
viper.BindPFlag("pre-hook", rootCmd.PersistentFlags().Lookup("pre-hook"))
viper.SetDefault("pre-hook", "")

@ -68,7 +68,7 @@ func send(me *saltyim.Addr, identity, user string, args ...string) {
os.Exit(2)
}
cli, err := saltyim.NewClient(me, saltyim.WithIdentityPath(identity))
cli, err := saltyim.NewClient(me, saltyim.WithClientIdentity(saltyim.WithIdentityPath(identity)))
if err != nil {
fmt.Fprintf(os.Stderr, "error initializing client: %s\n", err)
os.Exit(2)

4
go.mod

@ -56,7 +56,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/petermattis/goid v0.0.0-20220302125637-5f11c28912df // indirect
github.com/petermattis/goid v0.0.0-20220331194723-8ee3e6ded87a // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/plar/go-adaptive-radix-tree v1.0.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@ -87,7 +87,7 @@ require (
require (
git.mills.io/prologic/bitcask v1.0.2
git.mills.io/prologic/msgbus v0.1.13-0.20220329220338-7181b6df1bd6
git.mills.io/prologic/msgbus v0.1.13-0.20220402140515-7b71102aa813
git.mills.io/prologic/observe v0.0.0-20210712230028-fc31c7aa2bd1
git.mills.io/prologic/useragent v0.0.0-20210714100044-d249fe7921a0
github.com/NYTimes/gziphandler v1.1.1

5
go.sum

@ -44,6 +44,8 @@ git.mills.io/prologic/bitcask v1.0.2 h1:Iy9x3mVVd1fB+SWY0LTmsSDPGbzMrd7zCZPKbsb/
git.mills.io/prologic/bitcask v1.0.2/go.mod h1:ppXpR3haeYrijyJDleAkSGH3p90w6sIHxEA/7UHMxH4=
git.mills.io/prologic/msgbus v0.1.13-0.20220329220338-7181b6df1bd6 h1:9Ci4a+yqtRdnj8JitXaRGntxeAkdFe+NltnR2ehl4vo=
git.mills.io/prologic/msgbus v0.1.13-0.20220329220338-7181b6df1bd6/go.mod h1:UyiNBmWbpsq7mtO+FHWoGwRiccPcT+EJGqT/idm/lfo=
git.mills.io/prologic/msgbus v0.1.13-0.20220402140515-7b71102aa813 h1:lMjKwVDctfeqnxMLHm/PQ3kngbedn6indnU6noInYWk=
git.mills.io/prologic/msgbus v0.1.13-0.20220402140515-7b71102aa813/go.mod h1:LitIrrXM81t/9+UNl0WN9B9lCeIfkOrf/Fee7CLf+7A=
git.mills.io/prologic/observe v0.0.0-20210712230028-fc31c7aa2bd1 h1:e6ZyAOFGLZJZYL2galNvfuNMqeQDdilmQ5WRBXCNL5s=
git.mills.io/prologic/observe v0.0.0-20210712230028-fc31c7aa2bd1/go.mod h1:/rNXqsTHGrilgNJYH/8wsIRDScyxXUhpbSdNbBatAKY=
git.mills.io/prologic/useragent v0.0.0-20210714100044-d249fe7921a0 h1:MojWEgZyiugUbgyjydrdSAkHlADnbt90dXyURRYFzQ4=
@ -82,6 +84,7 @@ github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqO
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@ -401,6 +404,8 @@ github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/petermattis/goid v0.0.0-20220302125637-5f11c28912df h1:/B1Q9E4W1cmiwPQfC2vymWL7FXHCEsUzg8Rywl5avtQ=
github.com/petermattis/goid v0.0.0-20220302125637-5f11c28912df/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/petermattis/goid v0.0.0-20220331194723-8ee3e6ded87a h1:VXRRto5GMJPNfB7MNbUVoFhtxwoYjBEsIt/NpWg42U0=
github.com/petermattis/goid v0.0.0-20220331194723-8ee3e6ded87a/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

@ -135,7 +135,7 @@ func (c *Configuration) registerIdentity() func(button app.HTMLButton) {
}
// not using client since that's not setup until we have an identity, might break the existing
// flow
registerClient, err := saltyim.NewClient(identity.Addr(), saltyim.WithIdentityBytes(identity.Contents()))
registerClient, err := saltyim.NewClient(identity.Addr(), saltyim.WithClientIdentity(saltyim.WithIdentityBytes(identity.Contents())))
if err != nil { // TODO: pop dialog
log.Println("error", err)
return

@ -102,7 +102,7 @@ func (h *SaltyChat) connect(ctx app.Context) {
return
}
newClient, err := saltyim.NewClient(identity.Addr(), saltyim.WithIdentityBytes(identity.Contents()))
newClient, err := saltyim.NewClient(identity.Addr(), saltyim.WithClientIdentity(saltyim.WithIdentityBytes(identity.Contents())))
if err != nil {
h.dialog.ShowDialog("error setting up client", err.Error())
return
@ -113,7 +113,7 @@ func (h *SaltyChat) connect(ctx app.Context) {
client = newClient
ctx.Async(func() {
for msg := range client.Drain(context.Background(), "", "", "") {
for msg := range client.Subscribe(context.Background(), "", "", "") {
// passing both the message and the text in case we need the message key at some point
ctx.NewActionWithValue(saltyChatMessageAction, msg, app.T("text", msg.Text))
}

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:715314606d8f94c1c3d1ea181008298cb970fec7a273eedaa11865dc28ca096e
size 28479770
oid sha256:d51becd13da593ccc8a8313bceba4a47183c0d0df73f4d84076115002a4526a7
size 28571126

52
options.go Normal file

@ -0,0 +1,52 @@
package saltyim
import (
"bytes"
"fmt"
"os"
)
// WithClientIdentity sets the client's identity
func WithClientIdentity(options ...IdentityOption) ClientOption {
return func(cli *Client) error {
id, err := GetIdentity(options...)
if err != nil {
return fmt.Errorf("error loading identity: %w", err)
}
cli.id = id
cli.key = id.key
return nil
}
}
// WithStateFromFile sets the client's state from a file on disk
func WithStateFromFile(fn string) ClientOption {
return func(cli *Client) error {
f, err := os.Open(fn)
if err != nil {
return fmt.Errorf("error opening state file %s for reading: %w", fn, err)
}
defer f.Close()
s, err := LoadState(f)
if err != nil {
return fmt.Errorf("error loading state: %w", err)
}
cli.state = s
return nil
}
}
// WithStateFromBytes sets the client's state from a byte array
func WithStateFromBytes(data []byte) ClientOption {
return func(cli *Client) error {
s, err := LoadState(bytes.NewBuffer(data))
if err != nil {
return fmt.Errorf("error loading state: %w", err)
}
cli.state = s
return nil
}
}

@ -76,7 +76,7 @@ func (svc *Service) Run(ctx context.Context) error {
// create the service user's client in a loop until successful
// TODO: Should this timeout? Use a context?
if err := retry.Do(func() error {
cli, err := NewClient(svc.me, WithIdentity(svc.id))
cli, err := NewClient(svc.me, WithClientIdentity(WithIdentity(svc.id)))
if err != nil {
return err
}
@ -96,7 +96,7 @@ func (svc *Service) Run(ctx context.Context) error {
log.Debugf("listening for service requests as %s", svc.me)
msgch := svc.cli.Drain(ctx, "", "", "")
msgch := svc.cli.Subscribe(ctx, "", "", "")
for {
select {

81
state.go Normal file

@ -0,0 +1,81 @@
package saltyim
import (
"encoding/json"
"fmt"
"io"
"os"
sync "github.com/sasha-s/go-deadlock"
)
// DefaultState returns a default state file
func DefaultState() string {
return os.ExpandEnv("$HOME/.config/salty/state.json")
}
type State struct {
sync.RWMutex
Indicies map[string]int
}
func NewState() *State {
return &State{
Indicies: make(map[string]int),
}
}
func (s *State) GetIndex(name string) int {
s.RLock()
defer s.RUnlock()
return s.Indicies[name]
}
func (s *State) SetIndex(name string, index int) {
s.Lock()
defer s.Unlock()
s.Indicies[name] = index
}
func (s *State) Bytes() ([]byte, error) {
s.Lock()
defer s.Unlock()
data, err := json.Marshal(s)
if err != nil {
return nil, fmt.Errorf("error encoding state: %w", err)
}
return data, nil
}
func (s *State) Save(fn string) error {
data, err := s.Bytes()
if err != nil {
return err
}
if err := os.WriteFile(fn, data, 0644); err != nil {
return fmt.Errorf("error writing state file %s: %w", fn, err)
}
return nil
}
func LoadState(r io.Reader) (*State, error) {
var state *State
data, err := io.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("error reading state: %w", err)
}
if err := json.Unmarshal(data, &state); err != nil {
return nil, fmt.Errorf("error reading state: %w", err)
}
return state, nil
}