1
0
mirror of https://git.mills.io/kayos/bitraft.git synced 2024-06-27 09:19:00 +00:00

simplified api

This commit is contained in:
Josh Baker 2017-01-31 06:09:00 -07:00
parent 4cf7cde693
commit 0e19ac1bb3
5 changed files with 265 additions and 748 deletions

@ -1,16 +0,0 @@
package roam
import (
"fmt"
"testing"
"github.com/willf/bloom"
)
func TestBloom(t *testing.T) {
filter := bloom.New(10000000, 5) // load of 20, 5 keys
filter.Add([]byte("Love"))
println(filter.Test([]byte("Love")))
println(filter.Test([]byte("Loved")))
fmt.Printf("%v\n", filter.EstimateFalsePositiveRate(1000000))
}

454
crud.go

@ -1,454 +0,0 @@
package roam
import (
"bytes"
"encoding/binary"
"errors"
"os"
"strconv"
"strings"
"time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/tidwall/match"
"github.com/tidwall/redcon"
"github.com/tile38/roam/finn"
)
// k{KEY} -> data
// e{KEY} -> unix-ttl + index
// x{UNIX}{INDEX}{KEY} -> empty
var errSyntaxError = errors.New("syntax error")
func (kvm *Machine) cmdSet(
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
index uint64,
) (interface{}, error) {
if len(cmd.Args) < 3 {
return nil, finn.ErrWrongNumberOfArguments
}
var unix uint64
var usingEx bool
for i := 3; i < len(cmd.Args); i++ {
switch strings.ToLower(string(cmd.Args[i])) {
default:
return nil, errSyntaxError
case "ex":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
n, err := strconv.ParseFloat(string(cmd.Args[i]), 64)
if err != nil || n < 0 {
return nil, errSyntaxError
}
unix = uint64(now().Add(time.Duration(n * float64(time.Second))).UnixNano())
usingEx = true
}
}
kKey := makeKey('k', cmd.Args[1])
eKey := makeKey('e', cmd.Args[1])
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var batch leveldb.Batch
eVal, err := kvm.db.Get(eKey, nil)
if err != nil {
if err != leveldb.ErrNotFound {
return nil, err
}
} else {
if len(eVal) != 16 {
return nil, errors.New("invalid exkey value")
}
xKey := make([]byte, 17+len(cmd.Args[1]))
xKey[0] = 'x'
copy(xKey[1:], eVal)
copy(xKey[17:], cmd.Args[1])
batch.Delete(xKey)
if !usingEx {
batch.Delete(eKey)
}
}
if usingEx {
eVal := make([]byte, 16)
binary.BigEndian.PutUint64(eVal, unix)
binary.BigEndian.PutUint64(eVal[8:], index)
batch.Put(eKey, eVal)
xKey := make([]byte, 17+len(cmd.Args[1]))
xKey[0] = 'x'
copy(xKey[1:], eVal)
copy(xKey[17:], cmd.Args[1])
batch.Put(xKey, []byte{})
}
batch.Put(kKey, cmd.Args[2])
return nil, kvm.db.Write(&batch, kvm.wo)
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
func (kvm *Machine) cmdDelif(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
maxidx, err := strconv.ParseUint(string(cmd.Args[1]), 10, 64)
if err != nil {
return nil, err
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var batch leveldb.Batch
for _, key := range cmd.Args[2:] {
eKey := makeKey('e', key)
eVal, err := kvm.db.Get(eKey, nil)
if err != nil || len(eVal) != 16 {
continue // just ignore the error for now
}
index := binary.BigEndian.Uint64(eVal[8:])
if index <= maxidx {
kKey := makeKey('k', key)
xKey := make([]byte, 17+len(key))
xKey[0] = 'x'
copy(xKey[1:], eVal)
copy(xKey[17:], key)
batch.Delete(eKey)
batch.Delete(kKey)
batch.Delete(xKey)
}
}
return nil, kvm.db.Write(&batch, kvm.wo)
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
func (kvm *Machine) cmdListex(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
limit, err := strconv.ParseUint(string(cmd.Args[1]), 10, 64)
if err != nil {
return nil, err
}
n := now()
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
s, err := kvm.db.GetSnapshot()
if err != nil {
return nil, err
}
defer s.Release()
iter := s.NewIterator(nil, nil)
var keys [][]byte
var indexes []uint64
ok := iter.Seek([]byte{'x'})
for ; ok; ok = iter.Next() {
if len(keys) == int(limit) {
break
}
xKey := iter.Key()
if len(xKey) < 17 || xKey[0] != 'x' {
break
}
exi := int64(binary.BigEndian.Uint64(xKey[1:]))
ex := time.Unix(0, exi)
if ex.After(n) {
break
}
index := uint64(binary.BigEndian.Uint64(xKey[9:]))
keys = append(keys, bcopy(xKey[17:]))
indexes = append(indexes, index)
}
iter.Release()
err = iter.Error()
if err != nil {
return nil, err
}
conn.WriteArray(len(keys) * 2)
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
conn.WriteBulkString(strconv.FormatUint(indexes[i], 10))
}
return nil, nil
},
)
}
func (kvm *Machine) cmdDump(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 1 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
s, err := kvm.db.GetSnapshot()
if err != nil {
return nil, err
}
defer s.Release()
iter := s.NewIterator(nil, nil)
var keys [][]byte
var vals [][]byte
for ok := iter.First(); ok; ok = iter.Next() {
keys = append(keys, bcopy(iter.Key()))
vals = append(vals, bcopy(iter.Value()))
}
iter.Release()
err = iter.Error()
if err != nil {
return nil, err
}
conn.WriteArray(len(keys) * 2)
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
conn.WriteBulk(vals[i])
}
return nil, nil
},
)
}
func (kvm *Machine) cmdGet(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
value, err := kvm.db.Get(makeKey('k', cmd.Args[1]), nil)
if err != nil {
if err == leveldb.ErrNotFound {
conn.WriteNull()
return nil, nil
}
return nil, err
}
conn.WriteBulk(value)
return nil, nil
},
)
}
func (kvm *Machine) cmdDel(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var batch leveldb.Batch
var n int
for i := 1; i < len(cmd.Args); i++ {
kKey := makeKey('k', cmd.Args[i])
has, err := kvm.db.Has(kKey, nil)
if err != nil && err != leveldb.ErrNotFound {
return 0, err
} else if has {
n++
batch.Delete(kKey)
eKey := makeKey('e', cmd.Args[i])
eVal, err := kvm.db.Get(eKey, nil)
if err == nil {
if len(eVal) != 16 {
return nil, errors.New("invalid exkey value")
}
xKey := make([]byte, 17+len(cmd.Args[1]))
xKey[0] = 'x'
copy(xKey[1:], eVal)
copy(xKey[17:], cmd.Args[1])
batch.Delete(xKey)
batch.Delete(eKey)
} else if err != leveldb.ErrNotFound {
return nil, err
}
}
}
if err := kvm.db.Write(&batch, kvm.wo); err != nil {
return nil, err
}
return n, nil
},
func(v interface{}) (interface{}, error) {
n := v.(int)
conn.WriteInt(n)
return nil, nil
},
)
}
func (kvm *Machine) cmdKeys(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
var withvalues bool
var pivot []byte
var usingPivot bool
var desc bool
limit := 500
for i := 2; i < len(cmd.Args); i++ {
switch strings.ToLower(string(cmd.Args[i])) {
default:
return nil, errSyntaxError
case "withvalues":
withvalues = true
case "desc":
desc = true
case "pivot":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
pivot = makeKey('k', cmd.Args[i])
usingPivot = true
case "limit":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
n, err := strconv.ParseInt(string(cmd.Args[i]), 10, 64)
if err != nil || n < 0 {
return nil, errSyntaxError
}
limit = int(n)
}
}
pattern := makeKey('k', cmd.Args[1])
spattern := string(pattern)
min, max := match.Allowable(spattern)
bmin := []byte(min)
bmax := []byte(max)
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
var keys [][]byte
var values [][]byte
iter := kvm.db.NewIterator(nil, nil)
var ok bool
if desc {
if usingPivot && bytes.Compare(pivot, bmax) < 0 {
bmax = pivot
}
ok = iter.Seek(bmax)
if !ok {
ok = iter.Last()
}
} else {
if usingPivot && bytes.Compare(pivot, bmin) > 0 {
bmin = pivot
}
ok = iter.Seek(bmin)
}
step := func() bool {
if desc {
return iter.Prev()
} else {
return iter.Next()
}
}
var inRange bool
for ; ok; ok = step() {
if len(keys) == limit {
break
}
rkey := iter.Key()
if desc {
if !inRange {
if bytes.Compare(rkey, bmax) >= 0 {
continue
}
inRange = true
}
if bytes.Compare(rkey, bmin) < 0 {
break
}
} else {
if !inRange {
if usingPivot {
if bytes.Compare(rkey, bmin) <= 0 {
continue
}
}
inRange = true
}
if bytes.Compare(rkey, bmax) >= 0 {
break
}
}
skey := string(rkey)
if !match.Match(skey, spattern) {
continue
}
keys = append(keys, bcopy(rkey[1:]))
if withvalues {
values = append(values, bcopy(iter.Value()))
}
}
iter.Release()
err := iter.Error()
if err != nil {
return nil, err
}
if withvalues {
conn.WriteArray(len(keys) * 2)
} else {
conn.WriteArray(len(keys))
}
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
if withvalues {
conn.WriteBulk(values[i])
}
}
return nil, nil
},
)
}
func bcopy(b []byte) []byte {
r := make([]byte, len(b))
copy(r, b)
return r
}
func (kvm *Machine) cmdFlushdb(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 1 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
if err := kvm.db.Close(); err != nil {
panic(err.Error())
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
panic(err.Error())
}
var err error
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
panic(err.Error())
}
return nil, nil
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}

