1
0
mirror of https://git.mills.io/kayos/bitraft.git synced 2024-06-27 09:19:00 +00:00
This commit is contained in:
Josh Baker 2017-01-29 10:46:39 -07:00
commit 1cbb6be351
4 changed files with 588 additions and 0 deletions

16
bloom_test.go Normal file

@ -0,0 +1,16 @@
package roam
import (
"fmt"
"testing"
"github.com/willf/bloom"
)
func TestBloom(t *testing.T) {
filter := bloom.New(10000000, 5) // load of 20, 5 keys
filter.Add([]byte("Love"))
println(filter.Test([]byte("Love")))
println(filter.Test([]byte("Loved")))
fmt.Printf("%v\n", filter.EstimateFalsePositiveRate(1000000))
}

358
crud.go Normal file

@ -0,0 +1,358 @@
package roam
import (
"bytes"
"encoding/binary"
"errors"
"os"
"strconv"
"strings"
"time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/tidwall/finn"
"github.com/tidwall/match"
"github.com/tidwall/redcon"
)
var errSyntaxError = errors.New("syntax error")
type dbValue struct {
value []byte
usingEx bool
ex time.Duration
}
func (dbv *dbValue) Bytes() []byte {
if dbv.usingEx {
b := make([]byte, 9+len(dbv.value))
b[0] = 1
binary.LittleEndian.PutUint64(b[1:], uint64(dbv.ex))
copy(b[9:], dbv.value)
return b
}
b := make([]byte, 1+len(dbv.value))
copy(b[1:], dbv.value)
return b
}
func parseDBValue(b []byte, peek bool) (*dbValue, error) {
var dbv dbValue
if len(b) == 0 {
return nil, errors.New("invalid value size")
}
switch b[0] {
default:
return nil, errors.New("invalid value type")
case 0:
if peek {
dbv.value = b[1:]
} else {
dbv.value = make([]byte, len(b)-1)
copy(dbv.value, b[1:])
}
return &dbv, nil
case 1:
if len(b) < 9 {
return nil, errors.New("invalid value size")
}
dbv.usingEx = true
dbv.ex = time.Duration(binary.LittleEndian.Uint64(b[1:]))
if peek {
dbv.value = b[9:]
} else {
dbv.value = make([]byte, len(b)-9)
copy(dbv.value, b[9:])
}
return &dbv, nil
}
}
func (kvm *Machine) cmdSet(
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
if len(cmd.Args) < 3 {
return nil, finn.ErrWrongNumberOfArguments
}
var dbv dbValue
dbv.value = cmd.Args[2]
for i := 3; i < len(cmd.Args); i++ {
switch strings.ToLower(string(cmd.Args[i])) {
default:
return nil, errSyntaxError
case "ex":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
n, err := strconv.ParseFloat(string(cmd.Args[i]), 64)
if err != nil {
return nil, errSyntaxError
}
dbv.ex = time.Duration(n * float64(time.Second))
dbv.usingEx = true
}
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
tx, err := kvm.db.OpenTransaction()
if err != nil {
return nil, err
}
defer tx.Discard()
if err := kvm.db.Put(cmd.Args[1], dbv.Bytes(), kvm.wo); err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return nil, nil
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
func (kvm *Machine) cmdGet(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
val, err := kvm.db.Get(cmd.Args[1], nil)
if err != nil {
if err == leveldb.ErrNotFound {
conn.WriteNull()
return nil, nil
}
return nil, err
}
dbv, err := parseDBValue(val, true)
if err != nil {
return nil, err
}
conn.WriteBulk(dbv.value)
return nil, nil
},
)
}
func (kvm *Machine) cmdDel(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
deleted := make([][]byte, 0, len(cmd.Args))
return m.Apply(conn, cmd,
func() (interface{}, error) {
tx, err := kvm.db.OpenTransaction()
if err != nil {
return 0, err
}
defer tx.Discard()
var n int
for i := 1; i < len(cmd.Args); i++ {
has, err := tx.Has(cmd.Args[i], nil)
if err != nil && err != leveldb.ErrNotFound {
return 0, err
} else if has {
n++
if err := tx.Delete(cmd.Args[i], kvm.wo); err != nil {
return nil, err
}
deleted = append(deleted, cmd.Args[i])
}
}
if err := tx.Commit(); err != nil {
return 0, err
}
for _, key := range deleted {
kvm.deleteExItem(key)
}
return n, nil
},
func(v interface{}) (interface{}, error) {
n := v.(int)
conn.WriteInt(n)
return nil, nil
},
)
}
func (kvm *Machine) cmdKeys(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
var withvalues bool
var pivot []byte
var usingPivot bool
var desc bool
limit := 500
for i := 2; i < len(cmd.Args); i++ {
switch strings.ToLower(string(cmd.Args[i])) {
default:
return nil, errSyntaxError
case "withvalues":
withvalues = true
case "desc":
desc = true
case "pivot":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
pivot = cmd.Args[i]
usingPivot = true
case "limit":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
n, err := strconv.ParseInt(string(cmd.Args[i]), 10, 64)
if err != nil || n < 0 {
return nil, errSyntaxError
}
limit = int(n)
}
}
pattern := cmd.Args[1]
spattern := string(pattern)
min, max := match.Allowable(spattern)
bmin := []byte(min)
bmax := []byte(max)
var keys [][]byte
var vals [][]byte
useMax := !(len(spattern) > 0 && spattern[0] == '*')
iter := kvm.db.NewIterator(nil, nil)
var ok bool
var movedPast bool
if desc {
var lasted bool
if min == "" && max == "" && spattern != "" {
if usingPivot {
bmax = pivot
} else {
ok = iter.Last()
lasted = true
}
} else if usingPivot {
if bytes.Compare(pivot, bmax) < 0 {
bmax = pivot
}
}
if !lasted {
ok = iter.Seek(bmax)
if !ok {
ok = iter.Last()
} else {
for ; ok; ok = iter.Prev() {
key := iter.Key()
if bytes.Compare(key, bmax) <= 0 {
break
}
}
}
}
} else {
if usingPivot {
if bytes.Compare(pivot, bmin) > 0 {
bmin = pivot
}
}
ok = iter.Seek(bmin)
}
step := func() bool {
if desc {
return iter.Prev()
}
return iter.Next()
}
for ok = iter.Valid(); ok; ok = step() {
key := iter.Key()
if usingPivot {
if !movedPast {
if !desc {
if bytes.Compare(key, pivot) <= 0 {
continue
}
} else {
if bytes.Compare(key, pivot) >= 0 {
continue
}
}
movedPast = true
}
}
if len(keys) >= limit {
break
}
skey := string(key)
if useMax && skey >= max {
break
}
if match.Match(skey, spattern) {
keys = append(keys, []byte(skey))
if withvalues {
value := iter.Value()
vals = append(vals, bcopy(value))
}
}
}
iter.Release()
err := iter.Error()
if err != nil {
return nil, err
}
if err != nil {
conn.WriteError(err.Error())
} else {
if withvalues {
conn.WriteArray(len(keys) * 2)
} else {
conn.WriteArray(len(keys))
}
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
if withvalues {
conn.WriteBulk(vals[i])
}
}
}
return nil, nil
}
func bcopy(b []byte) []byte {
r := make([]byte, len(b))
copy(r, b)
return r
}
func (kvm *Machine) cmdFlushdb(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
if len(cmd.Args) != 1 {
return nil, finn.ErrWrongNumberOfArguments
}
if err := kvm.db.Close(); err != nil {
panic(err.Error())
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
panic(err.Error())
}
var err error
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
panic(err.Error())
}
conn.WriteString("OK")
return nil, nil
}

