1
0
mirror of https://git.mills.io/kayos/bitraft.git synced 2024-06-30 18:51:41 +00:00
bitraft/server.go

527 lines
11 KiB
Go
Raw Normal View History

2017-01-31 13:58:02 +00:00
package node
2017-01-29 17:46:39 +00:00
import (
"bufio"
2017-01-31 13:09:00 +00:00
"bytes"
2017-01-29 17:46:39 +00:00
"compress/gzip"
"encoding/binary"
2017-01-31 01:00:20 +00:00
"errors"
2017-01-29 17:46:39 +00:00
"io"
"os"
"path/filepath"
2017-01-31 01:00:20 +00:00
"strconv"
2017-01-29 17:46:39 +00:00
"strings"
"sync"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
2017-01-31 13:58:02 +00:00
"github.com/tidwall/finn"
2017-01-31 13:09:00 +00:00
"github.com/tidwall/match"
2017-01-29 17:46:39 +00:00
"github.com/tidwall/redcon"
2017-01-31 13:09:00 +00:00
"github.com/tidwall/redlog"
2017-01-29 17:46:39 +00:00
)
2017-01-31 13:09:00 +00:00
var (
errSyntaxError = errors.New("syntax error")
log = redlog.New(os.Stderr)
)
2017-01-31 15:48:42 +00:00
func ListenAndServe(addr, join, dir, logdir string, fastlog bool, consistency, durability finn.Level) error {
2017-01-31 13:09:00 +00:00
var opts finn.Options
2017-01-31 15:48:42 +00:00
if fastlog {
opts.Backend = finn.LevelDB
} else {
opts.Backend = finn.FastLog
}
2017-01-31 15:03:12 +00:00
opts.Consistency = consistency
opts.Durability = durability
2017-01-31 13:09:00 +00:00
m, err := NewMachine(dir, addr)
if err != nil {
return err
}
2017-01-31 15:43:05 +00:00
n, err := finn.Open(logdir, addr, join, m, &opts)
2017-01-31 13:09:00 +00:00
if err != nil {
return err
}
defer n.Close()
select {
// blocking, there's no way out
}
}
2017-01-29 17:46:39 +00:00
type Machine struct {
2017-01-31 13:09:00 +00:00
mu sync.RWMutex
2017-01-29 17:46:39 +00:00
dir string
db *leveldb.DB
opts *opt.Options
dbPath string
2017-01-31 01:00:20 +00:00
addr string
closed bool
2017-01-29 17:46:39 +00:00
}
2017-01-31 01:00:20 +00:00
func NewMachine(dir, addr string) (*Machine, error) {
2017-01-29 17:46:39 +00:00
kvm := &Machine{
dir: dir,
2017-01-31 01:00:20 +00:00
addr: addr,
2017-01-29 17:46:39 +00:00
}
var err error
kvm.dbPath = filepath.Join(dir, "node.db")
kvm.opts = &opt.Options{
2017-01-31 01:00:20 +00:00
NoSync: true,
2017-01-29 17:46:39 +00:00
Filter: filter.NewBloomFilter(10),
}
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
return nil, err
}
return kvm, nil
}
func (kvm *Machine) Close() error {
2017-01-31 01:00:20 +00:00
kvm.mu.Lock()
defer kvm.mu.Unlock()
2017-01-29 17:46:39 +00:00
kvm.db.Close()
2017-01-31 01:00:20 +00:00
kvm.closed = true
2017-01-29 17:46:39 +00:00
return nil
}
2017-01-31 01:00:20 +00:00
func (kvm *Machine) Command(
2017-01-31 13:58:02 +00:00
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
2017-01-31 01:00:20 +00:00
) (interface{}, error) {
2017-01-29 17:46:39 +00:00
switch strings.ToLower(string(cmd.Args[0])) {
default:
return nil, finn.ErrUnknownCommand
case "set":
2017-01-31 13:58:02 +00:00
return kvm.cmdSet(m, conn, cmd)
2017-01-31 20:48:02 +00:00
case "mset":
return kvm.cmdMset(m, conn, cmd)
2017-01-29 17:46:39 +00:00
case "get":
return kvm.cmdGet(m, conn, cmd)
2017-01-31 20:48:02 +00:00
case "mget":
return kvm.cmdMget(m, conn, cmd)
2017-01-29 17:46:39 +00:00
case "del":
2017-01-31 23:43:08 +00:00
return kvm.cmdDel(m, conn, cmd, false)
case "delif":
return kvm.cmdDel(m, conn, cmd, true)
2017-01-29 17:46:39 +00:00
case "keys":
return kvm.cmdKeys(m, conn, cmd)
case "flushdb":
return kvm.cmdFlushdb(m, conn, cmd)
case "shutdown":
log.Warningf("shutting down")
conn.WriteString("OK")
conn.Close()
os.Exit(0)
return nil, nil
}
}
func (kvm *Machine) Restore(rd io.Reader) error {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var err error
if err := kvm.db.Close(); err != nil {
return err
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
return err
}
kvm.db = nil
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
return err
}
var read int
batch := new(leveldb.Batch)
num := make([]byte, 8)
gzr, err := gzip.NewReader(rd)
if err != nil {
return err
}
r := bufio.NewReader(gzr)
for {
2017-01-31 01:00:20 +00:00
if read > 4*1024*1024 {
2017-01-31 13:09:00 +00:00
if err := kvm.db.Write(batch, nil); err != nil {
2017-01-29 17:46:39 +00:00
return err
}
read = 0
}
if _, err := io.ReadFull(r, num); err != nil {
if err == io.EOF {
break
}
return err
}
key := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, key); err != nil {
return err
}
if _, err := io.ReadFull(r, num); err != nil {
return err
}
value := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, value); err != nil {
return err
}
batch.Put(key, value)
read += (len(key) + len(value))
}
2017-01-31 13:09:00 +00:00
if err := kvm.db.Write(batch, nil); err != nil {
2017-01-29 17:46:39 +00:00
return err
}
return gzr.Close()
}
func (kvm *Machine) Snapshot(wr io.Writer) error {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
gzw := gzip.NewWriter(wr)
ss, err := kvm.db.GetSnapshot()
if err != nil {
return err
}
defer ss.Release()
iter := ss.NewIterator(nil, nil)
defer iter.Release()
var buf []byte
num := make([]byte, 8)
for ok := iter.First(); ok; ok = iter.Next() {
buf = buf[:0]
key := iter.Key()
value := iter.Value()
binary.LittleEndian.PutUint64(num, uint64(len(key)))
buf = append(buf, num...)
buf = append(buf, key...)
binary.LittleEndian.PutUint64(num, uint64(len(value)))
buf = append(buf, num...)
buf = append(buf, value...)
if _, err := gzw.Write(buf); err != nil {
return err
}
}
if err := gzw.Close(); err != nil {
return err
}
iter.Release()
return iter.Error()
}
2017-01-31 01:00:20 +00:00
2017-01-31 13:09:00 +00:00
func (kvm *Machine) cmdSet(
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
) (interface{}, error) {
if len(cmd.Args) != 3 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
return nil, kvm.db.Put(makeKey('k', cmd.Args[1]), cmd.Args[2], nil)
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
2017-01-31 20:48:02 +00:00
func (kvm *Machine) cmdMset(
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
) (interface{}, error) {
if len(cmd.Args) < 3 || (len(cmd.Args)-1)%2 == 1 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var batch leveldb.Batch
for i := 1; i < len(cmd.Args); i += 2 {
batch.Put(makeKey('k', cmd.Args[i]), cmd.Args[i+1])
}
return nil, kvm.db.Write(&batch, nil)
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
2017-01-31 13:09:00 +00:00
func (kvm *Machine) cmdGet(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
key := makeKey('k', cmd.Args[1])
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
2017-01-31 01:00:20 +00:00
kvm.mu.RLock()
2017-01-31 13:09:00 +00:00
defer kvm.mu.RUnlock()
value, err := kvm.db.Get(key, nil)
2017-01-31 01:00:20 +00:00
if err != nil {
2017-01-31 13:09:00 +00:00
if err == leveldb.ErrNotFound {
conn.WriteNull()
return nil, nil
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return nil, err
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
conn.WriteBulk(value)
return nil, nil
},
)
}
2017-01-31 20:48:02 +00:00
func (kvm *Machine) cmdMget(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
var values [][]byte
for i := 1; i < len(cmd.Args); i++ {
key := makeKey('k', cmd.Args[i])
value, err := kvm.db.Get(key, nil)
if err != nil {
if err == leveldb.ErrNotFound {
values = append(values, nil)
} else {
return nil, err
}
} else {
values = append(values, bcopy(value))
}
}
conn.WriteArray(len(values))
for _, v := range values {
if v == nil {
conn.WriteNull()
} else {
conn.WriteBulk(v)
}
}
return nil, nil
},
)
}
2017-01-31 23:43:08 +00:00
func (kvm *Machine) cmdDel(m finn.Applier, conn redcon.Conn, cmd redcon.Command, delif bool) (interface{}, error) {
if (delif && len(cmd.Args) < 3) || len(cmd.Args) < 2 {
2017-01-31 13:09:00 +00:00
return nil, finn.ErrWrongNumberOfArguments
}
2017-01-31 23:43:08 +00:00
var valueif []byte
var startIdx = 1
if delif {
valueif = cmd.Args[1]
startIdx = 2
}
2017-01-31 13:09:00 +00:00
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var batch leveldb.Batch
var n int
2017-01-31 23:43:08 +00:00
for i := startIdx; i < len(cmd.Args); i++ {
2017-01-31 13:09:00 +00:00
key := makeKey('k', cmd.Args[i])
2017-01-31 23:43:08 +00:00
var has bool
var err error
var val []byte
if delif {
val, err = kvm.db.Get(key, nil)
if err == nil {
has = bytes.Contains(val, valueif)
}
} else {
has, err = kvm.db.Has(key, nil)
}
2017-01-31 13:09:00 +00:00
if err != nil && err != leveldb.ErrNotFound {
return 0, err
} else if has {
n++
batch.Delete(key)
2017-01-31 01:00:20 +00:00
}
}
2017-01-31 13:09:00 +00:00
if err := kvm.db.Write(&batch, nil); err != nil {
return nil, err
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return n, nil
},
func(v interface{}) (interface{}, error) {
n := v.(int)
conn.WriteInt(n)
return nil, nil
},
)
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
func (kvm *Machine) cmdKeys(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
var withvalues bool
var pivot []byte
var usingPivot bool
var desc bool
limit := 500
for i := 2; i < len(cmd.Args); i++ {
switch strings.ToLower(string(cmd.Args[i])) {
default:
return nil, errSyntaxError
case "withvalues":
withvalues = true
case "desc":
desc = true
case "pivot":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
pivot = makeKey('k', cmd.Args[i])
usingPivot = true
case "limit":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
n, err := strconv.ParseInt(string(cmd.Args[i]), 10, 64)
if err != nil || n < 0 {
return nil, errSyntaxError
}
limit = int(n)
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
}
pattern := makeKey('k', cmd.Args[1])
spattern := string(pattern)
min, max := match.Allowable(spattern)
bmin := []byte(min)
bmax := []byte(max)
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
var keys [][]byte
var values [][]byte
iter := kvm.db.NewIterator(nil, nil)
var ok bool
if desc {
if usingPivot && bytes.Compare(pivot, bmax) < 0 {
bmax = pivot
}
ok = iter.Seek(bmax)
if !ok {
ok = iter.Last()
}
} else {
if usingPivot && bytes.Compare(pivot, bmin) > 0 {
bmin = pivot
}
ok = iter.Seek(bmin)
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
step := func() bool {
if desc {
return iter.Prev()
} else {
return iter.Next()
}
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
var inRange bool
for ; ok; ok = step() {
if len(keys) == limit {
break
}
rkey := iter.Key()
if desc {
if !inRange {
if bytes.Compare(rkey, bmax) >= 0 {
continue
}
inRange = true
}
if bytes.Compare(rkey, bmin) < 0 {
break
}
} else {
if !inRange {
if usingPivot {
if bytes.Compare(rkey, bmin) <= 0 {
continue
}
}
inRange = true
}
if bytes.Compare(rkey, bmax) >= 0 {
break
}
}
skey := string(rkey)
if !match.Match(skey, spattern) {
continue
}
keys = append(keys, bcopy(rkey[1:]))
if withvalues {
values = append(values, bcopy(iter.Value()))
}
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
iter.Release()
err := iter.Error()
2017-01-31 01:00:20 +00:00
if err != nil {
return nil, err
}
2017-01-31 13:09:00 +00:00
if withvalues {
conn.WriteArray(len(keys) * 2)
} else {
conn.WriteArray(len(keys))
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
if withvalues {
conn.WriteBulk(values[i])
}
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return nil, nil
},
)
}
func (kvm *Machine) cmdFlushdb(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 1 {
return nil, finn.ErrWrongNumberOfArguments
2017-01-31 01:00:20 +00:00
}
2017-01-31 13:09:00 +00:00
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
if err := kvm.db.Close(); err != nil {
panic(err.Error())
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
panic(err.Error())
}
var err error
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
panic(err.Error())
}
return nil, nil
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
func makeKey(prefix byte, b []byte) []byte {
key := make([]byte, 1+len(b))
key[0] = prefix
copy(key[1:], b)
return key
}
func bcopy(b []byte) []byte {
r := make([]byte, len(b))
copy(r, b)
return r
2017-01-31 01:00:20 +00:00
}