From 38a6d71644d053b75269b690dad205bf4c76d57f Mon Sep 17 00:00:00 2001 From: xuu Date: Sat, 26 Mar 2022 14:43:05 +0000 Subject: [PATCH] xuu/bot (#64) Co-authored-by: Jon Lundy Co-authored-by: James Mills Reviewed-on: https://git.mills.io/saltyim/saltyim/pulls/64 Co-authored-by: xuu Co-committed-by: xuu --- .gitignore | 3 + bin/salty-chat.sh | 81 +++++++++---- client.go | 56 +++++++-- cmd/salty-chat/makeuser.go | 2 +- cmd/salty-chat/read.go | 2 +- cmd/saltyd/main.go | 9 +- ...631c181f4156f0edcde5cffa25b347c7ceda8.json | 1 - identity.go | 8 +- internal/api.go | 2 +- internal/config.go | 7 +- internal/options.go | 15 ++- internal/pwa/components/saltychat.go | 2 +- internal/server.go | 69 +++++++++-- internal/tasks.go | 4 +- internal/tui/tui.go | 3 +- service.go | 109 ++++++++++++++++++ types.go | 2 +- 17 files changed, 317 insertions(+), 58 deletions(-) delete mode 100644 data/.well-known/salty/d3d52221e8da5a8ae012f4e2db0631c181f4156f0edcde5cffa25b347c7ceda8.json create mode 100644 service.go diff --git a/.gitignore b/.gitignore index 2e814ef..7994ac7 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ /salty-chat /cmd/saltyd/saltyd /cmd/salty-chat/salty-chat + +/data/*.key +/data/.wellt-known diff --git a/bin/salty-chat.sh b/bin/salty-chat.sh index 3210487..dc5335a 100755 --- a/bin/salty-chat.sh +++ b/bin/salty-chat.sh @@ -147,16 +147,14 @@ lookup () { readmsgs () { topic="$1" - if [ -z "$topic" ]; then - me="$(get_user)" - topic="$(echo "$me" | awk -F@ '{ print $1 }')" - fi + salty_json="$(mktemp /tmp/salty.XXXXXX)" + lookup "$user" > "$salty_json" + endpoint="$(jq -r '.endpoint' < "$salty_json")" + key="$(jq -r '.key' < "$salty_json")" + rm "$salty_json" - export SALTY_IDENTITY="$data_path/$topic.key" - if [ ! -f "$SALTY_IDENTITY" ]; then - echo "identity file missing for user $topic" >&2 - return 1 - fi + export MSGBUS_URI=$(dirname "$endpoint") + topic=$(basename "$endpoint") msgbus sub "$topic" "$0" } @@ -188,12 +186,6 @@ sendmsg () { endpoint="$(jq -r '.endpoint' < "$salty_json")" key="$(jq -r '.key' < "$salty_json")" - # XXX: Deprecated as of Spec v1.1 - # XXX: There is only an Endpoint - if topic="$(jq -e -r '.topic' < "$salty_json")"; then - endpoint="$endpoint/$topic" - fi - rm "$salty_json" me="$(get_user)" @@ -224,12 +216,6 @@ chatwith() { endpoint="$(jq -r '.endpoint' < "$salty_json")" key="$(jq -r '.key' < "$salty_json")" - # XXX: Deprecated as of Spec v1.1 - # XXX: There is only an Endpoint - if topic="$(jq -e -r '.topic' < "$salty_json")"; then - endpoint="$endpoint/$topic" - fi - rm "$salty_json" me="$(get_user)" @@ -240,8 +226,8 @@ chatwith() { export SALTY_CHATKEY="$key" echo chat with "$SALTY_CHATWITH" via "$endpoint" key "$key" - readmsgs "$nick" & - READ_PID=$$ + readmsgs & + READ_PID=$! trap 'kill $READ_PID' EXIT while true; do @@ -302,6 +288,51 @@ Create this file in your webserver well-known folder. https://hostname.tld/.well EOF } +register () { + mkdir -p "$data_path" + + if [ $# -lt 1 ]; then + printf "usage: %s register nick@domain\n" "$0" + exit 1 + else + user=$1 + fi + + nick="$(echo "$user" | awk -F@ '{ print $1 }')" + domain="$(echo "$user" | awk -F@ '{ print $2 }')" + + discovery_host="$(dig +short SRV _salty._tcp."$domain" | cut -f 4 -d' ')" + if [ -z "$discovery_host" ]; then + discovery_host="$domain" + fi + + identity_file="$data_path/$nick.key" + + if [ -f "$identity_file" ]; then + printf "user key already exists!" + return 1 + fi + + # Check for msgbus env.. probably can make it fallback to looking for a config file? + if [ -z "$MSGBUS_URI" ]; then + printf "missing MSGBUS_URI in environment" + return 1 + fi + + salty-keygen -o "$identity_file" + echo "# user: $user" >> "$identity_file" + + pubkey=$(grep key: "$identity_file" | awk '{print $4}') + + export SALTY_IDENTITY="$identity_file" + msgbus sub "$pubkey" "$0" & + pid="$!" + + sendmsg "salty@$discovery_host" REGISTER + sleep 1 + kill "$pid" +} + show_help() { printf "Usage: %s [options] [arguments]\n" "$(basename "$0")" printf "\n" @@ -317,6 +348,7 @@ show_help() { printf " make-user -- Generate a new user key pair\n" printf " read -- Reads your messages\n" printf " send -- Sends a message to nick@domain\n" + printf " register -- Sends a register message to a broker bot\n" } # check if streaming @@ -357,4 +389,7 @@ case $CMD in make-user) make_user "$@" ;; + register) + register "$@" + ;; esac diff --git a/client.go b/client.go index b4db861..b9a3e20 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package saltyim import ( "bytes" "context" + "errors" "fmt" "net/http" "os" @@ -45,7 +46,15 @@ type Client struct { cache addrCache } -// NewClient returns a new Salty IM client for sending and receiving +func (c *Client) String() string { + b := &bytes.Buffer{} + fmt.Fprintln(b, "Me: ", c.me) + fmt.Fprintln(b, "Endpoint: ", c.me.Endpoint()) + fmt.Fprintln(b, "Key: ", c.key) + return b.String() +} + +// NewClient reeturns a new Salty IM client for sending and receiving // encrypted messages to other Salty IM users as well as decrypting // and displaying messages of the user's own inbox. func NewClient(me *Addr, options ...IdentityOption) (*Client, error) { @@ -76,6 +85,27 @@ func NewClient(me *Addr, options ...IdentityOption) (*Client, error) { }, nil } +// CreateOrLoadClient creates a Client by creating or loading an existing identity +// from the given identity file and name of the client's user address +func CreateOrLoadBotClient(fn, name string) (*Client, error) { + me, err := ParseAddr(name) + if err != nil { + return nil, err + } + if _, err := os.Stat(fn); err == nil { + return NewClient(me, WithIdentityPath(fn)) + } else if !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("failed to load client: %w", err) + } + + identity, err := CreateIdentity(WithIdentityPath(fn), WithIdentityAddr(me)) + if err != nil { + return nil, fmt.Errorf("failed to create identity: %w", err) + } + + return NewClient(me, WithIdentityBytes(identity.Contents())) +} + func (cli *Client) getAddr(user string) (*Addr, error) { addr, ok := cli.cache[user] if ok { @@ -92,7 +122,12 @@ func (cli *Client) getAddr(user string) (*Addr, error) { return addr, nil } -func (cli *Client) handleMessage(prehook, posthook string, msgs chan string) msgbus.HandlerFunc { +type Message struct { + Text string + Key *keys.EdX25519PublicKey +} + +func (cli *Client) handleMessage(prehook, posthook string, msgs chan Message) msgbus.HandlerFunc { return func(msg *msgbus.Message) error { if prehook != "" { out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, prehook, bytes.NewBuffer(msg.Payload)) @@ -102,13 +137,13 @@ func (cli *Client) handleMessage(prehook, posthook string, msgs chan string) msg log.Debugf("pre-hook: %q", out) } - data, _, err := salty.Decrypt(cli.key, msg.Payload) + data, senderKey, err := salty.Decrypt(cli.key, msg.Payload) if err != nil { fmt.Fprintf(os.Stderr, "error decrypting message") return err } - msgs <- string(data) + msgs <- Message{Text: string(data), Key: senderKey} if posthook != "" { out, err := exec.RunCmd(exec.DefaultRunCmdTimeout, posthook, bytes.NewBuffer(data)) @@ -126,11 +161,11 @@ func (cli *Client) Me() *Addr { return cli.me } func (cli *Client) Key() *keys.EdX25519PublicKey { return cli.key.PublicKey() } // Read subscribers to this user's inbox for new messages -func (cli *Client) Read(ctx context.Context, prehook, posthook string) chan string { +func (cli *Client) Read(ctx context.Context, prehook, posthook string) chan Message { uri, inbox := SplitInbox(cli.me.Endpoint().String()) bus := msgbus_client.NewClient(uri, nil) - msgs := make(chan string) + msgs := make(chan Message) s := bus.Subscribe(inbox, cli.handleMessage(prehook, posthook, msgs)) s.Start() @@ -152,12 +187,17 @@ func (cli *Client) Send(user, msg string) error { return fmt.Errorf("error looking up user %s: %w", user, err) } - b, err := salty.Encrypt(cli.key, PackMessage(cli.me, msg), []string{addr.Key().ID().String()}) + return cli.SendWithConfig(user, &Config{Endpoint: addr.Endpoint().String(), Key: addr.key.String()}, msg) +} + +func (cli *Client) SendWithConfig(user string, config *Config, msg string) error { + b, err := salty.Encrypt(cli.key, PackMessage(cli.me, msg), []string{config.Key}) if err != nil { return fmt.Errorf("error encrypting message to %s: %w", user, err) } - if err := Send(addr.Endpoint().String(), string(b)); err != nil { + log.Println("SEND: ", config.Endpoint, user, msg) + if err := Send(config.Endpoint, string(b)); err != nil { return fmt.Errorf("error sending message to %s: %w", user, err) } diff --git a/cmd/salty-chat/makeuser.go b/cmd/salty-chat/makeuser.go index 9964e4a..6ec7261 100644 --- a/cmd/salty-chat/makeuser.go +++ b/cmd/salty-chat/makeuser.go @@ -50,7 +50,7 @@ others Salty IM Users. A valid top-level domain or sub-domain is required and the is in the form of: username@domain - + If the -u/--endpoint flag or argument is passed this is used as the broker endpoint for constructing the Well-Known Config, if neither is used the broker is assumed to be the domain part of the nick@domain argument. diff --git a/cmd/salty-chat/read.go b/cmd/salty-chat/read.go index 22ebd51..354aed6 100644 --- a/cmd/salty-chat/read.go +++ b/cmd/salty-chat/read.go @@ -93,7 +93,7 @@ func read(me *saltyim.Addr, identity string, prehook, posthook string, args ...s for msg := range cli.Read(ctx, prehook, posthook) { if term.IsTerminal(syscall.Stdin) { - fmt.Println(saltyim.FormatMessage(msg)) + fmt.Println(saltyim.FormatMessage(msg.Text)) } else { fmt.Println(msg) } diff --git a/cmd/saltyd/main.go b/cmd/saltyd/main.go index 0055d58..11775ab 100644 --- a/cmd/saltyd/main.go +++ b/cmd/saltyd/main.go @@ -29,9 +29,10 @@ var ( tlsCert string // Basic options - data string - store string - baseURL string + data string + store string + baseURL string + servicesUser string // Oeprator adminUser string @@ -67,6 +68,7 @@ func init() { flag.StringVarP(&data, "data", "d", internal.DefaultData, "data directory") flag.StringVarP(&store, "store", "s", internal.DefaultStore, "store to use") flag.StringVarP(&baseURL, "base-url", "u", internal.DefaultBaseURL, "base url to use") + flag.StringVarP(&servicesUser, "services-user", "S", internal.DefaultServicesUser, "internal services user") // Oeprator flag.StringVarP(&adminUser, "admin-user", "A", internal.DefaultAdminUser, "default admin user to use") @@ -125,6 +127,7 @@ func main() { internal.WithData(data), internal.WithStore(store), internal.WithBaseURL(baseURL), + internal.WithServicesUser(servicesUser), // Oeprator internal.WithAdminUser(adminUser), diff --git a/data/.well-known/salty/d3d52221e8da5a8ae012f4e2db0631c181f4156f0edcde5cffa25b347c7ceda8.json b/data/.well-known/salty/d3d52221e8da5a8ae012f4e2db0631c181f4156f0edcde5cffa25b347c7ceda8.json deleted file mode 100644 index 87115aa..0000000 --- a/data/.well-known/salty/d3d52221e8da5a8ae012f4e2db0631c181f4156f0edcde5cffa25b347c7ceda8.json +++ /dev/null @@ -1 +0,0 @@ -{"endpoint":"http://0.0.0.0:8000/inbox/01FYSBPFYJD0RGWFF41RMVYBY3","key":"kex1ekt5cru4vs42wnaxppkjn5pexmt2w6uxx9z2mz0fqeuc80e0g9gsggs8ah"} \ No newline at end of file diff --git a/identity.go b/identity.go index b1b3ab5..255e31f 100644 --- a/identity.go +++ b/identity.go @@ -101,19 +101,19 @@ func GetIdentity(options ...IdentityOption) (*Identity, error) { id, err := os.Open(fn) if err != nil { - return ident, fmt.Errorf("error opening identity %q: %s", ident.Source(), err) + return ident, fmt.Errorf("error opening identity %q: %w", ident.Source(), err) } defer id.Close() identityBytes, err := ioutil.ReadAll(id) if err != nil { - return ident, fmt.Errorf("error opening identity %q: %s", ident.Source(), err) + return ident, fmt.Errorf("error opening identity %q: %w", ident.Source(), err) } ident.contents = identityBytes } key, err := salty.ParseIdentity(bytes.NewBuffer(ident.contents)) if err != nil { - return ident, fmt.Errorf("error reading identity %q: %s", ident.Source(), err) + return ident, fmt.Errorf("error reading identity %q: %w", ident.Source(), err) } ident.key = key @@ -142,7 +142,7 @@ type Identity struct { } func (i *Identity) Source() string { - if i.path != "" { + if i != nil && i.path != "" { return i.path } return "[]byte" diff --git a/internal/api.go b/internal/api.go index db092c0..1e8647f 100644 --- a/internal/api.go +++ b/internal/api.go @@ -57,7 +57,7 @@ func (a *API) RegisterEndpoint() httprouter.Handle { return } - if err := CreateConfig(a.config, req.Addr, req.Key); err != nil { + if err := CreateConfig(a.config, req.Addr.Hash(), req.Key); err != nil { log.WithError(err).Errorf("error creating config for %s", req.Addr.String()) http.Error(w, "Error", http.StatusInternalServerError) return diff --git a/internal/config.go b/internal/config.go index e5381b1..a3bf2a6 100644 --- a/internal/config.go +++ b/internal/config.go @@ -14,9 +14,10 @@ type Config struct { TLSKey string `json:"-"` TLSCert string `json:"-"` - Data string `json:"-"` - Store string `json:"-"` - BaseURL string + Data string `json:"-"` + Store string `json:"-"` + BaseURL string + ServicesUser string AdminUser string `json:"-"` AdminEmail string `json:"-"` diff --git a/internal/options.go b/internal/options.go index e19a323..2ea5e5d 100644 --- a/internal/options.go +++ b/internal/options.go @@ -1,6 +1,8 @@ package internal -import "net/url" +import ( + "net/url" +) const ( // InvalidConfigValue is the constant value for invalid config values @@ -29,6 +31,9 @@ const ( // DefaultBaseURL is the default Base URL for the server DefaultBaseURL = "http://0.0.0.0:8000" + // DefaultServicesUser is the default user for internal services registrations and other operations + DefaultServicesUser = "salty@localhost" + // DefaultAdminUser is the default publickye to grant admin privileges to DefaultAdminUser = "" @@ -125,3 +130,11 @@ func WithAdminEmail(adminEmail string) Option { return nil } } + +// WithServicesUser sets the internal services user +func WithServicesUser(user string) Option { + return func(c *Config) error { + c.ServicesUser = user + return nil + } +} diff --git a/internal/pwa/components/saltychat.go b/internal/pwa/components/saltychat.go index 171c683..b7b6fa9 100644 --- a/internal/pwa/components/saltychat.go +++ b/internal/pwa/components/saltychat.go @@ -91,7 +91,7 @@ func (h *SaltyChat) connect(ctx app.Context) { for msg := range client.Read(context.Background(), "", "") { log.Println("incoming message", msg) ctx.Dispatch(func(ctx app.Context) { - h.incomingMessage(ctx, msg) + h.incomingMessage(ctx, msg.Text) }) } }) diff --git a/internal/server.go b/internal/server.go index c764189..400a13c 100644 --- a/internal/server.go +++ b/internal/server.go @@ -5,8 +5,8 @@ import ( "fmt" "net" "net/http" - "os" "os/signal" + "path" "path/filepath" "syscall" "time" @@ -16,18 +16,21 @@ import ( "github.com/NYTimes/gziphandler" "github.com/julienschmidt/httprouter" "github.com/justinas/nosurf" + "github.com/keys-pub/keys" "github.com/maxence-charriere/go-app/v9/pkg/app" "github.com/robfig/cron" log "github.com/sirupsen/logrus" "github.com/unrolled/logger" - "go.mills.io/saltyim/internal/pwa/routes" + "go.yarn.social/lextwt" "golang.org/x/crypto/acme/autocert" "go.mills.io/saltyim" + "go.mills.io/saltyim/internal/pwa/routes" ) const ( - acmeDir = "acme" + acmeDir = "acme" + servicesIdentity = "svc.key" ) var ( @@ -52,6 +55,9 @@ type Server struct { // Message Bus bus *msgbus.MessageBus + // Services User + svc *saltyim.Service + // Data Store db Store @@ -100,10 +106,11 @@ func (s *Server) Run() (err error) { } }() - sigch := make(chan os.Signal, 1) - signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigch - log.Infof("Received signal %s", sig) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + <-ctx.Done() + log.Infof("Received signal %s", ctx.Err()) log.Info("Shutting down...") @@ -214,6 +221,49 @@ func (s *Server) setupCronJobs() error { return nil } +func (s *Server) setupServices() { + time.Sleep(time.Second * 5) + + log.Infof("starting services %s", s.config.ServicesUser) + + // create or load client for services user + identity := filepath.Join(s.config.Data, servicesIdentity) + cli, err := saltyim.CreateOrLoadBotClient(identity, s.config.ServicesUser) + if err != nil { + log.WithError(err).Errorf("error creating or loading service user") + return + } + svc, err := saltyim.NewService(cli) + if err != nil { + log.WithError(err).Errorf("error creating service") + return + } + s.svc = svc + + err = CreateConfig(s.config, cli.Me().Hash(), cli.Me().Key().ID().String()) + if err != nil { + log.WithError(err).Error("error creating service config") + return + } + + svc.TextFunc("register", func(ctx context.Context, bot *saltyim.Service, key *keys.EdX25519PublicKey, msg *lextwt.SaltyText) error { + addr, err := saltyim.ParseAddr(msg.User.String()) + if err != nil { + return err + } + + err = CreateConfig(s.config, addr.Hash(), key.String()) + if err != nil { + return err + } + + return bot.SendWithConfig(msg.User.String(), &saltyim.Config{ + Endpoint: s.config.BaseURL + "/" + path.Join("inbox", key.String()), + Key: key.String(), + }, "OK") + }) +} + func (s *Server) runStartupJobs() { time.Sleep(time.Second * 5) @@ -361,11 +411,16 @@ func NewServer(bind string, options ...Option) (*Server, error) { // Log interesting configuration options log.Infof("Debug: %t", server.config.Debug) + log.Infof("Debug: %t", server.config.Debug) + log.Infof("Admin User: %s", server.config.AdminUser) + log.Infof("Admin Email: %s", server.config.AdminEmail) + log.Infof("Service Users; %s", server.config.ServicesUser) api.initRoutes() server.initRoutes() go server.runStartupJobs() + go server.setupServices() return server, nil } diff --git a/internal/tasks.go b/internal/tasks.go index 2116cb0..e9504b6 100644 --- a/internal/tasks.go +++ b/internal/tasks.go @@ -13,9 +13,9 @@ const ( wellknownPath = ".well-known/salty" ) -func CreateConfig(conf *Config, addr saltyim.Addr, key string) error { +func CreateConfig(conf *Config, hash string, key string) error { p := filepath.Join(conf.Data, wellknownPath) - fn := filepath.Join(p, fmt.Sprintf("%s.json", addr.Hash())) + fn := filepath.Join(p, fmt.Sprintf("%s.json", hash)) if err := os.MkdirAll(p, 0755); err != nil { return fmt.Errorf("error creating config paths %s: %w", p, err) diff --git a/internal/tui/tui.go b/internal/tui/tui.go index 7485e71..adf0aa2 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -33,6 +33,7 @@ type ChatTUI struct { cli *saltyim.Client user string addr *saltyim.Addr + config *saltyim.Config // Configurations. palette map[string]string @@ -158,7 +159,7 @@ func (c *ChatTUI) RunChat(inCh chan<- string, outCh <-chan string) { // Receives incoming messages on a separate goroutine to be non-blocking. go func() { for msg := range c.cli.Read(ctx, "", "") { - inCh <- msg + inCh <- msg.Text } }() diff --git a/service.go b/service.go new file mode 100644 index 0000000..ee9153f --- /dev/null +++ b/service.go @@ -0,0 +1,109 @@ +package saltyim + +import ( + "bytes" + "context" + "fmt" + "strings" + "sync" + + "github.com/keys-pub/keys" + log "github.com/sirupsen/logrus" + + "go.yarn.social/lextwt" +) + +type Service struct { + *Client + + mu sync.RWMutex + + textFns map[string]MessageTextHandlerFunc + eventFns map[string]MessageEventHandlerFunc +} + +type MessageTextHandlerFunc func(context.Context, *Service, *keys.EdX25519PublicKey, *lextwt.SaltyText) error +type MessageEventHandlerFunc func(context.Context, *Service, *keys.EdX25519PublicKey, *lextwt.SaltyEvent) error + +func NewService(client *Client) (*Service, error) { + svc := &Service{ + Client: client, + textFns: make(map[string]MessageTextHandlerFunc), + eventFns: make(map[string]MessageEventHandlerFunc), + } + svc.TextFunc("ping", func(ctx context.Context, svc *Service, key *keys.EdX25519PublicKey, msg *lextwt.SaltyText) error { + return svc.Send(msg.User.String(), "Pong!") + }) + + return svc, nil +} + +func (svc *Service) String() string { + buf := &bytes.Buffer{} + fmt.Fprintln(buf, "Bot: ", svc.Client.me) + svc.mu.RLock() + defer svc.mu.RUnlock() + for k := range svc.textFns { + fmt.Fprintln(buf, " - TextCmd: ", k) + } + for k := range svc.eventFns { + fmt.Fprintln(buf, " - EventCmd: ", k) + } + return buf.String() +} + +func (svc *Service) Run(ctx context.Context) { + log.Println("listining for bot: ", svc.Me().Endpoint()) + msgch := svc.Read(ctx, "", "") + for { + select { + case <-ctx.Done(): + return + case msg := <-msgch: + if err := svc.handle(ctx, msg); err != nil { + log.WithError(err).Println("failed to handle message") + } + } + } +} + +func (svc *Service) handle(ctx context.Context, msg Message) error { + decoded, err := lextwt.ParseSalty(msg.Text) + if err != nil { + return err + } + + switch m := decoded.(type) { + case *lextwt.SaltyText: + fields := strings.Fields(m.LiteralText()) + svc.mu.RLock() + defer svc.mu.RUnlock() + + if fn, ok := svc.textFns[strings.ToUpper(fields[0])]; ok { + err = fn(ctx, svc, msg.Key, m) + } + case *lextwt.SaltyEvent: + svc.mu.RLock() + defer svc.mu.RUnlock() + + if fn, ok := svc.eventFns[strings.ToUpper(m.Command)]; ok { + err = fn(ctx, svc, msg.Key, m) + } + } + + return err +} + +func (svc *Service) TextFunc(name string, fn MessageTextHandlerFunc) { + svc.mu.Lock() + defer svc.mu.Unlock() + + svc.textFns[strings.ToUpper(name)] = fn +} + +func (svc *Service) EventFunc(name string, fn MessageEventHandlerFunc) { + svc.mu.Lock() + defer svc.mu.Unlock() + + svc.eventFns[strings.ToUpper(name)] = fn +} diff --git a/types.go b/types.go index 4fc6aff..f20b8a4 100644 --- a/types.go +++ b/types.go @@ -10,7 +10,7 @@ import ( // RegisterRequest is the request used by clients to register to a broker type RegisterRequest struct { - Addr Addr + Addr *Addr Key string }