Purge API added to remove expired keys (#204)

* purge API added

* merged with master, import order fix

* purge API renamed to RunGC

Co-authored-by: yash <yash.chandra@grabpay.com>
Yash Suresh Chandra 2021-06-02 02:17:30 +05:30 committed by GitHub
parent 1009661b52
commit e7c6490762
4 changed files with 317 additions and 42 deletions
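For orientation before the diff, here is a minimal usage sketch of the new API (not part of the commit; the datastore path is illustrative): a key written with WithExpiry stops being visible to Has/Get once it expires, and RunGC physically removes all expired keys.

package main

import (
	"log"
	"time"

	"github.com/prologic/bitcask"
)

func main() {
	// Illustrative path; any writable directory works.
	db, err := bitcask.Open("/tmp/bitcask-rungc-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Store a key that expires immediately.
	if err := db.Put([]byte("session"), []byte("token"), bitcask.WithExpiry(time.Now())); err != nil {
		log.Fatal(err)
	}
	time.Sleep(time.Millisecond)

	// RunGC walks the ttl_index and deletes every expired key, so a later
	// Get returns ErrKeyNotFound instead of ErrKeyExpired.
	if err := db.RunGC(); err != nil {
		log.Fatal(err)
	}
	if _, err := db.Get([]byte("session")); err == bitcask.ErrKeyNotFound {
		log.Println("expired key purged")
	}
}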


@@ -26,7 +26,8 @@ import (
)
const (
lockfile = "lock"
lockfile = "lock"
ttlIndexFile = "ttl_index"
)
var (
@@ -71,15 +72,17 @@ type Bitcask struct {
*flock.Flock
config *config.Config
options []Option
path string
curr data.Datafile
datafiles map[int]data.Datafile
trie art.Tree
indexer index.Indexer
metadata *metadata.MetaData
isMerging bool
config *config.Config
options []Option
path string
curr data.Datafile
datafiles map[int]data.Datafile
trie art.Tree
indexer index.Indexer
ttlIndexer index.Indexer
ttlIndex art.Tree
metadata *metadata.MetaData
isMerging bool
}
// Stats is a struct returned by Stats() on an open Bitcask instance
@@ -118,7 +121,7 @@ func (b *Bitcask) Close() error {
}
func (b *Bitcask) close() error {
if err := b.saveIndex(); err != nil {
if err := b.saveIndexes(); err != nil {
return err
}
@@ -162,8 +165,11 @@ func (b *Bitcask) Get(key []byte) ([]byte, error) {
// Has returns true if the key exists in the database, false otherwise.
func (b *Bitcask) Has(key []byte) bool {
b.mu.RLock()
defer b.mu.RUnlock()
_, found := b.trie.Search(key)
b.mu.RUnlock()
if found {
return !b.isExpired(key)
}
return found
}
@@ -207,6 +213,9 @@ func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error {
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
b.trie.Insert(key, item)
if feature.Expiry != nil {
b.ttlIndex.Insert(key, *feature.Expiry)
}
return nil
}
@@ -229,6 +238,7 @@ func (b *Bitcask) delete(key []byte) error {
b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
}
b.trie.Delete(key)
b.ttlIndex.Delete(key)
return nil
}
@@ -248,6 +258,7 @@ func (b *Bitcask) DeleteAll() (err error) {
return true
})
b.trie = art.New()
b.ttlIndex = art.New()
return
}
@@ -286,6 +297,9 @@ func (b *Bitcask) Keys() chan []byte {
for it := b.trie.Iterator(); it.HasNext(); {
node, _ := it.Next()
if b.isExpired(node.Key()) {
continue
}
ch <- node.Key()
}
close(ch)
@@ -294,6 +308,28 @@ func (b *Bitcask) Keys() chan []byte {
return ch
}
// RunGC deletes all expired keys
func (b *Bitcask) RunGC() error {
b.mu.Lock()
defer b.mu.Unlock()
return b.runGC()
}
// runGC deletes all keys that are expired
// caller function should take care of the locking when calling this method
func (b *Bitcask) runGC() (err error) {
b.ttlIndex.ForEach(func(node art.Node) (cont bool) {
if !b.isExpired(node.Key()) {
return true
}
if err = b.delete(node.Key()); err != nil {
return false
}
return true
})
return
}
// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error returned.
@@ -311,8 +347,7 @@ func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
return
}
// get retrieves the value of the given key. If the key is not found or an I/O
// error occurs a null byte slice is returned along with the error.
// get retrieves the value of the given key
func (b *Bitcask) get(key []byte) (internal.Entry, error) {
var df data.Datafile
@@ -320,6 +355,10 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) {
if !found {
return internal.Entry{}, ErrKeyNotFound
}
if expired := b.isExpired(key); expired {
_ = b.delete(key) // we don't care if it doesn't succeed
return internal.Entry{}, ErrKeyExpired
}
item := value.(internal.Item)
@@ -334,11 +373,6 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) {
return internal.Entry{}, err
}
if e.Expiry != nil && e.Expiry.Before(time.Now().UTC()) {
_ = b.delete(key) // we don't care if it doesn't succeed
return internal.Entry{}, ErrKeyExpired
}
checksum := crc32.ChecksumIEEE(e.Value)
if checksum != e.Checksum {
return internal.Entry{}, ErrChecksumFailed
@@ -371,7 +405,7 @@ func (b *Bitcask) put(key, value []byte, feature Feature) (int64, int64, error)
return -1, 0, err
}
b.curr = curr
err = b.saveIndex()
err = b.saveIndexes()
if err != nil {
return -1, 0, err
}
@@ -422,7 +456,7 @@ func (b *Bitcask) reopen() error {
if err != nil {
return err
}
t, err := loadIndex(b.path, b.indexer, b.config.MaxKeySize, datafiles, lastID, b.metadata.IndexUpToDate)
t, ttlIndex, err := loadIndexes(b, datafiles, lastID)
if err != nil {
return err
}
@@ -434,6 +468,7 @@ func (b *Bitcask) reopen() error {
b.trie = t
b.curr = curr
b.ttlIndex = ttlIndex
b.datafiles = datafiles
return nil
@@ -605,12 +640,13 @@ func Open(path string, options ...Option) (*Bitcask, error) {
}
bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, lockfile)),
config: cfg,
options: options,
path: path,
indexer: index.NewIndexer(),
metadata: meta,
Flock: flock.New(filepath.Join(path, lockfile)),
config: cfg,
options: options,
path: path,
indexer: index.NewIndexer(),
ttlIndexer: index.NewTTLIndexer(),
metadata: meta,
}
locked, err := bitcask.Flock.TryLock()
@@ -667,13 +703,19 @@ func (b *Bitcask) Backup(path string) error {
return internal.Copy(b.path, path, []string{lockfile})
}
// saveIndex saves index currently in RAM to disk
func (b *Bitcask) saveIndex() error {
// saveIndexes saves the index and ttl_index currently in RAM to disk
func (b *Bitcask) saveIndexes() error {
tempIdx := "temp_index"
if err := b.indexer.Save(b.trie, filepath.Join(b.path, tempIdx)); err != nil {
return err
}
return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index"))
if err := os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index")); err != nil {
return err
}
if err := b.ttlIndexer.Save(b.ttlIndex, filepath.Join(b.path, tempIdx)); err != nil {
return err
}
return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, ttlIndexFile))
}
// saveMetadata saves metadata into disk
@@ -686,6 +728,16 @@ func (b *Bitcask) Reclaimable() int64 {
return b.metadata.ReclaimableSpace
}
// isExpired returns true if a key has expired
// it returns false if key does not exist in ttl index
func (b *Bitcask) isExpired(key []byte) bool {
expiry, found := b.ttlIndex.Search(key)
if !found {
return false
}
return expiry.(time.Time).Before(time.Now().UTC())
}
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
fns, err := internal.GetDatafiles(path)
if err != nil {
@@ -724,30 +776,37 @@ func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
return out
}
func loadIndex(path string, indexer index.Indexer, maxKeySize uint32, datafiles map[int]data.Datafile, lastID int, indexUpToDate bool) (art.Tree, error) {
t, found, err := indexer.Load(filepath.Join(path, "index"), maxKeySize)
// loadIndexes loads the index from disk into memory. If the index is missing or only partially
// available (the last bitcask process crashed), it iterates over the last datafile and rebuilds the index.
// The ttl_index is also constructed here alongside the normal index.
func loadIndexes(b *Bitcask, datafiles map[int]data.Datafile, lastID int) (art.Tree, art.Tree, error) {
t, found, err := b.indexer.Load(filepath.Join(b.path, "index"), b.config.MaxKeySize)
if err != nil {
return nil, err
return nil, nil, err
}
if found && indexUpToDate {
return t, nil
ttlIndex, _, err := b.ttlIndexer.Load(filepath.Join(b.path, ttlIndexFile), b.config.MaxKeySize)
if err != nil {
return nil, nil, err
}
if found && b.metadata.IndexUpToDate {
return t, ttlIndex, nil
}
if found {
if err := loadIndexFromDatafile(t, datafiles[lastID]); err != nil {
return nil, err
if err := loadIndexFromDatafile(t, ttlIndex, datafiles[lastID]); err != nil {
return nil, ttlIndex, err
}
return t, nil
return t, ttlIndex, nil
}
sortedDatafiles := getSortedDatafiles(datafiles)
for _, df := range sortedDatafiles {
if err := loadIndexFromDatafile(t, df); err != nil {
return nil, err
if err := loadIndexFromDatafile(t, ttlIndex, df); err != nil {
return nil, ttlIndex, err
}
}
return t, nil
return t, ttlIndex, nil
}
func loadIndexFromDatafile(t art.Tree, df data.Datafile) error {
func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df data.Datafile) error {
var offset int64
for {
e, n, err := df.Read()
@@ -765,6 +824,9 @@ func loadIndexFromDatafile(t art.Tree, df data.Datafile) error {
}
item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
t.Insert(e.Key, item)
if e.Expiry != nil {
ttlIndex.Insert(e.Key, *e.Expiry)
}
offset += n
}
return nil


@@ -101,6 +101,24 @@ func TestAll(t *testing.T) {
assert.True(db.Has([]byte("foo")))
})
t.Run("HasWithExpired", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
assert.NoError(err)
time.Sleep(time.Millisecond)
assert.False(db.Has([]byte("bar")))
})
t.Run("RunGC", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
assert.NoError(err)
time.Sleep(time.Millisecond)
err = db.RunGC()
assert.NoError(err)
_, err := db.Get([]byte("bar"))
assert.Error(err)
assert.Equal(ErrKeyNotFound, err)
})
t.Run("Keys", func(t *testing.T) {
keys := make([][]byte, 0)
for key := range db.Keys() {
@@ -214,6 +232,11 @@ func TestReopen(t *testing.T) {
assert.NoError(err)
})
t.Run("PutWithExpiry", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get([]byte("foo"))
assert.NoError(err)
@@ -242,6 +265,13 @@ func TestReopen(t *testing.T) {
assert.Equal([]byte("foo"), val)
})
t.Run("GetExpiredKeyAfterReopen", func(t *testing.T) {
val, err := db.Get([]byte("bar"))
assert.Error(err)
assert.Equal(ErrKeyExpired, err)
assert.Nil(val)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
@@ -484,6 +514,42 @@ func TestAutoRecovery(t *testing.T) {
}
}
func TestLoadIndexes(t *testing.T) {
assert := assert.New(t)
testdir, err1 := ioutil.TempDir("", "bitcask")
assert.NoError(err1)
defer os.RemoveAll(testdir)
var db *Bitcask
var err error
t.Run("Setup", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
for i := 0; i < 5; i++ {
key := fmt.Sprintf("key%d", i)
val := fmt.Sprintf("val%d", i)
err := db.Put([]byte(key), []byte(val))
assert.NoError(err)
}
for i := 0; i < 5; i++ {
key := fmt.Sprintf("foo%d", i)
val := fmt.Sprintf("bar%d", i)
err := db.Put([]byte(key), []byte(val), WithExpiry(time.Now().Add(time.Duration(i)*time.Second)))
assert.NoError(err)
}
err = db.Close()
assert.NoError(err)
})
t.Run("OpenAgain", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
assert.Equal(10, db.trie.Size())
assert.Equal(5, db.ttlIndex.Size())
})
}
func TestReIndex(t *testing.T) {
assert := assert.New(t)
@@ -506,6 +572,16 @@ func TestReIndex(t *testing.T) {
assert.NoError(err)
})
t.Run("PutWithExpiry", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
assert.NoError(err)
})
t.Run("PutWithLargeExpiry", func(t *testing.T) {
err = db.Put([]byte("bar1"), []byte("baz1"), WithExpiry(time.Now().Add(time.Hour)))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
val, err := db.Get([]byte("foo"))
assert.NoError(err)
@@ -525,6 +601,8 @@ func TestReIndex(t *testing.T) {
t.Run("DeleteIndex", func(t *testing.T) {
err := os.Remove(filepath.Join(testdir, "index"))
assert.NoError(err)
err = os.Remove(filepath.Join(testdir, ttlIndexFile))
assert.NoError(err)
})
})
@@ -545,6 +623,16 @@ func TestReIndex(t *testing.T) {
assert.Equal([]byte("bar"), val)
})
t.Run("GetKeyWithExpiry", func(t *testing.T) {
val, err := db.Get([]byte("bar"))
assert.Error(err)
assert.Equal(ErrKeyExpired, err)
assert.Nil(val)
val, err = db.Get([]byte("bar1"))
assert.NoError(err)
assert.Equal([]byte("baz1"), val)
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)


@@ -0,0 +1,71 @@
package index
import (
"encoding/binary"
"io"
"os"
"time"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
type ttlIndexer struct{}
func NewTTLIndexer() Indexer {
return ttlIndexer{}
}
func (i ttlIndexer) Save(t art.Tree, path string) error {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
buf := make([]byte, int64Size)
for it := t.Iterator(); it.HasNext(); {
node, err := it.Next()
if err != nil {
return err
}
// save key
err = writeBytes(node.Key(), f)
if err != nil {
return err
}
// save key ttl
binary.BigEndian.PutUint64(buf, uint64(node.Value().(time.Time).Unix()))
_, err = f.Write(buf)
if err != nil {
return err
}
}
return f.Sync()
}
func (i ttlIndexer) Load(path string, maxKeySize uint32) (art.Tree, bool, error) {
t := art.New()
if !internal.Exists(path) {
return t, false, nil
}
f, err := os.Open(path)
if err != nil {
return t, true, err
}
buf := make([]byte, int64Size)
for {
key, err := readKeyBytes(f, maxKeySize)
if err != nil {
if err == io.EOF {
break
}
return t, true, err
}
_, err = io.ReadFull(f, buf)
if err != nil {
return t, true, err
}
expiry := time.Unix(int64(binary.BigEndian.Uint64(buf)), 0).UTC()
t.Insert(key, expiry)
}
return t, true, nil
}
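Each entry that Save writes is the key (via writeBytes, a helper defined elsewhere in the index package) followed by an 8-byte big-endian Unix timestamp, and Load reverses this, so sub-second precision is dropped. A self-contained sketch of just the timestamp round trip used above:

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

func main() {
	expiry := time.Date(2021, 6, 2, 2, 17, 30, 0, time.UTC)

	// Encode as ttlIndexer.Save does: whole seconds since the epoch, big-endian.
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(expiry.Unix()))

	// Decode as ttlIndexer.Load does: back to a UTC time.Time.
	decoded := time.Unix(int64(binary.BigEndian.Uint64(buf)), 0).UTC()

	fmt.Println(decoded.Equal(expiry)) // true
}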


@@ -0,0 +1,54 @@
package index
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
art "github.com/plar/go-adaptive-radix-tree"
assert2 "github.com/stretchr/testify/assert"
)
func Test_TTLIndexer(t *testing.T) {
assert := assert2.New(t)
tempDir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
defer os.RemoveAll(tempDir)
currTime := time.Date(2020, 12, 27, 0, 0, 0, 0, time.UTC)
trie := art.New()
t.Run("LoadEmpty", func(t *testing.T) {
newTrie, found, err := NewTTLIndexer().Load(filepath.Join(tempDir, "ttl_index"), 4)
assert.NoError(err)
assert.False(found)
assert.Equal(trie, newTrie)
})
t.Run("Save", func(t *testing.T) {
trie.Insert([]byte("key"), currTime)
err := NewTTLIndexer().Save(trie, filepath.Join(tempDir, "ttl_index"))
assert.NoError(err)
trie.Insert([]byte("foo"), currTime.Add(24*time.Hour))
err = NewTTLIndexer().Save(trie, filepath.Join(tempDir, "ttl_index"))
assert.NoError(err)
trie.Insert([]byte("key"), currTime.Add(-24*time.Hour))
err = NewTTLIndexer().Save(trie, filepath.Join(tempDir, "ttl_index"))
assert.NoError(err)
})
t.Run("Load", func(t *testing.T) {
newTrie, found, err := NewTTLIndexer().Load(filepath.Join(tempDir, "ttl_index"), 4)
assert.NoError(err)
assert.True(found)
assert.Equal(2, newTrie.Size())
value, found := newTrie.Search([]byte("key"))
assert.True(found)
assert.Equal(currTime.Add(-24*time.Hour), value)
value, found = newTrie.Search([]byte("foo"))
assert.True(found)
assert.Equal(currTime.Add(24*time.Hour), value)
})
}