# msgbus

[![Build Status](https://ci.mills.io/api/badges/prologic/msgbus/status.svg)](https://ci.mills.io/prologic/msgbus)
[![Go Reference](https://pkg.go.dev/git.mills.io/prologic/msgbus?status.svg)](https://pkg.go.dev/git.mills.io/prologic/msgbus)

A real-time message bus server and library written in Go.

## Features

* Simple HTTP API
* Simple command-line client
* In-memory queues
* WebSockets for real-time messages
* Pull and Push model

## Install

```#!bash
$ go install git.mills.io/prologic/msgbus/cmd/...
```

## Use Cases

* As a simple generic webhook

  You can use msgbus as a simple generic webhook. For example, in my
  [dockerfiles](https://git.mills.io/prologic/dockerfiles) repo I have hooked up
  [Prometheus](https://prometheus.io/)'s [AlertManager](https://prometheus.io/docs/alerting/alertmanager/)
  to send alert notifications to an IRC channel using some simple shell scripts.

  See: [alert](https://hub.docker.com/r/prologic/alert/)

* As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.

## Usage (library)

Install the package into your project:

```#!bash
$ go get git.mills.io/prologic/msgbus
```

Use the `MessageBus` type directly:

```#!go
package main

import (
	"log"

	"git.mills.io/prologic/msgbus"
)

func main() {
	m := msgbus.New()

	// Publish a message to the "foo" topic.
	m.Put("foo", m.NewMessage([]byte("Hello World!")))

	// Pull the next message; ok is false when the queue is empty.
	msg, ok := m.Get("foo")
	if !ok {
		log.Printf("No more messages in queue: foo")
	} else {
		// The ID is a uint64, so it is formatted with %d rather than %s.
		log.Printf(
			"Received message: id=%d topic=%s payload=%s",
			msg.ID, msg.Topic, msg.Payload,
		)
	}
}
```

Running this example should yield something like this:

```#!bash
$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=0 topic=foo payload=Hello World!
```

See the [godoc](https://godoc.org/git.mills.io/prologic/msgbus) for further
documentation and other examples.

## Usage (tool)

Run the message bus daemon/server:

```#!bash
$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
```

Subscribe to a topic using the message bus client:

```#!bash
$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye
```

Send a few messages with the message bus client:

```#!bash
$ msgbus pub foo hi
$ msgbus pub foo bye
```

You can also manually pull messages using the client:

```#!bash
$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye
```

> This differs slightly from a listening subscriber (*using websockets*):
> here the client pulls messages directly from the queue instead of waiting
> for them to arrive.

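
To make the pull/subscribe distinction concrete, here is a toy sketch using only the Go standard library. It is not msgbus's implementation: `toyBus` and its methods are hypothetical names invented for this illustration, and the sketch is not safe for concurrent use. It only mirrors what the logs above suggest, namely that each published message is both pushed to listening subscribers and kept in a queue for later pulls.

```go
package main

import "fmt"

// toyBus is a hypothetical stand-in for a message bus, NOT msgbus itself.
// It keeps one FIFO queue per topic (for pulling) and a list of
// subscriber channels per topic (for pushing).
type toyBus struct {
	queues map[string][]string
	subs   map[string][]chan string
}

func newToyBus() *toyBus {
	return &toyBus{
		queues: make(map[string][]string),
		subs:   make(map[string][]chan string),
	}
}

// Subscribe registers a listener; every later Put on the topic is pushed to it.
func (b *toyBus) Subscribe(topic string) <-chan string {
	ch := make(chan string, 16) // buffered so the few Puts in this sketch do not block
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Put queues the message for later pulls and pushes it to current subscribers.
func (b *toyBus) Put(topic, payload string) {
	b.queues[topic] = append(b.queues[topic], payload)
	for _, ch := range b.subs[topic] {
		ch <- payload
	}
}

// Get pulls the oldest queued message; ok is false when the queue is empty.
func (b *toyBus) Get(topic string) (payload string, ok bool) {
	q := b.queues[topic]
	if len(q) == 0 {
		return "", false
	}
	b.queues[topic] = q[1:]
	return q[0], true
}

func main() {
	bus := newToyBus()
	pushed := bus.Subscribe("foo")

	bus.Put("foo", "hi")
	bus.Put("foo", "bye")

	// Push: the subscriber already has both messages waiting.
	fmt.Println("pushed:", <-pushed, <-pushed)

	// Pull: the client asks repeatedly until the queue is drained.
	for {
		msg, ok := bus.Get("foo")
		if !ok {
			break
		}
		fmt.Println("pulled:", msg)
	}
}
```

The subscriber never asks for anything after subscribing, while the pulling client has to call `Get` until it reports an empty queue, much like the repeated `GET topic=foo` lines in the server log above.
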
## Usage (HTTP)

Run the message bus daemon/server:

```#!bash
$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000
```

Send a message using `curl`:

```#!bash
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
```

Pull the messages off the "hello" queue using `curl`:

```#!bash
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}
```

The payload is Base64-encoded. Decode it:

```#!bash
$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}
```

## API

### GET /

List all known topics/queues.

Example:

```#!bash
$ curl -q -o - http://localhost:8000/ | jq '.'
{
  "hello": {
    "name": "hello",
    "ttl": 60000000000,
    "seq": 1,
    "created": "2018-05-07T23:44:25.681392205-07:00"
  }
}
```

### POST|PUT /topic

Post a new message to the queue named by `<topic>`.

**NB:** Either the `POST` or the `PUT` method can be used here.

Example:

```#!bash
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1
```

### GET /topic

Get the next message from the queue named by `<topic>`.

- If the topic is not found, returns `404 Not Found`.
- If the WebSocket `Upgrade` header is present, upgrades the connection to a
  websocket channel and subscribes to the topic `<topic>`. Each new message
  published to the topic `<topic>` is instantly published to all subscribers.

Example:

```#!bash
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}
```

### DELETE /topic

Deletes the queue named by `<topic>`.

*Not implemented*.

## Related Projects

* [je](https://git.mills.io/prologic/je) -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.

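## Usage (HTTP, from Go)

The `GET /topic` response shown in the API section can also be consumed from Go with only the standard library. The following is a minimal sketch, not part of msgbus: the `topic` and `message` structs and the `pull` and `decodeMessage` helpers are hypothetical names that simply mirror the JSON fields in the `curl` output, and the base URL `http://localhost:8000` is taken from the examples above. Because `encoding/json` decodes a JSON string into a `[]byte` field by Base64-decoding it, no separate `base64 -d` step is needed.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
)

// topic and message mirror the JSON fields shown in the curl output.
// The Go type names are this sketch's own (hypothetical), not msgbus types.
type topic struct {
	Name    string `json:"name"`
	TTL     int64  `json:"ttl"`
	Seq     uint64 `json:"seq"`
	Created string `json:"created"`
}

type message struct {
	ID      uint64 `json:"id"`
	Topic   topic  `json:"topic"`
	Payload []byte `json:"payload"` // encoding/json Base64-decodes strings into []byte
	Created string `json:"created"`
}

// decodeMessage parses one JSON response body from GET /<topic>.
func decodeMessage(body []byte) (message, error) {
	var m message
	err := json.Unmarshal(body, &m)
	return m, err
}

// pull fetches the next message from baseURL/<topic> and decodes it.
// Any non-200 status (such as the documented 404) is returned as an error.
func pull(baseURL, topicName string) (message, error) {
	resp, err := http.Get(baseURL + "/" + topicName)
	if err != nil {
		return message{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return message{}, fmt.Errorf("pull %s: unexpected status %s", topicName, resp.Status)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return message{}, err
	}
	return decodeMessage(body)
}

func main() {
	// Decode the sample response from the API section, so this runs without
	// a server. With msgbusd running, pull("http://localhost:8000", "hello")
	// could be used instead.
	sample := []byte(`{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}`)

	m, err := decodeMessage(sample)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("id=%d topic=%s payload=%s\n", m.ID, m.Topic.Name, m.Payload)
}
```

Running the sketch against the sample response prints `id=0 topic=hello payload={"message": "hello"}`, matching the result of the manual `base64 -d` step.
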
## License

msgbus is licensed under the [MIT License](https://git.mills.io/prologic/msgbus/blob/master/LICENSE).