From e7c649076233ff0a96c0134a5a9183b54e61b79f Mon Sep 17 00:00:00 2001
From: Yash Suresh Chandra
Date: Wed, 2 Jun 2021 02:17:30 +0530
Subject: [PATCH] Purge api added to remove expired keys (#204)

* purge api added

* merged with master, import order fix

* purge api renamed to RunGC

Co-authored-by: yash
---
 bitcask.go                       | 146 ++++++++++++++++++++++---------
 bitcask_test.go                  |  88 +++++++++++++++++++
 internal/index/ttl_index.go      |  71 +++++++++++++++
 internal/index/ttl_index_test.go |  54 ++++++++++++
 4 files changed, 317 insertions(+), 42 deletions(-)
 create mode 100644 internal/index/ttl_index.go
 create mode 100644 internal/index/ttl_index_test.go

diff --git a/bitcask.go b/bitcask.go
index 5780236..8b46df0 100644
--- a/bitcask.go
+++ b/bitcask.go
@@ -26,7 +26,8 @@ import (
 )
 
 const (
-	lockfile = "lock"
+	lockfile     = "lock"
+	ttlIndexFile = "ttl_index"
 )
 
 var (
@@ -71,15 +72,17 @@ type Bitcask struct {
 
 	*flock.Flock
 
-	config    *config.Config
-	options   []Option
-	path      string
-	curr      data.Datafile
-	datafiles map[int]data.Datafile
-	trie      art.Tree
-	indexer   index.Indexer
-	metadata  *metadata.MetaData
-	isMerging bool
+	config     *config.Config
+	options    []Option
+	path       string
+	curr       data.Datafile
+	datafiles  map[int]data.Datafile
+	trie       art.Tree
+	indexer    index.Indexer
+	ttlIndexer index.Indexer
+	ttlIndex   art.Tree
+	metadata   *metadata.MetaData
+	isMerging  bool
 }
 
 // Stats is a struct returned by Stats() on an open Bitcask instance
@@ -118,7 +121,7 @@ func (b *Bitcask) Close() error {
 }
 
 func (b *Bitcask) close() error {
-	if err := b.saveIndex(); err != nil {
+	if err := b.saveIndexes(); err != nil {
 		return err
 	}
 
@@ -162,8 +165,11 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
 // Has returns true if the key exists in the database, false otherwise.
 func (b *Bitcask) Has(key []byte) bool {
 	b.mu.RLock()
+	defer b.mu.RUnlock()
 	_, found := b.trie.Search(key)
-	b.mu.RUnlock()
+	if found {
+		return !b.isExpired(key)
+	}
 	return found
 }
 
@@ -207,6 +213,9 @@ func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error {
 
 	item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
 	b.trie.Insert(key, item)
+	if feature.Expiry != nil {
+		b.ttlIndex.Insert(key, *feature.Expiry)
+	}
 
 	return nil
 }
@@ -229,6 +238,7 @@ func (b *Bitcask) delete(key []byte) error {
 		b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
 	}
 	b.trie.Delete(key)
+	b.ttlIndex.Delete(key)
 
 	return nil
 }
@@ -248,6 +258,7 @@ func (b *Bitcask) DeleteAll() (err error) {
 		return true
 	})
 	b.trie = art.New()
+	b.ttlIndex = art.New()
 
 	return
 }
@@ -286,6 +297,9 @@ func (b *Bitcask) Keys() chan []byte {
 
 		for it := b.trie.Iterator(); it.HasNext(); {
 			node, _ := it.Next()
+			if b.isExpired(node.Key()) {
+				continue
+			}
 			ch <- node.Key()
 		}
 		close(ch)
@@ -294,6 +308,28 @@ func (b *Bitcask) Keys() chan []byte {
 	return ch
 }
 
+// RunGC deletes all expired keys
+func (b *Bitcask) RunGC() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.runGC()
+}
+
+// runGC deletes all keys that are expired
+// caller function should take care of the locking when calling this method
+func (b *Bitcask) runGC() (err error) {
+	b.ttlIndex.ForEach(func(node art.Node) (cont bool) {
+		if !b.isExpired(node.Key()) {
+			return true
+		}
+		if err = b.delete(node.Key()); err != nil {
+			return false
+		}
+		return true
+	})
+	return
+}
+
 // Fold iterates over all keys in the database calling the function `f` for
 // each key. If the function returns an error, no further keys are processed
 // and the error returned.
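
For context, a minimal usage sketch of the RunGC API added above. The snippet is illustrative only and not part of the patch; the database path and the once-a-minute ticker are assumptions, while Open, Put, WithExpiry and RunGC are the APIs shown in this diff.

package main

import (
	"log"
	"time"

	"github.com/prologic/bitcask"
)

func main() {
	// open a database and write a key that expires in one hour
	db, err := bitcask.Open("/tmp/bitcask-ttl-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Put([]byte("session"), []byte("data"), bitcask.WithExpiry(time.Now().Add(time.Hour))); err != nil {
		log.Fatal(err)
	}

	// purge expired keys once a minute; RunGC takes the write lock internally
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for range ticker.C {
		if err := db.RunGC(); err != nil {
			log.Printf("RunGC: %v", err)
		}
	}
}
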
@@ -311,8 +347,7 @@ func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
 	return
 }
 
-// get retrieves the value of the given key. If the key is not found or an/I/O
-// error occurs a null byte slice is returned along with the error.
+// get retrieves the value of the given key
 func (b *Bitcask) get(key []byte) (internal.Entry, error) {
 	var df data.Datafile
 
@@ -320,6 +355,10 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) {
 	if !found {
 		return internal.Entry{}, ErrKeyNotFound
 	}
+	if expired := b.isExpired(key); expired {
+		_ = b.delete(key) // we don't care if it doesn't succeed
+		return internal.Entry{}, ErrKeyExpired
+	}
 
 	item := value.(internal.Item)
 
@@ -334,11 +373,6 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) {
 		return internal.Entry{}, err
 	}
 
-	if e.Expiry != nil && e.Expiry.Before(time.Now().UTC()) {
-		_ = b.delete(key) // we don't care if it doesnt succeed
-		return internal.Entry{}, ErrKeyExpired
-	}
-
 	checksum := crc32.ChecksumIEEE(e.Value)
 	if checksum != e.Checksum {
 		return internal.Entry{}, ErrChecksumFailed
@@ -371,7 +405,7 @@ func (b *Bitcask) put(key, value []byte, feature Feature) (int64, int64, error)
 			return -1, 0, err
 		}
 		b.curr = curr
-		err = b.saveIndex()
+		err = b.saveIndexes()
 		if err != nil {
 			return -1, 0, err
 		}
@@ -422,7 +456,7 @@ func (b *Bitcask) reopen() error {
 	if err != nil {
 		return err
 	}
-	t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles, lastID, b.metadata.IndexUpToDate)
+	t, ttlIndex, err := loadIndexes(b, datafiles, lastID)
 	if err != nil {
 		return err
 	}
@@ -434,6 +468,7 @@ func (b *Bitcask) reopen() error {
 
 	b.trie = t
 	b.curr = curr
+	b.ttlIndex = ttlIndex
 	b.datafiles = datafiles
 
 	return nil
@@ -605,12 +640,13 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 	}
 
 	bitcask := &Bitcask{
-		Flock:    flock.New(filepath.Join(path, lockfile)),
-		config:   cfg,
-		options:  options,
-		path:     path,
-		indexer:  index.NewIndexer(),
-		metadata: meta,
+		Flock:      flock.New(filepath.Join(path, lockfile)),
+		config:     cfg,
+		options:    options,
+		path:       path,
+		indexer:    index.NewIndexer(),
+		ttlIndexer: index.NewTTLIndexer(),
+		metadata:   meta,
 	}
 
 	locked, err := bitcask.Flock.TryLock()
@@ -667,13 +703,19 @@ func (b *Bitcask) Backup(path string) error {
 	return internal.Copy(b.path, path, []string{lockfile})
 }
 
-// saveIndex saves index currently in RAM to disk
-func (b *Bitcask) saveIndex() error {
+// saveIndexes saves the index and ttl_index currently in RAM to disk
+func (b *Bitcask) saveIndexes() error {
 	tempIdx := "temp_index"
 	if err := b.indexer.Save(b.trie, filepath.Join(b.path, tempIdx)); err != nil {
 		return err
 	}
-	return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index"))
+	if err := os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index")); err != nil {
+		return err
+	}
+	if err := b.ttlIndexer.Save(b.ttlIndex, filepath.Join(b.path, tempIdx)); err != nil {
+		return err
+	}
+	return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, ttlIndexFile))
 }
 
 // saveMetadata saves metadata into disk
@@ -686,6 +728,16 @@ func (b *Bitcask) Reclaimable() int64 {
 	return b.metadata.ReclaimableSpace
 }
 
+// isExpired returns true if a key has expired
+// it returns false if the key does not exist in the ttl index
+func (b *Bitcask) isExpired(key []byte) bool {
+	expiry, found := b.ttlIndex.Search(key)
+	if !found {
+		return false
+	}
+	return expiry.(time.Time).Before(time.Now().UTC())
+}
+
 func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
 	fns, err := internal.GetDatafiles(path)
 	if err != nil {
@@ -724,30 +776,37 @@ func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
 	return out
 }
 
-func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile, lastID int, indexUpToDate bool) (art.Tree, error) {
-	t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
+// loadIndexes loads the index from disk into memory. If the index is missing or only partially
+// available (the last bitcask process crashed), it iterates over the last datafile and rebuilds the index.
+// The ttl_index is built here as well, alongside the normal index.
+func loadIndexes(b *Bitcask, datafiles map[int]data.Datafile, lastID int) (art.Tree, art.Tree, error) {
+	t, found, err := b.indexer.Load(filepath.Join(b.path, "index"), b.config.MaxKeySize)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
-	if found && indexUpToDate {
-		return t, nil
+	ttlIndex, _, err := b.ttlIndexer.Load(filepath.Join(b.path, ttlIndexFile), b.config.MaxKeySize)
+	if err != nil {
+		return nil, nil, err
+	}
+	if found && b.metadata.IndexUpToDate {
+		return t, ttlIndex, nil
 	}
 	if found {
-		if err := loadIndexFromDatafile(t, datafiles[lastID]); err != nil {
-			return nil, err
+		if err := loadIndexFromDatafile(t, ttlIndex, datafiles[lastID]); err != nil {
+			return nil, ttlIndex, err
 		}
-		return t, nil
+		return t, ttlIndex, nil
 	}
 	sortedDatafiles := getSortedDatafiles(datafiles)
 	for _, df := range sortedDatafiles {
-		if err := loadIndexFromDatafile(t, df); err != nil {
-			return nil, err
+		if err := loadIndexFromDatafile(t, ttlIndex, df); err != nil {
+			return nil, ttlIndex, err
 		}
 	}
-	return t, nil
+	return t, ttlIndex, nil
 }
 
-func loadIndexFromDatafile(t art.Tree, df data.Datafile) error {
+func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df data.Datafile) error {
 	var offset int64
 	for {
 		e, n, err := df.Read()
@@ -765,6 +824,9 @@ func loadIndexFromDatafile(t art.Tree, df data.Datafile) error {
 		}
 		item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
 		t.Insert(e.Key, item)
+		if e.Expiry != nil {
+			ttlIndex.Insert(e.Key, *e.Expiry)
+		}
 		offset += n
 	}
 	return nil
diff --git a/bitcask_test.go b/bitcask_test.go
index a1b42c7..631454d 100644
--- a/bitcask_test.go
+++ b/bitcask_test.go
@@ -101,6 +101,24 @@ func TestAll(t *testing.T) {
 		assert.True(db.Has([]byte("foo")))
 	})
 
+	t.Run("HasWithExpired", func(t *testing.T) {
+		err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
+		assert.NoError(err)
+		time.Sleep(time.Millisecond)
+		assert.False(db.Has([]byte("bar")))
+	})
+
+	t.Run("RunGC", func(t *testing.T) {
+		err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
+		assert.NoError(err)
+		time.Sleep(time.Millisecond)
+		err = db.RunGC()
+		assert.NoError(err)
+		_, err := db.Get([]byte("bar"))
+		assert.Error(err)
+		assert.Equal(ErrKeyNotFound, err)
+	})
+
 	t.Run("Keys", func(t *testing.T) {
 		keys := make([][]byte, 0)
 		for key := range db.Keys() {
@@ -214,6 +232,11 @@ func TestReopen(t *testing.T) {
 			assert.NoError(err)
 		})
 
+		t.Run("PutWithExpiry", func(t *testing.T) {
+			err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
+			assert.NoError(err)
+		})
+
 		t.Run("Get", func(t *testing.T) {
 			val, err := db.Get([]byte("foo"))
 			assert.NoError(err)
@@ -242,6 +265,13 @@ func TestReopen(t *testing.T) {
 			assert.Equal([]byte("foo"), val)
 		})
 
+		t.Run("GetExpiredKeyAfterReopen", func(t *testing.T) {
+			val, err := db.Get([]byte("bar"))
+			assert.Error(err)
+			assert.Equal(ErrKeyExpired, err)
+			assert.Nil(val)
+		})
+
 		t.Run("Close", func(t *testing.T) {
 			err = db.Close()
 			assert.NoError(err)
@@ -484,6 +514,42 @@ func TestAutoRecovery(t *testing.T) {
 	}
 }
 
+func TestLoadIndexes(t *testing.T) {
+	assert := assert.New(t)
+	testdir, err1 := ioutil.TempDir("", "bitcask")
+	assert.NoError(err1)
+	defer os.RemoveAll(testdir)
+
+	var db *Bitcask
+	var err error
+
+	t.Run("Setup", func(t *testing.T) {
+		db, err = Open(testdir)
+		assert.NoError(err)
+		for i := 0; i < 5; i++ {
+			key := fmt.Sprintf("key%d", i)
+			val := fmt.Sprintf("val%d", i)
+			err := db.Put([]byte(key), []byte(val))
+			assert.NoError(err)
+		}
+		for i := 0; i < 5; i++ {
+			key := fmt.Sprintf("foo%d", i)
+			val := fmt.Sprintf("bar%d", i)
+			err := db.Put([]byte(key), []byte(val), WithExpiry(time.Now().Add(time.Duration(i)*time.Second)))
+			assert.NoError(err)
+		}
+		err = db.Close()
+		assert.NoError(err)
+	})
+
+	t.Run("OpenAgain", func(t *testing.T) {
+		db, err = Open(testdir)
+		assert.NoError(err)
+		assert.Equal(10, db.trie.Size())
+		assert.Equal(5, db.ttlIndex.Size())
+	})
+}
+
 func TestReIndex(t *testing.T) {
 	assert := assert.New(t)
 
@@ -506,6 +572,16 @@ func TestReIndex(t *testing.T) {
 			assert.NoError(err)
 		})
 
+		t.Run("PutWithExpiry", func(t *testing.T) {
+			err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
+			assert.NoError(err)
+		})
+
+		t.Run("PutWithLargeExpiry", func(t *testing.T) {
+			err = db.Put([]byte("bar1"), []byte("baz1"), WithExpiry(time.Now().Add(time.Hour)))
+			assert.NoError(err)
+		})
+
 		t.Run("Get", func(t *testing.T) {
 			val, err := db.Get([]byte("foo"))
 			assert.NoError(err)
@@ -525,6 +601,8 @@ func TestReIndex(t *testing.T) {
 		t.Run("DeleteIndex", func(t *testing.T) {
 			err := os.Remove(filepath.Join(testdir, "index"))
 			assert.NoError(err)
+			err = os.Remove(filepath.Join(testdir, ttlIndexFile))
+			assert.NoError(err)
 		})
 	})
 
@@ -545,6 +623,16 @@ func TestReIndex(t *testing.T) {
 			assert.Equal([]byte("bar"), val)
 		})
 
+		t.Run("GetKeyWithExpiry", func(t *testing.T) {
+			val, err := db.Get([]byte("bar"))
+			assert.Error(err)
+			assert.Equal(ErrKeyExpired, err)
+			assert.Nil(val)
+			val, err = db.Get([]byte("bar1"))
+			assert.NoError(err)
+			assert.Equal([]byte("baz1"), val)
+		})
+
 		t.Run("Close", func(t *testing.T) {
 			err = db.Close()
 			assert.NoError(err)
diff --git a/internal/index/ttl_index.go b/internal/index/ttl_index.go
new file mode 100644
index 0000000..bb134af
--- /dev/null
+++ b/internal/index/ttl_index.go
@@ -0,0 +1,71 @@
+package index
+
+import (
+	"encoding/binary"
+	"io"
+	"os"
+	"time"
+
+	art "github.com/plar/go-adaptive-radix-tree"
+	"github.com/prologic/bitcask/internal"
+)
+
+type ttlIndexer struct{}
+
+func NewTTLIndexer() Indexer {
+	return ttlIndexer{}
+}
+
+func (i ttlIndexer) Save(t art.Tree, path string) error {
+	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
+	if err != nil {
+		return err
+	}
+	buf := make([]byte, int64Size)
+	for it := t.Iterator(); it.HasNext(); {
+		node, err := it.Next()
+		if err != nil {
+			return err
+		}
+		// save key
+		err = writeBytes(node.Key(), f)
+		if err != nil {
+			return err
+		}
+		// save key ttl
+		binary.BigEndian.PutUint64(buf, uint64(node.Value().(time.Time).Unix()))
+		_, err = f.Write(buf)
+		if err != nil {
+			return err
+		}
+	}
+	return f.Sync()
+}
+
+func (i ttlIndexer) Load(path string, maxKeySize uint32) (art.Tree, bool, error) {
+	t := art.New()
+	if !internal.Exists(path) {
+		return t, false, nil
+	}
+	f, err := os.Open(path)
+	if err != nil {
+		return t, true, err
+	}
+	buf := make([]byte, int64Size)
+	for {
+		key, err := readKeyBytes(f, maxKeySize)
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return t, true, err
+		}
+		_, err = io.ReadFull(f, buf)
+		if err != nil {
+			return t, true, err
+		}
+		expiry := time.Unix(int64(binary.BigEndian.Uint64(buf)), 0).UTC()
+		t.Insert(key, expiry)
+	}
+	return t, true, nil
+}
diff --git a/internal/index/ttl_index_test.go b/internal/index/ttl_index_test.go
new file mode 100644
index 0000000..c456f0b
--- /dev/null
+++ b/internal/index/ttl_index_test.go
@@ -0,0 +1,54 @@
+package index
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	art "github.com/plar/go-adaptive-radix-tree"
+	assert2 "github.com/stretchr/testify/assert"
+)
+
+func Test_TTLIndexer(t *testing.T) {
+	assert := assert2.New(t)
+	tempDir, err := ioutil.TempDir("", "bitcask")
+	assert.NoError(err)
+	defer os.RemoveAll(tempDir)
+
+	currTime := time.Date(2020, 12, 27, 0, 0, 0, 0, time.UTC)
+	trie := art.New()
+
+	t.Run("LoadEmpty", func(t *testing.T) {
+		newTrie, found, err := NewTTLIndexer().Load(filepath.Join(tempDir, "ttl_index"), 4)
+		assert.NoError(err)
+		assert.False(found)
+		assert.Equal(trie, newTrie)
+	})
+
+	t.Run("Save", func(t *testing.T) {
+		trie.Insert([]byte("key"), currTime)
+		err := NewTTLIndexer().Save(trie, filepath.Join(tempDir, "ttl_index"))
+		assert.NoError(err)
+		trie.Insert([]byte("foo"), currTime.Add(24*time.Hour))
+		err = NewTTLIndexer().Save(trie, filepath.Join(tempDir, "ttl_index"))
+		assert.NoError(err)
+		trie.Insert([]byte("key"), currTime.Add(-24*time.Hour))
+		err = NewTTLIndexer().Save(trie, filepath.Join(tempDir, "ttl_index"))
+		assert.NoError(err)
+	})
+
+	t.Run("Load", func(t *testing.T) {
+		newTrie, found, err := NewTTLIndexer().Load(filepath.Join(tempDir, "ttl_index"), 4)
+		assert.NoError(err)
+		assert.True(found)
+		assert.Equal(2, newTrie.Size())
+		value, found := newTrie.Search([]byte("key"))
+		assert.True(found)
+		assert.Equal(currTime.Add(-24*time.Hour), value)
+		value, found = newTrie.Search([]byte("foo"))
+		assert.True(found)
+		assert.Equal(currTime.Add(24*time.Hour), value)
+	})
+}
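
For reference, a short sketch of the end-to-end expiry behaviour the new tests exercise: Has hides an expired key, Get reports ErrKeyExpired (and lazily deletes the key), and after RunGC the key is gone entirely. The snippet is illustrative and not part of the patch; the demo path and timing are assumptions.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/prologic/bitcask"
)

func main() {
	db, err := bitcask.Open("/tmp/bitcask-expiry-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// write a key that expires immediately
	if err := db.Put([]byte("bar"), []byte("baz"), bitcask.WithExpiry(time.Now())); err != nil {
		log.Fatal(err)
	}
	time.Sleep(time.Millisecond)

	fmt.Println(db.Has([]byte("bar"))) // false: Has treats expired keys as absent

	_, err = db.Get([]byte("bar"))
	fmt.Println(err == bitcask.ErrKeyExpired) // true: Get reports expiry and lazily deletes the key

	if err := db.RunGC(); err != nil { // removes any remaining expired keys
		log.Fatal(err)
	}

	_, err = db.Get([]byte("bar"))
	fmt.Println(err == bitcask.ErrKeyNotFound) // true: the key is now gone
}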