mirror of https://git.mills.io/prologic/msgbus.git synced 2024-06-26 00:38:55 +00:00
# msgbus
[![Build Status](https://travis-ci.org/prologic/msgbus.svg)](https://travis-ci.org/prologic/msgbus)
[![CodeCov](https://codecov.io/gh/prologic/msgbus/branch/master/graph/badge.svg)](https://codecov.io/gh/prologic/msgbus)
[![Go Report Card](https://goreportcard.com/badge/prologic/msgbus)](https://goreportcard.com/report/prologic/msgbus)
[![GoDoc](https://godoc.org/github.com/prologic/msgbus?status.svg)](https://godoc.org/github.com/prologic/msgbus)
A real-time message bus server and library written in Go with strong
consistency and reliability guarantees.
(*eventual goals of distributed high-availability and sharding*)
**Status:** Alpha Software. This is considered "alpha quality" and is actively
being developed, tested and evolved with other projects and use-cases.
**Update:** (*2017-08-07*) This is now being used by [autodock](https://github.com/prologic/autodock) and is undergoing heavy development to deliver what is laid out here.
## Install
```#!bash
$ go install github.com/prologic/msgbus/...
```
## Usage (library)
Install the package into your project:
```#!bash
$ go get github.com/prologic/msgbus
```
Use the `MessageBus` type directly:
```#!go
package main

import (
	"log"

	"github.com/prologic/msgbus"
)

func main() {
	m := msgbus.NewMessageBus()
	m.Put("foo", m.NewMessage([]byte("Hello World!")))

	msg, ok := m.Get("foo")
	if !ok {
		log.Printf("No more messages in queue: foo")
	} else {
		// msg.ID is a uint64, so it is formatted with %d rather than %s.
		log.Printf(
			"Received message: id=%d topic=%s payload=%s",
			msg.ID, msg.Topic, msg.Payload,
		)
	}
}
```
Running this example should yield something like this:
```#!bash
$ go run examples/hello.go
2017/08/09 03:01:54 [msgbus] PUT id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] NotifyAll id=0 topic=foo payload=Hello World!
2017/08/09 03:01:54 [msgbus] GET topic=foo
2017/08/09 03:01:54 Received message: id=0 topic=foo payload=Hello World!
```
See the [godoc](https://godoc.org/github.com/prologic/msgbus) for further
documentation and other examples.
## Usage (tool)
Run the message bus daemon/server:
```#!bash
$ msgbusd
2017/08/07 01:11:16 [msgbus] Subscribe id=[::1]:55341 topic=foo
2017/08/07 01:11:22 [msgbus] PUT id=0 topic=foo payload=hi
2017/08/07 01:11:22 [msgbus] NotifyAll id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] PUT id=1 topic=foo payload=bye
2017/08/07 01:11:26 [msgbus] NotifyAll id=1 topic=foo payload=bye
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
2017/08/07 01:11:33 [msgbus] GET topic=foo
```
Subscribe to a topic using the message bus client:
```#!bash
$ msgbus sub foo
2017/08/07 01:11:22 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:26 [msgbus] received message: id=1 topic=foo payload=bye
```
Send a few messages with the message bus client:
```#!bash
$ msgbus pub foo hi
$ msgbus pub foo bye
```
You can also manually pull messages using the client:
```#!bash
$ msgbus pull foo
2017/08/07 01:11:33 [msgbus] received message: id=0 topic=foo payload=hi
2017/08/07 01:11:33 [msgbus] received message: id=1 topic=foo payload=bye
```
> This differs slightly from a listening subscriber (*using websockets*): here the client pulls messages directly instead of having them pushed to it.
## Usage (HTTP)
Run the message bus daemon/server:
```#!bash
$ msgbusd
2018/03/25 13:21:18 msgbusd listening on :8000
```
Send a message using `curl`:
```#!bash
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
```
Pull the messages off the "hello" queue using `curl`:
```#!bash
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","expires":"0001-01-01T00:00:00Z","created":"2018-03-25T13:18:38.732465-07:00"}
```
Decode the payload:
```#!bash
$ echo 'eyJtZXNzYWdlIjogImhlbGxvIn0=' | base64 -d
{"message": "hello"}
```
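The payload is base64 in the response because Go's `encoding/json` encodes `[]byte` values that way, and it reverses the encoding when unmarshalling. The sketch below decodes the response shown above in one step. The `Topic` and `Message` structs and the `decodeMessage` helper are hypothetical: they mirror the JSON fields in the example output and are not taken from the msgbus package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"
)

// Topic mirrors the "topic" object in the example response (hypothetical type).
type Topic struct {
	Name    string        `json:"name"`
	TTL     time.Duration `json:"ttl"` // nanoseconds in the JSON
	Seq     uint64        `json:"seq"`
	Created time.Time     `json:"created"`
}

// Message mirrors the example response (hypothetical type).
type Message struct {
	ID      uint64    `json:"id"`
	Topic   Topic     `json:"topic"`
	Payload []byte    `json:"payload"` // base64 in JSON, raw bytes after decoding
	Expires time.Time `json:"expires"`
	Created time.Time `json:"created"`
}

// sampleResponse is the response body copied from the curl example above.
const sampleResponse = `{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","expires":"0001-01-01T00:00:00Z","created":"2018-03-25T13:18:38.732465-07:00"}`

// decodeMessage parses a response body; encoding/json base64-decodes Payload.
func decodeMessage(data []byte) (Message, error) {
	var m Message
	err := json.Unmarshal(data, &m)
	return m, err
}

func main() {
	msg, err := decodeMessage([]byte(sampleResponse))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("id=%d topic=%s payload=%s\n", msg.ID, msg.Topic.Name, msg.Payload)
}
```

Declaring the payload as `[]byte` avoids a separate `base64` call on the client side.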
## API
### GET /
List all known topics/queues.
Example:
```#!bash
$ curl -q -o - http://localhost:8000/
hello
```
### POST|PUT /<topic>
Post a new message to the queue named by `<topic>`.
**NB:** Either `POST` or `PUT` methods can be used here.
Example:
```#!bash
$ curl -q -o - -X PUT -d '{"message": "hello"}' http://localhost:8000/hello
```
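The same request can be made from Go with the standard `net/http` package. This is a minimal sketch, assuming the server listens on `localhost:8000` as in the examples above. The `topicURL` and `publish` helpers are hypothetical names, and treating any 2xx status as success is an assumption, since the status returned on publish is not documented here.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
)

// topicURL joins the server base URL and a topic name, escaping the topic.
func topicURL(baseURL, topic string) string {
	return strings.TrimRight(baseURL, "/") + "/" + url.PathEscape(topic)
}

// publish sends payload to the topic with PUT; POST is accepted as well.
func publish(client *http.Client, baseURL, topic string, payload []byte) error {
	req, err := http.NewRequest(http.MethodPut, topicURL(baseURL, topic), bytes.NewReader(payload))
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Assumption: any 2xx status means the message was accepted.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("publish to %q: unexpected status %s", topic, resp.Status)
	}
	return nil
}

func main() {
	payload := []byte(`{"message": "hello"}`)
	if err := publish(http.DefaultClient, "http://localhost:8000", "hello", payload); err != nil {
		log.Printf("publish failed (is msgbusd running?): %v", err)
		return
	}
	log.Print("published")
}
```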
### GET /<topic>
Get the next message of the queue named by `<topic>`.
- If the topic is not found, returns `404 Not Found`.
- If the Websockets `Upgrade` header is found, upgrades to a websocket channel
  and subscribes to the topic `<topic>`. Each new message published to the
  topic `<topic>` is instantly pushed to all subscribers.
Example:
```#!bash
$ curl -q -o - http://localhost:8000/hello
{"id":0,"topic":{"name":"hello","ttl":60000000000,"seq":1,"created":"2018-03-25T13:18:38.732437-07:00"},"payload":"eyJtZXNzYWdlIjogImhlbGxvIn0=","expires":"0001-01-01T00:00:00Z","created":"2018-03-25T13:18:38.732465-07:00"}
```
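A Go client pulling from a topic needs to distinguish a returned message from the `404 Not Found` case. The sketch below shows one way to do that over plain HTTP. `interpretPull` and `pull` are hypothetical helpers, and treating every status other than `200` and `404` as an error is an assumption. The websocket path is not covered because it needs a third-party websocket library.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

// interpretPull maps a status code and body to (message, found, error).
// A 404 means the topic is unknown and is reported as found == false.
func interpretPull(status int, body []byte) ([]byte, bool, error) {
	switch status {
	case http.StatusOK:
		return body, true, nil
	case http.StatusNotFound:
		return nil, false, nil
	default:
		// Assumption: anything else is unexpected.
		return nil, false, fmt.Errorf("unexpected status %d", status)
	}
}

// pull fetches the next message of the queue at topicURL.
func pull(client *http.Client, topicURL string) ([]byte, bool, error) {
	resp, err := client.Get(topicURL)
	if err != nil {
		return nil, false, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, false, err
	}
	return interpretPull(resp.StatusCode, body)
}

func main() {
	body, found, err := pull(http.DefaultClient, "http://localhost:8000/hello")
	switch {
	case err != nil:
		log.Printf("pull failed (is msgbusd running?): %v", err)
	case !found:
		log.Print("topic not found")
	default:
		log.Printf("received: %s", body)
	}
}
```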
### DELETE /<topic>
Deletes a queue named by `<topic>`.
*Not implemented*.
## Design
Design decisions so far:
* In memory queues (*may extend this with interfaces and persistence*)
* HTTP API
* Websockets for real-time push of events
* Sequence ID Message tracking
* Pull and Push model
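To make these decisions concrete, here is a small illustrative sketch of how in-memory queues, sequence IDs, and a combined pull and push model can fit together. It is not the msgbus implementation. The `Bus` type and its methods are hypothetical, per-topic ID counters are an assumption, and dropping messages for slow subscribers is just one possible policy.

```go
package main

import (
	"fmt"
	"sync"
)

// Message carries a sequence ID assigned at publish time.
type Message struct {
	ID      uint64
	Topic   string
	Payload []byte
}

// Bus is an illustrative in-memory bus (hypothetical, not the msgbus type).
type Bus struct {
	mu     sync.Mutex
	nextID map[string]uint64         // next sequence ID per topic (assumption)
	queues map[string][]Message      // pending messages for pullers
	subs   map[string][]chan Message // channels for pushed delivery
}

func NewBus() *Bus {
	return &Bus{
		nextID: make(map[string]uint64),
		queues: make(map[string][]Message),
		subs:   make(map[string][]chan Message),
	}
}

// Put assigns the next ID, queues the message, and pushes it to subscribers.
func (b *Bus) Put(topic string, payload []byte) Message {
	b.mu.Lock()
	defer b.mu.Unlock()
	msg := Message{ID: b.nextID[topic], Topic: topic, Payload: payload}
	b.nextID[topic]++
	b.queues[topic] = append(b.queues[topic], msg)
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
		default: // subscriber buffer is full: drop instead of blocking Put
		}
	}
	return msg
}

// Get pulls the oldest queued message for topic, if there is one.
func (b *Bus) Get(topic string) (Message, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	q := b.queues[topic]
	if len(q) == 0 {
		return Message{}, false
	}
	b.queues[topic] = q[1:]
	return q[0], true
}

// Subscribe returns a buffered channel that receives pushed messages.
func (b *Bus) Subscribe(topic string, buffer int) <-chan Message {
	ch := make(chan Message, buffer)
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

func main() {
	bus := NewBus()
	sub := bus.Subscribe("foo", 8)
	bus.Put("foo", []byte("hi"))
	bus.Put("foo", []byte("bye"))

	pushed := <-sub
	fmt.Printf("pushed: id=%d payload=%s\n", pushed.ID, pushed.Payload)
	for {
		msg, ok := bus.Get("foo")
		if !ok {
			break
		}
		fmt.Printf("pulled: id=%d payload=%s\n", msg.ID, msg.Payload)
	}
}
```

As in the tool example earlier, a published message reaches listening subscribers immediately and also stays queued for a later pull.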
## License
msgbus is licensed under the [MIT License](https://github.com/prologic/msgbus/blob/master/LICENSE)