Fix runGC behaviour to correctly delete all expired keys (#229)

Fixes #228

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/229
Co-authored-by: James Mills <james@mills.io>
Co-committed-by: James Mills <james@mills.io>
This commit is contained in:
James Mills 2021-07-20 20:42:22 +00:00
parent 3ff8937205
commit b094cd33d3
2 changed files with 43 additions and 4 deletions

@ -349,16 +349,24 @@ func (b *Bitcask) RunGC() error {
// runGC deletes all keys that are expired
// caller function should take care of the locking when calling this method
func (b *Bitcask) runGC() (err error) {
keysToDelete := art.New()
b.ttlIndex.ForEach(func(node art.Node) (cont bool) {
if !b.isExpired(node.Key()) {
// later, return false here when the ttlIndex is sorted
return true
}
if err = b.delete(node.Key()); err != nil {
return false
}
keysToDelete.Insert(node.Key(), true)
//keysToDelete = append(keysToDelete, node.Key())
return true
})
return
keysToDelete.ForEach(func(node art.Node) (cont bool) {
b.delete(node.Key())
return true
})
return nil
}
// Fold iterates over all keys in the database calling the function `f` for

@ -1784,7 +1784,38 @@ func TestGetExpiredInsideFold(t *testing.T) {
return nil
})
assert.Contains(arr, "skipped")
}
func TestRunGCDeletesAllExpired(t *testing.T) {
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
db, err := Open(testdir)
assert.NoError(err)
defer db.Close()
// Add a node to the tree that won't expire
db.Put([]byte("static"), []byte("static"))
// Add a node that expires almost immediately to the tree
db.PutWithTTL([]byte("shortLived"), []byte("shortLived"), 0)
db.PutWithTTL([]byte("longLived"), []byte("longLived"), time.Hour)
db.PutWithTTL([]byte("longLived2"), []byte("longLived2"), time.Hour)
db.PutWithTTL([]byte("shortLived2"), []byte("shortLived2"), 0)
db.PutWithTTL([]byte("shortLived3"), []byte("shortLived3"), 0)
db.Put([]byte("static2"), []byte("static2"))
// Sleep a bit and run the Garbage Collector
time.Sleep(3 * time.Millisecond)
db.RunGC()
_ = db.Fold(func(key []byte) error {
_, err := db.Get(key)
assert.NoError(err)
return nil
})
}
type benchmarkTestCase struct {