[ADD] Sift and ScanSift (+ tests) (#232)

Added Sift and ScanSift functions for review without tests (for now)

fix docstrings

Added tests for Sift and ScanSift

Note: this also fixes a bug in the Scan() function where the read lock (RLock on the RWMutex) was not held, allowing a potential race condition.

closes #231

Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/232
Co-authored-by: Tai Groot <tai@taigrr.com>
Co-committed-by: Tai Groot <tai@taigrr.com>
This commit is contained in:
Tai Groot 2021-07-21 00:19:25 +00:00 committed by James Mills
parent b094cd33d3
commit ef187f8315
2 changed files with 221 additions and 2 deletions

@ -274,6 +274,39 @@ func (b *Bitcask) delete(key []byte) error {
return nil
}
// Sift iterates over all keys in the database calling the function `f` for
// each key. If the KV pair is expired or the function returns true, that key is
// deleted from the database.
// If the function returns an error on any key, no further keys are processed, no
// keys are deleted, and the first error is returned. If deleting a key fails,
// no further keys are deleted and that error is returned.
//
// `f` is invoked while the read lock is held, so it must not call anything
// that needs the write lock (Sift itself, for example).
func (b *Bitcask) Sift(f func(key []byte) (bool, error)) (err error) {
	keysToDelete := art.New()

	// Phase 1: collect the keys to remove under the read lock. Nothing is
	// mutated here, so an error from `f` can still abort the whole operation.
	b.mu.RLock()
	b.trie.ForEach(func(node art.Node) bool {
		if b.isExpired(node.Key()) {
			keysToDelete.Insert(node.Key(), true)
			return true
		}
		var shouldDelete bool
		if shouldDelete, err = f(node.Key()); err != nil {
			return false
		}
		if shouldDelete {
			keysToDelete.Insert(node.Key(), true)
		}
		return true
	})
	b.mu.RUnlock()

	// Honour the documented contract: when `f` failed, no key is deleted,
	// not even the ones collected before the failing key.
	if err != nil {
		return err
	}

	// Phase 2: perform the deletions under the write lock.
	// NOTE(review): the lock is released between the two phases, so a key may
	// be rewritten by another goroutine after `f` judged it; confirm whether
	// callers rely on stronger guarantees.
	b.mu.Lock()
	defer b.mu.Unlock()
	keysToDelete.ForEach(func(node art.Node) bool {
		if err = b.delete(node.Key()); err != nil {
			return false
		}
		return true
	})
	return err
}
// DeleteAll deletes all the keys. If an I/O error occurs the error is returned.
func (b *Bitcask) DeleteAll() (err error) {
b.mu.RLock()
@ -296,8 +329,11 @@ func (b *Bitcask) DeleteAll() (err error) {
// Scan performs a prefix scan of keys matching the given prefix and calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error returned.
// no further keys are processed and the first error is returned.
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
b.mu.RLock()
defer b.mu.RUnlock()
b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
// Skip the root node
if len(node.Key()) == 0 {
@ -312,6 +348,43 @@ func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
return
}
// ScanSift iterates over all keys in the database beginning with the given
// prefix, calling the function `f` for each key. If the KV pair is expired or
// the function returns true, that key is deleted from the database.
// If the function returns an error on any key, no further keys are processed,
// no keys are deleted, and the first error is returned. If deleting a key
// fails, no further keys are deleted and that error is returned.
//
// `f` is invoked while the read lock is held, so it must not call anything
// that needs the write lock (ScanSift itself, for example).
func (b *Bitcask) ScanSift(prefix []byte, f func(key []byte) (bool, error)) (err error) {
	keysToDelete := art.New()

	// Phase 1: collect the matching keys to remove under the read lock.
	// Nothing is mutated here, so an error from `f` can still abort the
	// whole operation.
	b.mu.RLock()
	b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
		// Skip the root node
		if len(node.Key()) == 0 {
			return true
		}
		if b.isExpired(node.Key()) {
			keysToDelete.Insert(node.Key(), true)
			return true
		}
		var shouldDelete bool
		if shouldDelete, err = f(node.Key()); err != nil {
			return false
		}
		if shouldDelete {
			keysToDelete.Insert(node.Key(), true)
		}
		return true
	})
	b.mu.RUnlock()

	// Honour the documented contract: when `f` failed, no key is deleted,
	// not even the ones collected before the failing key.
	if err != nil {
		return err
	}

	// Phase 2: perform the deletions under the write lock.
	// NOTE(review): the lock is released between the two phases, so a key may
	// be rewritten by another goroutine after `f` judged it; confirm whether
	// callers rely on stronger guarantees.
	b.mu.Lock()
	defer b.mu.Unlock()
	keysToDelete.ForEach(func(node art.Node) bool {
		if err = b.delete(node.Key()); err != nil {
			return false
		}
		return true
	})
	return err
}
// Len returns the total number of keys in the database
func (b *Bitcask) Len() int {
b.mu.RLock()
@ -371,7 +444,7 @@ func (b *Bitcask) runGC() (err error) {
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
// and the error is returned.
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
b.mu.RLock()
defer b.mu.RUnlock()

@ -168,6 +168,65 @@ func TestAll(t *testing.T) {
assert.NoError(err)
})
// Sift subtest: stores one key whose value marks it for removal and one key
// that must survive, then checks that Sift deletes only the former.
t.Run("Sift", func(t *testing.T) {
// These assignments write to the err declared by the enclosing test.
err = db.Put([]byte("toBeSifted"), []byte("siftMe"))
assert.NoError(err)
err = db.Put([]byte("notToBeSifted"), []byte("dontSiftMe"))
assert.NoError(err)
// From here on `err` is a new variable local to this closure (note the :=).
// The callback requests deletion only for keys whose stored value is "siftMe".
err := db.Sift(func(key []byte) (bool, error) {
// Sift calls this callback while holding its read lock; db.Get is used here
// as a read-only lookup (presumably it only needs the read lock — confirm).
value, err := db.Get(key)
if err != nil {
return false, err
}
if string(value) == "siftMe" {
return true, nil
}
return false, nil
})
assert.NoError(err)
// The sifted key must be gone ...
_, err = db.Get([]byte("toBeSifted"))
assert.Equal(ErrKeyNotFound, err)
// ... while the other key must still be readable.
_, err = db.Get([]byte("notToBeSifted"))
assert.NoError(err)
})
// ScanSift subtest: verifies that only keys under the given prefix are offered
// to the callback, and that only those the callback approves are deleted.
t.Run("ScanSift", func(t *testing.T) {
// Start from an empty database so keys from earlier subtests cannot interfere.
// `err` is local to this closure from this point (note the :=).
err := db.DeleteAll()
assert.NoError(err)
err = db.Put([]byte("toBeSifted"), []byte("siftMe"))
assert.NoError(err)
// Same value as the sifted keys, but the key does not start with the scanned
// prefix, so it must never be considered for deletion.
err = db.Put([]byte("toBeSkipped"), []byte("siftMe"))
assert.NoError(err)
err = db.Put([]byte("toBeSiftedAsWell"), []byte("siftMe"))
assert.NoError(err)
// Matches the prefix, but its value makes the callback return false.
err = db.Put([]byte("toBeSiftedButNotReally"), []byte("dontSiftMe"))
assert.NoError(err)
// The callback requests deletion only for keys whose stored value is "siftMe".
err = db.ScanSift([]byte("toBeSifted"), func(key []byte) (bool, error) {
value, err := db.Get(key)
if err != nil {
return false, err
}
if string(value) == "siftMe" {
return true, nil
}
return false, nil
})
assert.NoError(err)
// Prefix matched and callback returned true: deleted.
_, err = db.Get([]byte("toBeSifted"))
assert.Equal(ErrKeyNotFound, err)
_, err = db.Get([]byte("toBeSiftedAsWell"))
assert.Equal(ErrKeyNotFound, err)
// Prefix did not match: kept.
_, err = db.Get([]byte("toBeSkipped"))
assert.NoError(err)
// Prefix matched but callback returned false: kept.
_, err = db.Get([]byte("toBeSiftedButNotReally"))
assert.NoError(err)
})
t.Run("DeleteAll", func(t *testing.T) {
err = db.DeleteAll()
assert.NoError(err)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
@ -1655,6 +1714,93 @@ func TestConcurrent(t *testing.T) {
})
}
// TestSift checks the error path of Sift: an error returned by the callback
// must be propagated unchanged and must not cause any key to be deleted,
// whether the callback's boolean result is false or true.
func TestSift(t *testing.T) {
	assert := assert.New(t)
	testdir, err := ioutil.TempDir("", "bitcask")
	assert.NoError(err)

	var db *Bitcask

	// Fixture: seven distinct keys in a fresh database.
	var items = map[string][]byte{
		"1":     []byte("1"),
		"2":     []byte("2"),
		"3":     []byte("3"),
		"food":  []byte("pizza"),
		"foo":   []byte("foo"),
		"fooz":  []byte("fooz ball"),
		"hello": []byte("world"),
	}

	t.Run("Setup", func(t *testing.T) {
		t.Run("Open", func(t *testing.T) {
			db, err = Open(testdir)
			assert.NoError(err)
		})

		t.Run("Put", func(t *testing.T) {
			for k, v := range items {
				err = db.Put([]byte(k), v)
				assert.NoError(err)
			}
		})
	})

	t.Run("SiftErrors", func(t *testing.T) {
		// Error together with a "keep" verdict.
		err = db.Sift(func(key []byte) (bool, error) {
			return false, ErrMockError
		})
		assert.Equal(ErrMockError, err)
		assert.Equal(len(items), db.Len())

		// Error together with a "delete" verdict: the error must win.
		err = db.Sift(func(key []byte) (bool, error) {
			return true, ErrMockError
		})
		assert.Equal(ErrMockError, err)
		assert.Equal(len(items), db.Len())
	})

	// Release the database handle so the test does not leak it.
	t.Run("Close", func(t *testing.T) {
		assert.NoError(db.Close())
	})
}
// TestScanSift checks the error path of ScanSift: an error returned by the
// callback must be propagated unchanged and must not cause any key to be
// deleted, whether the callback's boolean result is false or true.
func TestScanSift(t *testing.T) {
	assert := assert.New(t)
	testdir, err := ioutil.TempDir("", "bitcask")
	assert.NoError(err)

	var db *Bitcask

	// Fixture: seven distinct keys, three of which start with "fo".
	var items = map[string][]byte{
		"1":     []byte("1"),
		"2":     []byte("2"),
		"3":     []byte("3"),
		"food":  []byte("pizza"),
		"foo":   []byte("foo"),
		"fooz":  []byte("fooz ball"),
		"hello": []byte("world"),
	}

	t.Run("Setup", func(t *testing.T) {
		t.Run("Open", func(t *testing.T) {
			db, err = Open(testdir)
			assert.NoError(err)
		})

		t.Run("Put", func(t *testing.T) {
			for k, v := range items {
				err = db.Put([]byte(k), v)
				assert.NoError(err)
			}
		})
	})

	t.Run("ScanSiftErrors", func(t *testing.T) {
		// Error together with a "keep" verdict.
		err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) {
			return false, ErrMockError
		})
		assert.Equal(ErrMockError, err)
		assert.Equal(len(items), db.Len())

		// Error together with a "delete" verdict: the error must win.
		err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) {
			return true, ErrMockError
		})
		assert.Equal(ErrMockError, err)
		assert.Equal(len(items), db.Len())
	})

	// Release the database handle so the test does not leak it.
	t.Run("Close", func(t *testing.T) {
		assert.NoError(db.Close())
	})
}
func TestScan(t *testing.T) {
assert := assert.New(t)