Initial Commit

这个提交包含在:
James Mills 2017-06-03 16:16:17 +01:00
当前提交 537969e9f2
找不到此签名对应的密钥
GPG 密钥 ID: AC4C014F1440EBD6
共有 6 个文件被更改,包括 575 次插入0 次删除

39
cmd/msgbus/main.go 普通文件
查看文件

@ -0,0 +1,39 @@
package main
import (
"fmt"
"log"
"golang.org/x/net/websocket"
"github.com/prologic/msgbus"
)
func main() {
origin := "http://localhost/"
url := "ws://localhost:8000/push/foo"
ws, err := websocket.Dial(url, "", origin)
if err != nil {
log.Fatal(err)
}
var (
err error
msg msgbus.Message
)
for {
err = websocket.JSON.Receive(ws, &msg)
if err != nil {
log.Fatal(err)
}
ack := Ack{Ack: msg.Id}
err = websocket.JSON.Send(ws, &ack)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received: %s.\n", msg.Payload)
}
}

105
cmd/msgbusd/main.go 普通文件
查看文件

@ -0,0 +1,105 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"golang.org/x/net/websocket"
"github.com/julienschmidt/httprouter"
)
// IndexHandler ...
func IndexHandler(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
fmt.Fprint(w, "Welcome!\n")
}
// PushHandler ...
func PushHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
topic := p.ByName("topic")
websocket.Handler(PushWebSocketHandler(topic)).ServeHTTP(w, r)
}
// PullHandler ...
func PullHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
topic := p.ByName("topic")
message, ok := msgbus.Get(topic)
if !ok {
http.Error(w, "Not Found", http.StatusNotFound)
return
}
out, err := json.Marshal(message)
if err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
w.Write(out)
}
// PutHandler ...
func PutHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
topic := p.ByName("topic")
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
msgbus.Put(topic, msgbus.NewMessage(body))
}
// PushWebSocketHandler ...
func PushWebSocketHandler(topic string) websocket.Handler {
return func(conn *websocket.Conn) {
id := conn.Request().RemoteAddr
ch := msgbus.Subscribe(id, topic)
defer func() {
msgbus.Unsubscribe(id, topic)
}()
var (
err error
ack Ack
)
for {
msg := <-ch
err = websocket.JSON.Send(conn, msg)
if err != nil {
log.Printf("Error sending msg to %s", id)
continue
}
err = websocket.JSON.Receive(conn, &ack)
if err != nil {
log.Printf("Error receiving ack from %s", id)
continue
}
log.Printf("message %v acked %v by %s", msg, ack, id)
}
}
}
var msgbus = NewMessageBus()
func main() {
router := httprouter.New()
// UI
router.GET("/", IndexHandler)
// Subscribe
router.GET("/push/:topic", PushHandler)
router.GET("/pull/:topic", PullHandler)
// Publish
router.PUT("/:topic", PutHandler)
log.Fatal(http.ListenAndServe(":8000", router))
}

175
msgbus.go 普通文件
查看文件