@ -1,89 +0,0 @@
package roam
type keyword int
const (
kwUnknown keyword = iota
kwSET
kwGET
kwDEL
kwKEYS
kwDUMP
kwDELIF
kwLISTEX
kwFLUSHDB
kwSHUTDOWN
)
func pkeyword(b []byte) keyword {
switch len(b) {
case 3:
if (b[0] == 's' || b[0] == 'S') &&
(b[1] == 'e' || b[1] == 'E') &&
(b[2] == 't' || b[2] == 'T') {
return kwSET
}
if (b[0] == 'g' || b[0] == 'G') &&
(b[1] == 'e' || b[1] == 'E') &&
(b[2] == 't' || b[2] == 'T') {
return kwGET
}
if (b[0] == 'd' || b[0] == 'D') &&
(b[1] == 'e' || b[1] == 'E') &&
(b[2] == 'l' || b[2] == 'L') {
return kwDEL
}
case 4:
if (b[0] == 'k' || b[0] == 'K') &&
(b[1] == 'e' || b[1] == 'E') &&
(b[2] == 'y' || b[2] == 'Y') &&
(b[3] == 's' || b[3] == 'S') {
return kwKEYS
}
if (b[0] == 'd' || b[0] == 'D') &&
(b[1] == 'u' || b[1] == 'U') &&
(b[2] == 'm' || b[2] == 'M') &&
(b[3] == 'p' || b[3] == 'P') {
return kwDUMP
}
case 5:
if (b[0] == 'd' || b[0] == 'D') &&
(b[1] == 'e' || b[1] == 'E') &&
(b[2] == 'l' || b[2] == 'L') &&
(b[3] == 'i' || b[3] == 'I') &&
(b[4] == 'f' || b[4] == 'F') {
return kwDELIF
}
case 6:
if (b[0] == 'l' || b[0] == 'L') &&
(b[1] == 'i' || b[1] == 'I') &&
(b[2] == 's' || b[2] == 'S') &&
(b[3] == 't' || b[3] == 'T') &&
(b[4] == 'e' || b[4] == 'E') &&
(b[5] == 'x' || b[5] == 'X') {
return kwLISTEX
}
case 7:
if (b[0] == 'f' || b[0] == 'F') &&
(b[1] == 'l' || b[1] == 'L') &&
(b[2] == 'u' || b[2] == 'U') &&
(b[3] == 's' || b[3] == 'S') &&
(b[4] == 'h' || b[4] == 'H') &&
(b[5] == 'd' || b[5] == 'D') &&
(b[6] == 'b' || b[6] == 'B') {
return kwFLUSHDB
}
case 8:
if (b[0] == 's' || b[0] == 'S') &&
(b[1] == 'h' || b[1] == 'H') &&
(b[2] == 'u' || b[2] == 'U') &&
(b[3] == 't' || b[3] == 'T') &&
(b[4] == 'd' || b[4] == 'D') &&
(b[5] == 'o' || b[5] == 'O') &&
(b[6] == 'w' || b[6] == 'W') &&
(b[7] == 'n' || b[7] == 'N') {
return kwSHUTDOWN
}
}
return kwUnknown
}

