A real-time message bus server and library written in Go with strong consistency and reliability guarantees.
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
Go to file
James Mills 996ff9549f
Update CHANGELOG for v0.1.21
1 month ago
.chglog Add release tools, GoReleaser config and chglog config 1 year ago
.dockerfiles Improve the Docker image 1 year ago
client check for context cancelled errors using errors.Is (#39) 1 year ago
cmd Refactor client with correct reconnecting behaviour, contexts and a clean design (#37) 1 year ago
examples Fix Subscribe() deadlock (#34) 1 year ago
grafana Add grafana dashboard 5 years ago
logs Add support for a write-ahead-log (WAL) to persist messages (#33) 1 year ago
tools Fix Subscribe() deadlock (#34) 1 year ago
.dockerignore Improve the Docker image 1 year ago
.drone.yml Fix execution of notify step in CI 7 months ago
.gitignore Add support for a write-ahead-log (WAL) to persist messages (#33) 1 year ago
.goreleaser.yml Add release tools, GoReleaser config and chglog config 1 year ago
CHANGELOG.md Update CHANGELOG for v0.1.21 1 month ago
Dockerfile Improve the Docker image 1 year ago
LICENSE Add LICENSE 6 years ago
Makefile Fix Subscribe() deadlock (#34) 1 year ago
README.md Add support for subscribers to start from an index (#26) 1 year ago
docker-compose.yml Fixed tests, Added Drone CI config and Docker Stackfile 5 years ago
go.mod Set a unique client id per subscriber connection (#35) 1 year ago
go.sum Set a unique client id per subscriber connection (#35) 1 year ago
metrics.go Add support for subscribers to start from an index (#26) 1 year ago
metrics_test.go Added support for counter vectors 5 years ago
msgbus.go Fix off-by-one error in Queue.ForEach() (#41) 1 month ago
msgbus_test.go Fix off-by-one error in Queue.ForEach() (#41) 1 month ago
options.go Fix Subscribe() deadlock (#34) 1 year ago
preflight.sh Fix Makefile 1 year ago
queue.go Fix off-by-one error in Queue.ForEach() (#41) 1 month ago
queue_test.go Fix off-by-one error in Queue.ForEach() (#41) 1 month ago
utils.go Set a unique client id per subscriber connection (#35) 1 year ago
version.go Fix version 1 year ago
version_test.go Fix version test 1 year ago

README.md

msgbus

Build Status Go Reference

A real-time message bus server and library written in Go.

Features

  • Simple HTTP API
  • Simple command-line client
  • In memory queues
  • WebSockets for real-time messages
  • Pull and Push model

Install

$ go install git.mills.io/prologic/msgbus/cmd/...

Use Cases

  • As a simple generic webhook

You can use msgbus as a simple generic webhook. For example in my dockerfiles repo I have hooked up Prometheus's AlertManager to send alert notifications to an IRC channel using some simple shell scripts.

See: alert

  • As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.

Usage (library)

Install the package into your project:

$ go get git.mills.io/prologic/msgbus

Use the MessageBus type either directly:

package main

import (
    "log"

    "git.mills.io/prologic/msgbus"
)

func main() {
    m := msgbus.New()
    m.Put("foo", m.NewMessage([]byte("Hello World!")))

    msg, ok := m.Get("foo")
    if !ok {
        log.Printf("No more messages in queue: foo")
    } else {
        log.Printf
	    "Received message: id=%s topic=%s payload=%s",
	    msg.ID, msg.Topic, msg.Payload,
	)
    }
}

Running this example should yield something like this:

$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=%!s(uint64=0) topic=foo payload=Hello World!

See the godoc for further documentation and other examples.

Usage (tool)

Run the message bus daemon/server:

$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo

Subscribe to a topic using the message bus client:

$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye

Send a few messages with the message bus client:

$ msgbus pub foo hi
$ msgbus pub foo bye

You can also manually pull messages using the client:

$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye

This is slightly different from a listening subscriber (using websockets) where messages are pulled directly.

Usage (HTTP)

Run the message bus daemon/server:

$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000

Send a message with using curl:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello

Pull the messages off the "hello" queue using curl:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

Decode the payload:

$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}

API

GET /

List all known topics/queues.

Example:

$ curl -q -o - http://localhost:8000/ | jq '.'
{
  "hello": {
    "name": "hello",
    "ttl": 60000000000,
    "seq": 1,
    "created": "2018-05-07T23:44:25.681392205-07:00"
  }
}

POST|PUT /topic

Post a new message to the queue named by <topic>.

NB: Either POST or PUT methods can be used here.

Example:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1

GET /topic

Get the next message of the queue named by <topic>.

  • If the topic is not found. Returns: 404 Not Found
  • If the Websockets Upgrade header is found, upgrades to a websocket channel and subscribes to the topic <topic>. Each new message published to the topic <topic> are instantly published to all subscribers.

Example:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

DELETE /topic

Deletes a queue named by <topic>.

Not implemented.

  • je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.

License

msgbus is licensed under the MIT License