@ -0,0 +1,175 @@
package msgbus
import (
"log"
"time"
)
// Message ...
type Message struct {
ID uint64 `json:"id"`
Payload []byte `json:"payload"`
Created time.Time `json:"created"`
Acked time.Time `json:"acked"`
}
// Ack ...
type Ack struct {
Ack uint64 `json:"ack"`
}
// Listeners ...
type Listeners struct {
ids map[string]bool
chs map[string]chan *Message
}
// NewListeners ...
func NewListeners() *Listeners {
return &Listeners{
ids: make(map[string]bool),
chs: make(map[string]chan *Message),
}
}
// Add ...
func (ls *Listeners) Add(id string) chan *Message {
log.Printf("Listeners.Add(%s)\n", id)
ls.ids[id] = true
ls.chs[id] = make(chan *Message)
return ls.chs[id]
}
// Remove ...
func (ls *Listeners) Remove(id string) {
log.Printf("Listeners.Remove(%s)\n", id)
delete(ls.ids, id)
close(ls.chs[id])
delete(ls.chs, id)
}
// Exists ...
func (ls *Listeners) Exists(id string) bool {
_, ok := ls.ids[id]
return ok
}
// Get ...
func (ls *Listeners) Get(id string) (chan *Message, bool) {
ch, ok := ls.chs[id]
if !ok {
return nil, false
}
return ch, true
}
// NotifyAll ...
func (ls *Listeners) NotifyAll(message *Message) {
log.Printf("Listeners.NotifyAll(%v)\n", message)
for _, ch := range ls.chs {
ch <- message
}
}
// MessageBus ...
type MessageBus struct {
seqid uint64
topics map[string]*Queue
listeners map[string]*Listeners
}
// NewMessageBus ...
func NewMessageBus() *MessageBus {
return &MessageBus{
topics: make(map[string]*Queue),
listeners: make(map[string]*Listeners),
}
}
// Len ...
func (mb *MessageBus) Len() int {
return len(mb.topics)
}
// NewMessage ...
func (mb *MessageBus) NewMessage(payload []byte) *Message {
message := &Message{
ID: mb.seqid,
Payload: payload,
Created: time.Now(),
}
mb.seqid++
return message
}
// Put ...
func (mb *MessageBus) Put(topic string, message *Message) {
log.Printf("MessageBus.Put(%s, %v)\n", topic, message)
q, ok := mb.topics[topic]
if !ok {
q = &Queue{}
mb.topics[topic] = q
}
q.Push(message)
mb.NotifyAll(topic, message)
}
// Get ...
func (mb *MessageBus) Get(topic string) (*Message, bool) {
log.Printf("MessageBus.Get(%s)\n", topic)
q, ok := mb.topics[topic]
if !ok {
return &Message{}, false
}
m := q.Pop()
if m == nil {
return &Message{}, false
}
return m.(*Message), true
}
// NotifyAll ...
func (mb *MessageBus) NotifyAll(topic string, message *Message) {
log.Printf("MessageBus.NotifyAll(%s, %v)\n", topic, message)
ls, ok := mb.listeners[topic]
if !ok {
return
}
ls.NotifyAll(message)
}
// Subscribe ...
func (mb *MessageBus) Subscribe(id, topic string) chan *Message {
log.Printf("MessageBus.Subscribe(%s, %s)\n", id, topic)
ls, ok := mb.listeners[topic]
if !ok {
ls = NewListeners()
mb.listeners[topic] = ls
}
if ls.Exists(id) {
// Already verified th listener exists
ch, _ := ls.Get(id)
return ch
}
return ls.Add(id)
}
// Unsubscribe ...
func (mb *MessageBus) Unsubscribe(id, topic string) {
log.Printf("MessageBus.Unsubscribe(%s, %s)\n", id, topic)
ls, ok := mb.listeners[topic]
if !ok {
return
}
if ls.Exists(id) {
// Already verified th listener exists
ls.Remove(id)
}
}

78
msgbus_test.go 普通文件
查看文件

@ -0,0 +1,78 @@
package msgbus
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMessageBusLen(t *testing.T) {
mb := NewMessageBus()
assert.Equal(t, mb.Len(), 0)
}
func TestMessage(t *testing.T) {
mb := NewMessageBus()
assert.Equal(t, mb.Len(), 0)
topic := "foo"
expected := &Message{Payload: []byte("bar")}
mb.Put(topic, expected)
actual, ok := mb.Get(topic)
assert.True(t, ok)
assert.Equal(t, actual, expected)
}
func TestMessageGetEmpty(t *testing.T) {
mb := NewMessageBus()
assert.Equal(t, mb.Len(), 0)
topic := "foo"
msg, ok := mb.Get(topic)
assert.False(t, ok)
assert.Equal(t, msg, &Message{})
}
func BenchmarkMessageBusPut(b *testing.B) {
mb := NewMessageBus()
topic := "foo"
msg := &Message{Payload: []byte("foo")}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mb.Put(topic, msg)
}
}
func BenchmarkMessageBusGet(b *testing.B) {
mb := NewMessageBus()
topic := "foo"
msg := &Message{Payload: []byte("foo")}
for i := 0; i < b.N; i++ {
mb.Put(topic, msg)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mb.Get(topic)
}
}
func BenchmarkMessageBusGetEmpty(b *testing.B) {
mb := NewMessageBus()
topic := "foo"
b.ResetTimer()
for i := 0; i < b.N; i++ {
mb.Get(topic)
}
}
func BenchmarkMessageBusPutGet(b *testing.B) {
mb := NewMessageBus()
topic := "foo"
msg := &Message{Payload: []byte("foo")}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mb.Put(topic, msg)
mb.Get(topic)
}
}

79
queue.go 普通文件
查看文件

@ -0,0 +1,79 @@
package msgbus
import (
"sync"
)
// QueueNode ...
type QueueNode struct {
data interface{}
next *QueueNode
}
// Queue ...
type Queue struct {
sync.Mutex
head *QueueNode
tail *QueueNode
size int
}
// Len ...
func (q *Queue) Len() int {
q.Lock()
defer q.Unlock()
return q.size
}
// Push ...
func (q *Queue) Push(item interface{}) {
q.Lock()
defer q.Unlock()
n := &QueueNode{data: item}
if q.tail == nil {
q.tail = n
q.head = n
} else {
q.tail.next = n
q.tail = n
}
q.size++
}
// Pop ...
func (q *Queue) Pop() interface{} {
q.Lock()
defer q.Unlock()
if q.head == nil {
return nil
}
n := q.head
q.head = n.next
if q.head == nil {
q.tail = nil
}
q.size--
return n.data
}
// Peek ...
func (q *Queue) Peek() interface{} {
q.Lock()
defer q.Unlock()
n := q.head
if n == nil {
return nil
}
return n.data
}

99
queue_test.go 普通文件
查看文件

@ -0,0 +1,99 @@
package msgbus
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestQueueLen(t *testing.T) {
q := Queue{}
assert.Equal(t, q.Len(), 0)
}
func TestQueuePush(t *testing.T) {
q := Queue{}
q.Push(1)
assert.Equal(t, q.Len(), 1)
}
func TestQueuePop(t *testing.T) {
q := Queue{}
q.Push(1)
assert.Equal(t, q.Len(), 1)
assert.Equal(t, q.Pop(), 1)
assert.Equal(t, q.Len(), 0)
}
func TestQueuePeek(t *testing.T) {
q := Queue{}
q.Push(1)
assert.Equal(t, q.Len(), 1)
assert.Equal(t, q.Peek(), 1)
assert.Equal(t, q.Len(), 1)
}
func TestQueue(t *testing.T) {
q := Queue{}
q.Push(1)
q.Push(2)
q.Push(3)
assert.Equal(t, q.Len(), 3)
assert.Equal(t, q.Peek(), 1)
assert.Equal(t, q.Pop(), 1)
assert.Equal(t, q.Len(), 2)
assert.Equal(t, q.Peek(), 2)
assert.Equal(t, q.Pop(), 2)
assert.Equal(t, q.Len(), 1)
assert.Equal(t, q.Peek(), 3)
assert.Equal(t, q.Pop(), 3)
assert.Equal(t, q.Len(), 0)
assert.Equal(t, q.Peek(), nil)
}
func BenchmarkQueuePush(b *testing.B) {
q := Queue{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Push(i)
}
}
func BenchmarkQueuePeekEmpty(b *testing.B) {
q := Queue{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Peek()
}
}
func BenchmarkQueuePeekNonEmpty(b *testing.B) {
q := Queue{}
q.Push(1)
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Peek()
}
}
func BenchmarkQueuePopEmpty(b *testing.B) {
q := Queue{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Pop()
}
}
func BenchmarkQueuePopNonEmpty(b *testing.B) {
q := Queue{}
for i := 0; i < b.N; i++ {
q.Push(i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Pop()
}
}