From ef187f83153a64f37c46d8d9cec16372fe6492ad Mon Sep 17 00:00:00 2001 From: Tai Groot Date: Wed, 21 Jul 2021 00:19:25 +0000 Subject: [PATCH] [ADD] Sift and ScanSift (+ tests) (#232) Added Sift and ScanSift functions for review without tests (for now) fix docstrings Added tests for Sift and ScanSift Note this also fixes a bug in the Scan() function where the RMutex is not locked, allowing a potential race condition closes #231 Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/232 Co-authored-by: Tai Groot Co-committed-by: Tai Groot --- bitcask.go | 77 ++++++++++++++++++++++++- bitcask_test.go | 146 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+), 2 deletions(-) diff --git a/bitcask.go b/bitcask.go index f15ed95..2d29359 100644 --- a/bitcask.go +++ b/bitcask.go @@ -274,6 +274,39 @@ func (b *Bitcask) delete(key []byte) error { return nil } +// Sift iterates over all keys in the database calling the function `f` for +// each key. If the KV pair is expired or the function returns true, that key is +// deleted from the database. +// If the function returns an error on any key, no further keys are processed, no +// keys are deleted, and the first error is returned. +func (b *Bitcask) Sift(f func(key []byte) (bool, error)) (err error) { + keysToDelete := art.New() + + b.mu.RLock() + b.trie.ForEach(func(node art.Node) bool { + if b.isExpired(node.Key()) { + keysToDelete.Insert(node.Key(), true) + return true + } + var shouldDelete bool + if shouldDelete, err = f(node.Key()); err != nil { + return false + } else if shouldDelete { + keysToDelete.Insert(node.Key(), true) + } + return true + }) + b.mu.RUnlock() + + b.mu.Lock() + defer b.mu.Unlock() + keysToDelete.ForEach(func(node art.Node) (cont bool) { + b.delete(node.Key()) + return true + }) + return +} + // DeleteAll deletes all the keys. If an I/O error occurs the error is returned. func (b *Bitcask) DeleteAll() (err error) { b.mu.RLock() @@ -296,8 +329,11 @@ func (b *Bitcask) DeleteAll() (err error) { // Scan performs a prefix scan of keys matching the given prefix and calling // the function `f` with the keys found. If the function returns an error -// no further keys are processed and the first error returned. +// no further keys are processed and the first error is returned. func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) { + b.mu.RLock() + defer b.mu.RUnlock() + b.trie.ForEachPrefix(prefix, func(node art.Node) bool { // Skip the root node if len(node.Key()) == 0 { @@ -312,6 +348,43 @@ func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) { return } +// ScanSift iterates over all keys in the database beginning with the given +// prefix, calling the function `f` for each key. If the KV pair is expired or +// the function returns true, that key is deleted from the database. +// If the function returns an error on any key, no further keys are processed, +// no keys are deleted, and the first error is returned. +func (b *Bitcask) ScanSift(prefix []byte, f func(key []byte) (bool, error)) (err error) { + keysToDelete := art.New() + + b.mu.RLock() + b.trie.ForEachPrefix(prefix, func(node art.Node) bool { + // Skip the root node + if len(node.Key()) == 0 { + return true + } + if b.isExpired(node.Key()) { + keysToDelete.Insert(node.Key(), true) + return true + } + var shouldDelete bool + if shouldDelete, err = f(node.Key()); err != nil { + return false + } else if shouldDelete { + keysToDelete.Insert(node.Key(), true) + } + return true + }) + b.mu.RUnlock() + + b.mu.Lock() + defer b.mu.Unlock() + keysToDelete.ForEach(func(node art.Node) (cont bool) { + b.delete(node.Key()) + return true + }) + return +} + // Len returns the total number of keys in the database func (b *Bitcask) Len() int { b.mu.RLock() @@ -371,7 +444,7 @@ func (b *Bitcask) runGC() (err error) { // Fold iterates over all keys in the database calling the function `f` for // each key. If the function returns an error, no further keys are processed -// and the error returned. +// and the error is returned. func (b *Bitcask) Fold(f func(key []byte) error) (err error) { b.mu.RLock() defer b.mu.RUnlock() diff --git a/bitcask_test.go b/bitcask_test.go index 068c788..74bce69 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -168,6 +168,65 @@ func TestAll(t *testing.T) { assert.NoError(err) }) + t.Run("Sift", func(t *testing.T) { + err = db.Put([]byte("toBeSifted"), []byte("siftMe")) + assert.NoError(err) + err = db.Put([]byte("notToBeSifted"), []byte("dontSiftMe")) + assert.NoError(err) + err := db.Sift(func(key []byte) (bool, error) { + value, err := db.Get(key) + if err != nil { + return false, err + } + if string(value) == "siftMe" { + return true, nil + } + return false, nil + }) + assert.NoError(err) + _, err = db.Get([]byte("toBeSifted")) + assert.Equal(ErrKeyNotFound, err) + _, err = db.Get([]byte("notToBeSifted")) + assert.NoError(err) + }) + + t.Run("ScanSift", func(t *testing.T) { + err := db.DeleteAll() + assert.NoError(err) + err = db.Put([]byte("toBeSifted"), []byte("siftMe")) + assert.NoError(err) + err = db.Put([]byte("toBeSkipped"), []byte("siftMe")) + assert.NoError(err) + err = db.Put([]byte("toBeSiftedAsWell"), []byte("siftMe")) + assert.NoError(err) + err = db.Put([]byte("toBeSiftedButNotReally"), []byte("dontSiftMe")) + assert.NoError(err) + err = db.ScanSift([]byte("toBeSifted"), func(key []byte) (bool, error) { + value, err := db.Get(key) + if err != nil { + return false, err + } + if string(value) == "siftMe" { + return true, nil + } + return false, nil + }) + assert.NoError(err) + _, err = db.Get([]byte("toBeSifted")) + assert.Equal(ErrKeyNotFound, err) + _, err = db.Get([]byte("toBeSiftedAsWell")) + assert.Equal(ErrKeyNotFound, err) + _, err = db.Get([]byte("toBeSkipped")) + assert.NoError(err) + _, err = db.Get([]byte("toBeSiftedButNotReally")) + assert.NoError(err) + }) + + t.Run("DeleteAll", func(t *testing.T) { + err = db.DeleteAll() + assert.NoError(err) + }) + t.Run("Close", func(t *testing.T) { err = db.Close() assert.NoError(err) @@ -1655,6 +1714,93 @@ func TestConcurrent(t *testing.T) { }) } +func TestSift(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + var db *Bitcask + + t.Run("Setup", func(t *testing.T) { + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + var items = map[string][]byte{ + "1": []byte("1"), + "2": []byte("2"), + "3": []byte("3"), + "food": []byte("pizza"), + "foo": []byte([]byte("foo")), + "fooz": []byte("fooz ball"), + "hello": []byte("world"), + } + for k, v := range items { + err = db.Put([]byte(k), v) + assert.NoError(err) + } + }) + }) + + t.Run("SiftErrors", func(t *testing.T) { + err = db.Sift(func(key []byte) (bool, error) { + return false, ErrMockError + }) + assert.Equal(ErrMockError, err) + + err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) { + return true, ErrMockError + }) + assert.Equal(ErrMockError, err) + }) +} +func TestScanSift(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + var db *Bitcask + + t.Run("Setup", func(t *testing.T) { + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + var items = map[string][]byte{ + "1": []byte("1"), + "2": []byte("2"), + "3": []byte("3"), + "food": []byte("pizza"), + "foo": []byte([]byte("foo")), + "fooz": []byte("fooz ball"), + "hello": []byte("world"), + } + for k, v := range items { + err = db.Put([]byte(k), v) + assert.NoError(err) + } + }) + }) + + t.Run("ScanSiftErrors", func(t *testing.T) { + err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) { + return false, ErrMockError + }) + assert.Equal(ErrMockError, err) + + err = db.ScanSift([]byte("fo"), func(key []byte) (bool, error) { + return true, ErrMockError + }) + assert.Equal(ErrMockError, err) + }) +} + func TestScan(t *testing.T) { assert := assert.New(t)