mirror of https://git.mills.io/prologic/msgbus.git synced 2024-06-24 07:48:57 +00:00
A real-time message bus server and library written in Go with strong consistency and reliability guarantees.
James Mills e6ab9d9c70
Revert "Don't attempt to reconenct if we're actually closign down"
This reverts commit e954f2f47f4fd11f3da894153a6d58b0e6043577.
2022-03-29 11:30:40 +10:00

msgbus


A real-time message bus server and library written in Go.

Features

  • Simple HTTP API
  • Simple command-line client
  • In memory queues
  • WebSockets for real-time messages
  • Pull and Push model

Install

$ go install git.mills.io/prologic/msgbus/...

Use Cases

  • As a simple generic webhook

You can use msgbus as a simple generic webhook. For example, in my dockerfiles repo I have hooked up Prometheus's Alertmanager to send alert notifications to an IRC channel using some simple shell scripts.

See: alert

  • As a general-purpose message / event bus that supports pub/sub as well as pulling messages synchronously.

Usage (library)

Install the package into your project:

$ go get git.mills.io/prologic/msgbus

Use the MessageBus type directly:

package main

import (
    "log"

    "git.mills.io/prologic/msgbus"
)

func main() {
    m := msgbus.New()
    m.Put("foo", m.NewMessage([]byte("Hello World!")))

    msg, ok := m.Get("foo")
    if !ok {
        log.Printf("No more messages in queue: foo")
    } else {
        log.Printf(
            "Received message: id=%s topic=%s payload=%s",
            msg.ID, msg.Topic, msg.Payload,
        )
    }
}

Running this example should yield something like this:

$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=%!s(uint64=0) topic=foo payload=Hello World!

See the godoc for further documentation and other examples.

Usage (tool)

Run the message bus daemon/server:

$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo

Subscribe to a topic using the message bus client:

$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye

Send a few messages with the message bus client:

$ msgbus pub foo hi
$ msgbus pub foo bye

You can also manually pull messages using the client:

$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye

This is slightly different from a listening subscriber: a subscriber stays connected over a WebSocket and receives each message as it is published, whereas pull fetches queued messages directly, on demand.

Usage (HTTP)

Run the message bus daemon/server:

$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000

Send a message using curl:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello

Pull the messages off the "hello" queue using curl:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}

Decode the payload:

$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}
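
The same decoding can be done in Go. The following is a minimal sketch, not code from msgbus itself: the `topic` and `message` structs are hypothetical types whose fields simply mirror the JSON response shown above, and `decodeMessage` is a helper name chosen for illustration. It relies on the fact that encoding/json decodes a base64 string into a `[]byte` field, so no separate base64 step is needed.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// topic and message are hypothetical types that mirror the JSON shown above.
// They are NOT the types exported by the msgbus package.
type topic struct {
	Name    string        `json:"name"`
	TTL     time.Duration `json:"ttl"` // sent as a plain integer (nanoseconds)
	Seq     uint64        `json:"seq"`
	Created time.Time     `json:"created"`
}

type message struct {
	ID      uint64    `json:"id"`
	Topic   topic     `json:"topic"`
	Payload []byte    `json:"payload"` // base64 string on the wire, raw bytes here
	Created time.Time `json:"created"`
}

// decodeMessage parses one JSON response body as returned by GET /<topic>.
func decodeMessage(body []byte) (message, error) {
	var m message
	err := json.Unmarshal(body, &m)
	return m, err
}

func main() {
	// Response body copied from the curl example above.
	body := []byte(`{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}`)

	m, err := decodeMessage(body)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("topic=%s ttl=%s payload=%s\n", m.Topic.Name, m.Topic.TTL, m.Payload)
}
```

Declaring `Payload` as `[]byte` keeps the payload opaque, which suits a bus that does not interpret message contents; a caller that knows the payload is JSON can unmarshal it a second time.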

API

GET /

List all known topics/queues.

Example:

$ curl -q -o - http://localhost:8000/ | jq '.'
{
  "hello": {
    "name": "hello",
    "ttl": 60000000000,
    "seq": 1,
    "created": "2018-05-07T23:44:25.681392205-07:00"
  }
}

POST|PUT /topic

Post a new message to the queue named by <topic>.

NB: Either POST or PUT methods can be used here.

Example:

$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
message successfully published to hello with sequence 1
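
For callers that prefer Go over curl, the request above can be sketched with net/http alone. This is an illustration under assumptions, not the msgbus client API: `newPublishRequest` and `publish` are hypothetical helpers, the base URL is the one from the curl example, the topic is assumed to be a simple path-safe name, and treating any non-2xx status as an error is a generic HTTP convention rather than documented server behaviour.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// newPublishRequest builds the PUT request that publishes payload to topic.
// baseURL is an assumption, e.g. "http://localhost:8000" as in the curl example.
func newPublishRequest(baseURL, topic string, payload []byte) (*http.Request, error) {
	target := strings.TrimRight(baseURL, "/") + "/" + topic
	return http.NewRequest(http.MethodPut, target, bytes.NewReader(payload))
}

// publish sends the request and returns the server's plain-text reply.
func publish(client *http.Client, baseURL, topic string, payload []byte) (string, error) {
	req, err := newPublishRequest(baseURL, topic, payload)
	if err != nil {
		return "", err
	}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	// Assumption: anything outside 2xx is reported as a failure.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return "", fmt.Errorf("publish to %q failed: %s", topic, resp.Status)
	}
	return string(body), nil
}

func main() {
	// Requires a msgbusd instance listening on localhost:8000.
	reply, err := publish(http.DefaultClient, "http://localhost:8000", "hello", []byte(`{"message": "hello"}`))
	if err != nil {
		fmt.Println("publish error:", err)
		return
	}
	fmt.Println(reply)
}
```

Since either method is accepted, swapping `http.MethodPut` for `http.MethodPost` should behave the same way.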

GET /topic

Get the next message from the queue named by <topic>.

  • If the topic is not found, returns 404 Not Found.
  • If the WebSocket Upgrade header is present, upgrades the connection to a WebSocket channel and subscribes to the topic <topic>. Each new message published to the topic <topic> is instantly delivered to all subscribers.

Example:

$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","created":"2018-03-25T13:18:38.732465-07:00"}
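
A Go caller has to distinguish the 404 case from a successful pull. The sketch below shows one way to do that with the standard library; it is not the msgbus client. `pulled`, `errTopicNotFound`, `decodePull` and `pull` are hypothetical names, a successful pull is assumed to return 200 OK, and what the server returns for an existing but empty queue is not stated here, so that case is not handled specially.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
)

// errTopicNotFound is a hypothetical sentinel for the documented 404 case.
var errTopicNotFound = errors.New("topic not found")

// pulled holds only the fields this sketch needs from the JSON response.
type pulled struct {
	ID      uint64 `json:"id"`
	Payload []byte `json:"payload"` // base64 string on the wire
}

// decodePull maps a status code and response body to a message or an error.
func decodePull(status int, body []byte) (pulled, error) {
	var m pulled
	switch status {
	case http.StatusOK: // assumption: success is reported as 200
		err := json.Unmarshal(body, &m)
		return m, err
	case http.StatusNotFound:
		return m, errTopicNotFound
	default:
		return m, fmt.Errorf("unexpected status %d", status)
	}
}

// pull fetches the next message from baseURL/topic over plain HTTP.
func pull(baseURL, topic string) (pulled, error) {
	resp, err := http.Get(baseURL + "/" + topic)
	if err != nil {
		return pulled{}, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return pulled{}, err
	}
	return decodePull(resp.StatusCode, body)
}

func main() {
	// Requires a msgbusd instance listening on localhost:8000.
	m, err := pull("http://localhost:8000", "hello")
	switch {
	case errors.Is(err, errTopicNotFound):
		fmt.Println("no such topic: hello")
	case err != nil:
		fmt.Println("pull error:", err)
	default:
		fmt.Printf("id=%d payload=%s\n", m.ID, m.Payload)
	}
}
```

Keeping the status handling in `decodePull`, separate from the network call, makes it easy to check without a running server.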

DELETE /topic

Deletes a queue named by <topic>.

Not implemented.

Related Projects

  • je -- A distributed job execution engine for the execution of batch jobs, workflows, remediations and more.

License

msgbus is licensed under the MIT License.