Set a unique client id per subscriber connection (#35)
Co-authored-by: James Mills <prologic@shortcircuit.net.au> Reviewed-on: https://git.mills.io/prologic/msgbus/pulls/35
This commit is contained in:
parent
6c5e6c3184
commit
e7f0248805
1
go.mod
1
go.mod
|
@ -9,6 +9,7 @@ require (
|
||||||
github.com/jpillora/backoff v1.0.0
|
github.com/jpillora/backoff v1.0.0
|
||||||
github.com/mitchellh/go-homedir v1.1.0
|
github.com/mitchellh/go-homedir v1.1.0
|
||||||
github.com/mmcloughlin/professor v0.0.0-20170922221822-6b97112ab8b3
|
github.com/mmcloughlin/professor v0.0.0-20170922221822-6b97112ab8b3
|
||||||
|
github.com/oklog/ulid v1.3.1
|
||||||
github.com/prometheus/client_golang v1.12.1
|
github.com/prometheus/client_golang v1.12.1
|
||||||
github.com/sasha-s/go-deadlock v0.3.1
|
github.com/sasha-s/go-deadlock v0.3.1
|
||||||
github.com/sirupsen/logrus v1.8.1
|
github.com/sirupsen/logrus v1.8.1
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -221,6 +221,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
|
||||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||||
|
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
|
||||||
|
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||||
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
|
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
|
||||||
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
||||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
|
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
|
||||||
|
|
10
msgbus.go
10
msgbus.go
|
@ -798,7 +798,14 @@ type Client struct {
|
||||||
|
|
||||||
// NewClient ...
|
// NewClient ...
|
||||||
func NewClient(conn *websocket.Conn, topic *Topic, index int64, bus *MessageBus) *Client {
|
func NewClient(conn *websocket.Conn, topic *Topic, index int64, bus *MessageBus) *Client {
|
||||||
return &Client{conn: conn, topic: topic, index: index, bus: bus}
|
return &Client{
|
||||||
|
conn: conn,
|
||||||
|
topic: topic,
|
||||||
|
index: index,
|
||||||
|
bus: bus,
|
||||||
|
|
||||||
|
id: MustGenerateULID(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) readPump() {
|
func (c *Client) readPump() {
|
||||||
|
@ -882,7 +889,6 @@ func (c *Client) writePump() {
|
||||||
|
|
||||||
// Start ...
|
// Start ...
|
||||||
func (c *Client) Start() {
|
func (c *Client) Start() {
|
||||||
c.id = c.conn.RemoteAddr().String()
|
|
||||||
c.ch = c.bus.Subscribe(c.id, c.topic.Name, WithIndex(c.index))
|
c.ch = c.bus.Subscribe(c.id, c.topic.Name, WithIndex(c.index))
|
||||||
|
|
||||||
c.conn.SetCloseHandler(func(code int, text string) error {
|
c.conn.SetCloseHandler(func(code int, text string) error {
|
||||||
|
|
30
utils.go
30
utils.go
|
@ -1,6 +1,14 @@
|
||||||
package msgbus
|
package msgbus
|
||||||
|
|
||||||
import "strconv"
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/oklog/ulid"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
// SafeParseInt64 ...
|
// SafeParseInt64 ...
|
||||||
func SafeParseInt64(s string, d int64) int64 {
|
func SafeParseInt64(s string, d int64) int64 {
|
||||||
|
@ -10,3 +18,23 @@ func SafeParseInt64(s string, d int64) int64 {
|
||||||
}
|
}
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GenerateULID generates a new unique identifer
|
||||||
|
func GenerateULID() (string, error) {
|
||||||
|
entropy := rand.Reader
|
||||||
|
id, err := ulid.New(ulid.Timestamp(time.Now()), entropy)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("error generating ulid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return id.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustGenerateULID generates a new unique identifer or fails
|
||||||
|
func MustGenerateULID() string {
|
||||||
|
ulid, err := GenerateULID()
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Fatal("error generating ulid")
|
||||||
|
}
|
||||||
|
return ulid
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue