1
0
mirror of https://git.mills.io/kayos/bitraft.git synced 2024-06-30 18:51:41 +00:00
bitraft/server.go

404 lines
8.6 KiB
Go
Raw Normal View History

package main
2017-01-29 17:46:39 +00:00
import (
"bufio"
"compress/gzip"
"encoding/binary"
2017-01-31 01:00:20 +00:00
"errors"
2017-01-29 17:46:39 +00:00
"io"
2017-02-09 17:51:37 +00:00
"net"
2017-01-29 17:46:39 +00:00
"os"
"path/filepath"
2017-01-31 01:00:20 +00:00
"strconv"
2017-01-29 17:46:39 +00:00
"strings"
"sync"
2017-02-09 17:51:37 +00:00
"time"
2017-01-29 17:46:39 +00:00
"github.com/prologic/bitcask"
log "github.com/sirupsen/logrus"
2017-01-31 13:58:02 +00:00
"github.com/tidwall/finn"
2017-01-29 17:46:39 +00:00
"github.com/tidwall/redcon"
)
2017-02-09 17:51:37 +00:00
// defaultTCPKeepAlive is the keep-alive period requested for accepted TCP
// client connections.
const defaultTCPKeepAlive = 5 * time.Minute
2017-01-31 13:09:00 +00:00
// errSyntaxError is returned when a command is given an option it does not
// recognise.
var errSyntaxError = errors.New("syntax error")
func ListenAndServe(addr, join, dir, logdir string, consistency, durability finn.Level) error {
opts := finn.Options{
Backend: finn.FastLog,
Consistency: consistency,
Durability: durability,
ConnAccept: func(conn redcon.Conn) bool {
if tcp, ok := conn.NetConn().(*net.TCPConn); ok {
if err := tcp.SetKeepAlive(true); err != nil {
log.Warningf("could not set keepalive: %s",
2017-02-09 17:51:37 +00:00
tcp.RemoteAddr().String())
} else {
err := tcp.SetKeepAlivePeriod(defaultTCPKeepAlive)
if err != nil {
log.Warningf("could not set keepalive period: %s",
tcp.RemoteAddr().String())
}
2017-02-09 17:51:37 +00:00
}
}
return true
},
2017-02-09 17:51:37 +00:00
}
2017-01-31 13:09:00 +00:00
m, err := NewMachine(dir, addr)
if err != nil {
return err
}
2017-01-31 15:43:05 +00:00
n, err := finn.Open(logdir, addr, join, m, &opts)
2017-01-31 13:09:00 +00:00
if err != nil {
return err
}
defer n.Close()
select {
// blocking, there's no way out
}
}
2017-01-29 17:46:39 +00:00
type Machine struct {
2017-01-31 13:09:00 +00:00
mu sync.RWMutex
2017-01-29 17:46:39 +00:00
dir string
db *bitcask.Bitcask
2017-01-29 17:46:39 +00:00
dbPath string
2017-01-31 01:00:20 +00:00
addr string
closed bool
2017-01-29 17:46:39 +00:00
}
2017-01-31 01:00:20 +00:00
func NewMachine(dir, addr string) (*Machine, error) {
2017-01-29 17:46:39 +00:00
kvm := &Machine{
dir: dir,
2017-01-31 01:00:20 +00:00
addr: addr,
2017-01-29 17:46:39 +00:00
}
var err error
kvm.dbPath = filepath.Join(dir, "node.db")
kvm.db, err = bitcask.Open(kvm.dir)
2017-01-29 17:46:39 +00:00
if err != nil {
return nil, err
}
return kvm, nil
}
func (kvm *Machine) Close() error {
2017-01-31 01:00:20 +00:00
kvm.mu.Lock()
defer kvm.mu.Unlock()
2017-01-29 17:46:39 +00:00
kvm.db.Close()
2017-01-31 01:00:20 +00:00
kvm.closed = true
2017-01-29 17:46:39 +00:00
return nil
}
2017-01-31 01:00:20 +00:00
func (kvm *Machine) Command(
2017-01-31 13:58:02 +00:00
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
2017-01-31 01:00:20 +00:00
) (interface{}, error) {
2017-01-29 17:46:39 +00:00
switch strings.ToLower(string(cmd.Args[0])) {
default:
2017-02-18 17:12:54 +00:00
log.Warningf("unknown command: %s\n", cmd.Args[0])
2017-01-29 17:46:39 +00:00
return nil, finn.ErrUnknownCommand
2017-02-18 17:12:54 +00:00
case "echo":
return kvm.cmdEcho(m, conn, cmd)
2017-01-29 17:46:39 +00:00
case "set":
2017-01-31 13:58:02 +00:00
return kvm.cmdSet(m, conn, cmd)
2017-01-29 17:46:39 +00:00
case "get":
return kvm.cmdGet(m, conn, cmd)
case "del":
return kvm.cmdDel(m, conn, cmd)
2017-01-29 17:46:39 +00:00
case "keys":
return kvm.cmdKeys(m, conn, cmd)
case "flushdb":
return kvm.cmdFlushdb(m, conn, cmd)
case "shutdown":
log.Warningf("shutting down")
conn.WriteString("OK")
conn.Close()
os.Exit(0)
return nil, nil
}
}
func (kvm *Machine) Restore(rd io.Reader) error {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var err error
if err := kvm.db.Close(); err != nil {
return err
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
return err
}
kvm.db = nil
kvm.db, err = bitcask.Open(kvm.dir)
2017-01-29 17:46:39 +00:00
if err != nil {
return err
}
num := make([]byte, 8)
gzr, err := gzip.NewReader(rd)
if err != nil {
return err
}
r := bufio.NewReader(gzr)
for {
if _, err := io.ReadFull(r, num); err != nil {
if err == io.EOF {
break
}
return err
}
key := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, key); err != nil {
return err
}
if _, err := io.ReadFull(r, num); err != nil {
return err
}
value := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, value); err != nil {
return err
}
kvm.db.Put(string(key), value)
2017-01-29 17:46:39 +00:00
}
return gzr.Close()
}
2017-02-18 16:52:37 +00:00
// WriteRedisCommandsFromSnapshot reads the gzip-compressed snapshot stored at
// snapshotPath and writes to wr one Redis SET command, in RESP encoding, for
// every key/value record it contains, so that replaying the output rebuilds
// the entire database.
//
// The snapshot is expected to be in the layout written by Machine.Snapshot: a
// sequence of records, each an 8-byte little-endian key length, the key bytes,
// an 8-byte little-endian value length and the value bytes. Keys are emitted
// exactly as stored; no prefix is required or stripped.
//
// It returns the first error from opening or decoding the snapshot or from
// writing to wr. A snapshot that ends inside a record yields
// io.ErrUnexpectedEOF.
func WriteRedisCommandsFromSnapshot(wr io.Writer, snapshotPath string) error {
	f, err := os.Open(snapshotPath)
	if err != nil {
		return err
	}
	defer f.Close()

	gzr, err := gzip.NewReader(f)
	if err != nil {
		return err
	}
	// On the success path the reader is closed explicitly so its error can
	// be returned; the deferred close only covers the early returns.
	var gzclosed bool
	defer func() {
		if !gzclosed {
			gzr.Close()
		}
	}()
	r := bufio.NewReader(gzr)

	num := make([]byte, 8)
	// readField reads one length-prefixed field. io.EOF is passed through
	// only when it occurs before the first byte of a record (startsRecord);
	// anywhere else the snapshot was cut short.
	readField := func(startsRecord bool) ([]byte, error) {
		if _, err := io.ReadFull(r, num); err != nil {
			if err == io.EOF && !startsRecord {
				err = io.ErrUnexpectedEOF
			}
			return nil, err
		}
		field := make([]byte, int(binary.LittleEndian.Uint64(num)))
		if _, err := io.ReadFull(r, field); err != nil {
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			return nil, err
		}
		return field, nil
	}

	var cmd []byte // reused across records to avoid reallocating
	for {
		key, err := readField(true)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		value, err := readField(false)
		if err != nil {
			return err
		}

		// RESP array of three bulk strings: SET <key> <value>.
		cmd = cmd[:0]
		cmd = append(cmd, "*3\r\n$3\r\nSET\r\n$"...)
		cmd = strconv.AppendInt(cmd, int64(len(key)), 10)
		cmd = append(cmd, '\r', '\n')
		cmd = append(cmd, key...)
		cmd = append(cmd, '\r', '\n', '$')
		cmd = strconv.AppendInt(cmd, int64(len(value)), 10)
		cmd = append(cmd, '\r', '\n')
		cmd = append(cmd, value...)
		cmd = append(cmd, '\r', '\n')
		if _, err := wr.Write(cmd); err != nil {
			return err
		}
	}

	err = gzr.Close()
	gzclosed = true
	return err
}
2017-01-29 17:46:39 +00:00
func (kvm *Machine) Snapshot(wr io.Writer) error {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
gzw := gzip.NewWriter(wr)
err := kvm.db.Fold(func(key string) error {
var buf []byte
value, err := kvm.db.Get(key)
if err != nil {
return err
}
num := make([]byte, 8)
2017-01-29 17:46:39 +00:00
binary.LittleEndian.PutUint64(num, uint64(len(key)))
buf = append(buf, num...)
buf = append(buf, key...)
binary.LittleEndian.PutUint64(num, uint64(len(value)))
buf = append(buf, num...)
buf = append(buf, value...)
if _, err := gzw.Write(buf); err != nil {
return err
}
return nil
})
if err != nil {
2017-01-29 17:46:39 +00:00
return err
}
return gzw.Close()
2017-01-29 17:46:39 +00:00
}
2017-01-31 01:00:20 +00:00
2017-01-31 13:09:00 +00:00
func (kvm *Machine) cmdSet(
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
) (interface{}, error) {
if len(cmd.Args) != 3 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
return nil, kvm.db.Put(string(cmd.Args[1]), cmd.Args[2])
2017-01-31 20:48:02 +00:00
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
2017-02-18 17:12:54 +00:00
// cmdEcho handles "ECHO message" by writing the message back to the client as
// a bulk reply. It does not go through m.Apply. Any argument count other than
// one yields finn.ErrWrongNumberOfArguments.
func (kvm *Machine) cmdEcho(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
	if argc := len(cmd.Args); argc != 2 {
		return nil, finn.ErrWrongNumberOfArguments
	}
	message := cmd.Args[1]
	conn.WriteBulk(message)
	return nil, nil
}
2017-01-31 13:09:00 +00:00
func (kvm *Machine) cmdGet(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
key := string(cmd.Args[1])
2017-01-31 13:09:00 +00:00
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
2017-01-31 01:00:20 +00:00
kvm.mu.RLock()
2017-01-31 13:09:00 +00:00
defer kvm.mu.RUnlock()
value, err := kvm.db.Get(key)
2017-01-31 01:00:20 +00:00
if err != nil {
if err == bitcask.ErrKeyNotFound {
2017-01-31 13:09:00 +00:00
conn.WriteNull()
return nil, nil
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return nil, err
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
conn.WriteBulk(value)
return nil, nil
},
)
}
func (kvm *Machine) cmdDel(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
2017-01-31 23:43:08 +00:00
var startIdx = 1
2017-01-31 13:09:00 +00:00
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var n int
2017-01-31 23:43:08 +00:00
for i := startIdx; i < len(cmd.Args); i++ {
key := string(cmd.Args[i])
err := kvm.db.Delete(key)
if err != nil {
2017-01-31 13:09:00 +00:00
return 0, err
2017-01-31 01:00:20 +00:00
}
n++
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return n, nil
},
func(v interface{}) (interface{}, error) {
n := v.(int)
conn.WriteInt(n)
return nil, nil
},
)
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
func (kvm *Machine) cmdKeys(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
var withvalues bool
for i := 1; i < len(cmd.Args); i++ {
2017-01-31 13:09:00 +00:00
switch strings.ToLower(string(cmd.Args[i])) {
default:
return nil, errSyntaxError
case "withvalues":
withvalues = true
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
}
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
var keys [][]byte
var values [][]byte
err := kvm.db.Fold(func(key string) error {
keys = append(keys, []byte(key))
2017-01-31 13:09:00 +00:00
if withvalues {
value, err := kvm.db.Get(key)
if err != nil {
return err
}
values = append(values, value)
2017-01-31 13:09:00 +00:00
}
return nil
})
2017-01-31 01:00:20 +00:00
if err != nil {
return nil, err
}
2017-01-31 13:09:00 +00:00
if withvalues {
conn.WriteArray(len(keys) * 2)
} else {
conn.WriteArray(len(keys))
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
if withvalues {
conn.WriteBulk(values[i])
}
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return nil, nil
},
)
}
func (kvm *Machine) cmdFlushdb(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 1 {
return nil, finn.ErrWrongNumberOfArguments
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
if err := kvm.db.Sync(); err != nil {
2017-01-31 13:09:00 +00:00
panic(err.Error())
}
return nil, nil
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}