Code cleanup

This commit is contained in:
James Mills 2022-03-20 22:28:49 +10:00
parent c86b989d24
commit 84e7cb8a95
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
2 changed files with 3 additions and 22 deletions

View File

@ -259,7 +259,6 @@ func (s *Subscriber) readLoop() {
s.conn.SetReadDeadline(time.Now().Add(pongWait))
s.conn.SetPongHandler(func(message string) error {
log.Debugf("recieved pong from %s: %s", s.url, message)
t, err := strconv.ParseInt(message, 10, 64)
d := time.Duration(time.Now().UnixNano() - t)
if err != nil {

View File

@ -17,13 +17,13 @@ import (
const (
// DefaultMaxQueueSize is the default maximum size of queues
DefaultMaxQueueSize = 1000 // ~4MB per queue (1000 * 4KB)
DefaultMaxQueueSize = 1024 // ~8MB per queue (1000 * 4KB)
// DefaultMaxPayloadSize is the default maximum payload size
DefaultMaxPayloadSize = 8192 // 8KB
// DefaultBufferLength is the default buffer length for subscriber chans
DefaultBufferLength = 100
DefaultBufferLength = 256
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
@ -160,7 +160,6 @@ func (ls *Listeners) NotifyAll(message Message) int {
for id, ch := range ls.chs {
select {
case ch <- message:
log.Debugf("successfully published message to %s: %+v", id, message)
i++
default:
// TODO: Drop this client?
@ -363,11 +362,6 @@ func (mb *MessageBus) Put(message Message) {
mb.Lock()
defer mb.Unlock()
log.Debugf(
"[msgbus] PUT id=%d topic=%s payload=%s",
message.ID, message.Topic.Name, message.Payload,
)
t := message.Topic
q, ok := mb.queues[t]
if !ok {
@ -388,8 +382,6 @@ func (mb *MessageBus) Get(t *Topic) (Message, bool) {
mb.RLock()
defer mb.RUnlock()
log.Debugf("[msgbus] GET topic=%s", t)
q, ok := mb.queues[t]
if !ok {
return Message{}, false
@ -410,10 +402,6 @@ func (mb *MessageBus) Get(t *Topic) (Message, bool) {
// publish ...
func (mb *MessageBus) publish(message Message) {
log.Debugf(
"[msgbus] publish id=%d topic=%s payload=%s",
message.ID, message.Topic.Name, message.Payload,
)
ls, ok := mb.listeners[message.Topic]
if !ok {
return
@ -422,7 +410,7 @@ func (mb *MessageBus) publish(message Message) {
n := ls.NotifyAll(message)
if n != ls.Length() && mb.metrics != nil {
log.Warnf("%d/%d subscribers notified", n, ls.Length())
mb.metrics.Counter("bus", "dropped").Inc()
mb.metrics.Counter("bus", "dropped").Add(float64(ls.Length() - n))
}
}
@ -431,8 +419,6 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
mb.Lock()
defer mb.Unlock()
log.Debugf("[msgbus] Subscribe id=%s topic=%s", id, topic)
t, ok := mb.topics[topic]
if !ok {
t = &Topic{Name: topic, Created: time.Now()}
@ -463,8 +449,6 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
mb.Lock()
defer mb.Unlock()
log.Debugf("[msgbus] Unsubscribe id=%s topic=%s", id, topic)
t, ok := mb.topics[topic]
if !ok {
return
@ -594,7 +578,6 @@ func (c *Client) readPump() {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(message string) error {
log.Debugf("recieved pong from %s: %s", c.id, message)
t, err := strconv.ParseInt(message, 10, 64)
d := time.Duration(time.Now().UnixNano() - t)
if err != nil {
@ -675,7 +658,6 @@ func (c *Client) Start() {
c.ch = c.bus.Subscribe(c.id, c.topic.Name)
c.conn.SetCloseHandler(func(code int, text string) error {
log.Debugf("recieved close from client %s", c.id)
c.bus.Unsubscribe(c.id, c.topic.Name)
message := websocket.FormatCloseMessage(code, "")
c.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second*1))