185
node.go Normal file

@ -0,0 +1,185 @@
package roam
import (
"bufio"
"compress/gzip"
"encoding/binary"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/tidwall/btree"
"github.com/tidwall/finn"
"github.com/tidwall/redcon"
)
type exItem struct {
key []byte
when time.Time
}
func (a *exItem) Less(v btree.Item, _ interface{}) bool {
return a.when.Before(v.(*exItem).when)
}
func (kvm *Machine) putExItem(dbv *dbValue) {
}
func (kvm *Machine) deleteExItem(key []byte) {
}
type Machine struct {
mu sync.RWMutex // for FlushDB
dir string
db *leveldb.DB
wo *opt.WriteOptions
opts *opt.Options
dbPath string
extr *btree.BTree
exmp map[string]time.Time
}
func NewMachine(dir string, sync bool) (*Machine, error) {
kvm := &Machine{
dir: dir,
extr: btree.New(32, nil),
exmp: make(map[string]time.Time),
}
var err error
kvm.dbPath = filepath.Join(dir, "node.db")
kvm.opts = &opt.Options{
NoSync: !sync,
Filter: filter.NewBloomFilter(10),
}
kvm.wo = &opt.WriteOptions{Sync: sync}
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
return nil, err
}
return kvm, nil
}
func (kvm *Machine) Close() error {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
kvm.db.Close()
return nil
}
func (kvm *Machine) Command(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
switch strings.ToLower(string(cmd.Args[0])) {
default:
return nil, finn.ErrUnknownCommand
case "set":
return kvm.cmdSet(m, conn, cmd)
case "get":
return kvm.cmdGet(m, conn, cmd)
case "del":
return kvm.cmdDel(m, conn, cmd)
case "keys":
return kvm.cmdKeys(m, conn, cmd)
case "flushdb":
return kvm.cmdFlushdb(m, conn, cmd)
case "shutdown":
log.Warningf("shutting down")
conn.WriteString("OK")
conn.Close()
os.Exit(0)
return nil, nil
}
}
func (kvm *Machine) Restore(rd io.Reader) error {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var err error
if err := kvm.db.Close(); err != nil {
return err
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
return err
}
kvm.db = nil
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
return err
}
var read int
batch := new(leveldb.Batch)
num := make([]byte, 8)
gzr, err := gzip.NewReader(rd)
if err != nil {
return err
}
r := bufio.NewReader(gzr)
for {
if read > 50*1024*1024 {
if err := kvm.db.Write(batch, kvm.wo); err != nil {
return err
}
read = 0
}
if _, err := io.ReadFull(r, num); err != nil {
if err == io.EOF {
break
}
return err
}
key := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, key); err != nil {
return err
}
if _, err := io.ReadFull(r, num); err != nil {
return err
}
value := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, value); err != nil {
return err
}
batch.Put(key, value)
read += (len(key) + len(value))
}
if err := kvm.db.Write(batch, kvm.wo); err != nil {
return err
}
return gzr.Close()
}
func (kvm *Machine) Snapshot(wr io.Writer) error {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
gzw := gzip.NewWriter(wr)
ss, err := kvm.db.GetSnapshot()
if err != nil {
return err
}
defer ss.Release()
iter := ss.NewIterator(nil, nil)
defer iter.Release()
var buf []byte
num := make([]byte, 8)
for ok := iter.First(); ok; ok = iter.Next() {
buf = buf[:0]
key := iter.Key()
value := iter.Value()
binary.LittleEndian.PutUint64(num, uint64(len(key)))
buf = append(buf, num...)
buf = append(buf, key...)
binary.LittleEndian.PutUint64(num, uint64(len(value)))
buf = append(buf, num...)
buf = append(buf, value...)
if _, err := gzw.Write(buf); err != nil {
return err
}
}
if err := gzw.Close(); err != nil {
return err
}
iter.Release()
return iter.Error()
}

29
server.go Normal file

@ -0,0 +1,29 @@
package roam
import (
"os"
"github.com/tidwall/finn"
"github.com/tidwall/redlog"
)
var log = redlog.New(os.Stderr)
func ListenAndServe(addr, dir string) error {
var opts finn.Options
opts.Backend = finn.LevelDB
opts.Consistency = finn.Low
opts.Durability = finn.Low
m, err := NewMachine(dir, opts.Durability == finn.High)
if err != nil {
return err
}
n, err := finn.Open(dir, addr, "", m, &opts)
if err != nil {
return err
}
defer n.Close()
select {
// blocking
}
}