424
node.go

@ -2,31 +2,55 @@ package roam
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/tidwall/match"
"github.com/tidwall/redcon"
"github.com/tidwall/redlog"
"github.com/tile38/roam/finn"
)
var (
errSyntaxError = errors.New("syntax error")
log = redlog.New(os.Stderr)
)
func ListenAndServe(addr, join, dir string) error {
var opts finn.Options
opts.Backend = finn.FastLog
opts.Consistency = finn.High
opts.Durability = finn.High
m, err := NewMachine(dir, addr)
if err != nil {
return err
}
n, err := finn.Open(dir, addr, join, m, &opts)
if err != nil {
return err
}
defer n.Close()
select {
// blocking, there's no way out
}
}
type Machine struct {
mu sync.RWMutex // for FlushDB
mu sync.RWMutex
dir string
db *leveldb.DB
wo *opt.WriteOptions
opts *opt.Options
dbPath string
addr string
@ -44,12 +68,10 @@ func NewMachine(dir, addr string) (*Machine, error) {
NoSync: true,
Filter: filter.NewBloomFilter(10),
}
kvm.wo = &opt.WriteOptions{Sync: false}
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
return nil, err
}
go kvm.selfManage()
return kvm, nil
}
@ -77,14 +99,6 @@ func (kvm *Machine) Command(
return kvm.cmdKeys(m, conn, cmd)
case "flushdb":
return kvm.cmdFlushdb(m, conn, cmd)
case "dump":
return kvm.cmdDump(m, conn, cmd)
case "listex":
// LISTEX limit
return kvm.cmdListex(m, conn, cmd)
case "delif":
// DELIF maxindex key [key ...]
return kvm.cmdDelif(m, conn, cmd)
case "shutdown":
log.Warningf("shutting down")
conn.WriteString("OK")
@ -119,7 +133,7 @@ func (kvm *Machine) Restore(rd io.Reader) error {
r := bufio.NewReader(gzr)
for {
if read > 4*1024*1024 {
if err := kvm.db.Write(batch, kvm.wo); err != nil {
if err := kvm.db.Write(batch, nil); err != nil {
return err
}
read = 0
@ -144,7 +158,7 @@ func (kvm *Machine) Restore(rd io.Reader) error {
batch.Put(key, value)
read += (len(key) + len(value))
}
if err := kvm.db.Write(batch, kvm.wo); err != nil {
if err := kvm.db.Write(batch, nil); err != nil {
return err
}
return gzr.Close()
@ -184,155 +198,247 @@ func (kvm *Machine) Snapshot(wr io.Writer) error {
return iter.Error()
}
func (kvm *Machine) selfManage() {
var fast bool
for {
if fast {
time.Sleep(time.Millisecond)
} else {
time.Sleep(time.Second / 4)
}
fast = false
ok := func() bool {
func (kvm *Machine) cmdSet(
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
index uint64,
) (interface{}, error) {
if len(cmd.Args) != 3 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
return nil, kvm.db.Put(makeKey('k', cmd.Args[1]), cmd.Args[2], nil)
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
func (kvm *Machine) cmdGet(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
key := makeKey('k', cmd.Args[1])
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
if kvm.closed {
kvm.mu.RUnlock()
return false
}
kvm.mu.RUnlock()
const K = 1000
// ok let's find out if there're any keys that need deleting
v, err := do(kvm.addr, "LISTEX", K)
defer kvm.mu.RUnlock()
value, err := kvm.db.Get(key, nil)
if err != nil {
errs := err.Error()
if errs != "ERR leader not known" &&
!strings.HasPrefix(errs, "TRY ") {
log.Warningf("%v", err)
if err == leveldb.ErrNotFound {
conn.WriteNull()
return nil, nil
}
return true
return nil, err
}
ss, ok := v.([]string)
if !ok || len(ss) == 0 || len(ss)%2 != 0 {
return true
}
var delargs []interface{}
var maxidx uint64
delargs = append(delargs, "DELIF", 0)
for i := 0; i < len(ss); i += 2 {
key := ss[i+0]
index, err := strconv.ParseUint(ss[i+1], 10, 64)
if err != nil {
log.Warningf("%v", err)
return true
}
if index > maxidx {
maxidx = index
}
delargs = append(delargs, key)
}
delargs[1] = maxidx
v, err = do(kvm.addr, delargs...)
if err != nil {
log.Warningf("delfi failed: %v", err)
return true
}
if s, ok := v.(string); !ok || s != "OK" {
log.Warningf("delfi failed: %v", v)
return true
}
if len(ss)/2 >= K {
fast = true
}
return true
}()
if !ok {
return
}
}
conn.WriteBulk(value)
return nil, nil
},
)
}
func do(addr string, args ...interface{}) (interface{}, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
func (kvm *Machine) cmdDel(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
defer conn.Close()
var buf []byte
buf = append(buf, '*')
buf = strconv.AppendInt(buf, int64(len(args)), 10)
buf = append(buf, '\r', '\n')
for _, arg := range args {
args := fmt.Sprintf("%v", arg)
buf = append(buf, '$')
buf = strconv.AppendInt(buf, int64(len(args)), 10)
buf = append(buf, '\r', '\n')
buf = append(buf, args...)
buf = append(buf, '\r', '\n')
}
if _, err := conn.Write(buf); err != nil {
return nil, err
}
rd := bufio.NewReader(conn)
c, err := rd.ReadByte()
if err != nil {
return nil, err
}
l, err := rd.ReadString('\n')
if err != nil {
return nil, err
}
s := strings.TrimSpace(l)
if c == '-' {
return nil, errors.New(s)
} else if c == '+' {
return s, nil
} else if c == '$' {
n, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return nil, err
}
data := make([]byte, int(n)+2)
if _, err := io.ReadFull(rd, data); err != nil {
return nil, err
}
if data[len(data)-2] != '\r' || data[len(data)-1] != '\n' {
return nil, errors.New("invalid line ending")
}
return string(data[:len(data)-2]), nil
} else if c == '*' {
n, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return nil, err
}
var res []string
for i := 0; i < int(n); i++ {
c, err := rd.ReadByte()
if err != nil {
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var batch leveldb.Batch
var n int
for i := 1; i < len(cmd.Args); i++ {
key := makeKey('k', cmd.Args[i])
has, err := kvm.db.Has(key, nil)
if err != nil && err != leveldb.ErrNotFound {
return 0, err
} else if has {
n++
batch.Delete(key)
}
}
if err := kvm.db.Write(&batch, nil); err != nil {
return nil, err
}
l, err := rd.ReadString('\n')
if err != nil {
return nil, err
}
if c != '$' {
return nil, errors.New("invalid character")
}
s := strings.TrimSpace(l)
n, err := strconv.ParseUint(s, 10, 64)
if err != nil {
return nil, err
}
data := make([]byte, int(n)+2)
if _, err := io.ReadFull(rd, data); err != nil {
return nil, err
}
if data[len(data)-2] != '\r' || data[len(data)-1] != '\n' {
return nil, errors.New("invalid line ending")
}
res = append(res, string(data[:len(data)-2]))
}
return res, nil
}
return nil, nil
return n, nil
},
func(v interface{}) (interface{}, error) {
n := v.(int)
conn.WriteInt(n)
return nil, nil
},
)
}
func (kvm *Machine) cmdKeys(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) < 2 {
return nil, finn.ErrWrongNumberOfArguments
}
var withvalues bool
var pivot []byte
var usingPivot bool
var desc bool
limit := 500
for i := 2; i < len(cmd.Args); i++ {
switch strings.ToLower(string(cmd.Args[i])) {
default:
return nil, errSyntaxError
case "withvalues":
withvalues = true
case "desc":
desc = true
case "pivot":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
pivot = makeKey('k', cmd.Args[i])
usingPivot = true
case "limit":
i++
if i == len(cmd.Args) {
return nil, errSyntaxError
}
n, err := strconv.ParseInt(string(cmd.Args[i]), 10, 64)
if err != nil || n < 0 {
return nil, errSyntaxError
}
limit = int(n)
}
}
pattern := makeKey('k', cmd.Args[1])
spattern := string(pattern)
min, max := match.Allowable(spattern)
bmin := []byte(min)
bmax := []byte(max)
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
var keys [][]byte
var values [][]byte
iter := kvm.db.NewIterator(nil, nil)
var ok bool
if desc {
if usingPivot && bytes.Compare(pivot, bmax) < 0 {
bmax = pivot
}
ok = iter.Seek(bmax)
if !ok {
ok = iter.Last()
}
} else {
if usingPivot && bytes.Compare(pivot, bmin) > 0 {
bmin = pivot
}
ok = iter.Seek(bmin)
}
step := func() bool {
if desc {
return iter.Prev()
} else {
return iter.Next()
}
}
var inRange bool
for ; ok; ok = step() {
if len(keys) == limit {
break
}
rkey := iter.Key()
if desc {
if !inRange {
if bytes.Compare(rkey, bmax) >= 0 {
continue
}
inRange = true
}
if bytes.Compare(rkey, bmin) < 0 {
break
}
} else {
if !inRange {
if usingPivot {
if bytes.Compare(rkey, bmin) <= 0 {
continue
}
}
inRange = true
}
if bytes.Compare(rkey, bmax) >= 0 {
break
}
}
skey := string(rkey)
if !match.Match(skey, spattern) {
continue
}
keys = append(keys, bcopy(rkey[1:]))
if withvalues {
values = append(values, bcopy(iter.Value()))
}
}
iter.Release()
err := iter.Error()
if err != nil {
return nil, err
}
if withvalues {
conn.WriteArray(len(keys) * 2)
} else {
conn.WriteArray(len(keys))
}
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
if withvalues {
conn.WriteBulk(values[i])
}
}
return nil, nil
},
)
}
func (kvm *Machine) cmdFlushdb(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 1 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
if err := kvm.db.Close(); err != nil {
panic(err.Error())
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
panic(err.Error())
}
var err error
kvm.db, err = leveldb.OpenFile(kvm.dbPath, kvm.opts)
if err != nil {
panic(err.Error())
}
return nil, nil
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
func makeKey(prefix byte, b []byte) []byte {
key := make([]byte, 1+len(b))
key[0] = prefix
copy(key[1:], b)
return key
}
func bcopy(b []byte) []byte {
r := make([]byte, len(b))
copy(r, b)
return r
}

@ -1,30 +0,0 @@
package roam
import (
"os"
"github.com/tidwall/redlog"
"github.com/tile38/roam/finn"
)
var log = redlog.New(os.Stderr)
func ListenAndServe(addr, join, dir string) error {
var opts finn.Options
opts.Backend = finn.FastLog
opts.Consistency = finn.High
opts.Durability = finn.High
m, err := NewMachine(dir, addr)
if err != nil {
return err
}
n, err := finn.Open(dir, addr, join, m, &opts)
if err != nil {
return err
}
defer n.Close()
select {
// blocking
}
}