Refactor TTL with a new API PutWithTTL() and reduce memory allocs (#220)

This commit is contained in:
James Mills 2021-07-09 17:21:35 +10:00 committed by GitHub
parent 2ee13b8e32
commit b98b684bb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 176 additions and 89 deletions

@ -174,7 +174,7 @@ func (b *Bitcask) Has(key []byte) bool {
}
// Put stores the key and value in the database.
func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error {
func (b *Bitcask) Put(key, value []byte) error {
if len(key) == 0 {
return ErrEmptyKey
}
@ -184,16 +184,10 @@ func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error {
if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
return ErrValueTooLarge
}
var feature Feature
for _, opt := range options {
if err := opt(&feature); err != nil {
return err
}
}
b.mu.Lock()
defer b.mu.Unlock()
offset, n, err := b.put(key, value, feature)
offset, n, err := b.put(key, value)
if err != nil {
return err
}
@ -213,9 +207,47 @@ func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error {
item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
b.trie.Insert(key, item)
if feature.Expiry != nil {
b.ttlIndex.Insert(key, *feature.Expiry)
return nil
}
// PutWithTTL stores the key and value in the database and records an expiry
// of time.Now().Add(ttl) for the key. A zero or negative ttl therefore yields
// an expiry that is not in the future.
//
// It returns ErrEmptyKey for an empty key, ErrKeyTooLarge / ErrValueTooLarge
// when a configured (non-zero) size limit is exceeded, or any error reported
// while writing or syncing the current datafile.
func (b *Bitcask) PutWithTTL(key, value []byte, ttl time.Duration) error {
	// Validate the input before taking the lock.
	switch {
	case len(key) == 0:
		return ErrEmptyKey
	case b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize:
		return ErrKeyTooLarge
	case b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize:
		return ErrValueTooLarge
	}

	// The deadline is fixed before the lock is acquired.
	deadline := time.Now().Add(ttl)

	b.mu.Lock()
	defer b.mu.Unlock()

	pos, written, err := b.putWithExpiry(key, value, deadline)
	if err != nil {
		return err
	}

	if b.config.Sync {
		if syncErr := b.curr.Sync(); syncErr != nil {
			return syncErr
		}
	}

	// A successful write always leaves the persisted index stale.
	b.metadata.IndexUpToDate = false

	// Overwriting an existing key makes its previous entry reclaimable.
	if prev, ok := b.trie.Search(key); ok {
		b.metadata.ReclaimableSpace += prev.(internal.Item).Size
	}

	b.trie.Insert(key, internal.Item{FileID: b.curr.FileID(), Offset: pos, Size: written})
	b.ttlIndex.Insert(key, deadline)

	return nil
}
@ -230,7 +262,7 @@ func (b *Bitcask) Delete(key []byte) error {
// delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) delete(key []byte) error {
_, _, err := b.put(key, []byte{}, Feature{})
_, _, err := b.put(key, []byte{})
if err != nil {
return err
}
@ -249,7 +281,7 @@ func (b *Bitcask) DeleteAll() (err error) {
defer b.mu.RUnlock()
b.trie.ForEach(func(node art.Node) bool {
_, _, err = b.put(node.Key(), []byte{}, Feature{})
_, _, err = b.put(node.Key(), []byte{})
if err != nil {
return false
}
@ -381,48 +413,82 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) {
return e, nil
}
// put inserts a new (key, value). Both key and value are valid inputs.
func (b *Bitcask) put(key, value []byte, feature Feature) (int64, int64, error) {
func (b *Bitcask) maybeRotate() error {
size := b.curr.Size()
if size >= int64(b.config.MaxDatafileSize) {
err := b.curr.Close()
if err != nil {
return -1, 0, err
}
id := b.curr.FileID()
df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
if err != nil {
return -1, 0, err
}
b.datafiles[id] = df
id = b.curr.FileID() + 1
curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
if err != nil {
return -1, 0, err
}
b.curr = curr
err = b.saveIndexes()
if err != nil {
return -1, 0, err
}
if size < int64(b.config.MaxDatafileSize) {
return nil
}
e := internal.NewEntry(key, value, feature.Expiry)
return b.curr.Write(e)
}
// closeCurrentFile closes current datafile and makes it read only.
func (b *Bitcask) closeCurrentFile() error {
err := b.curr.Close()
if err != nil {
return err
}
id := b.curr.FileID()
df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
df, err := data.NewDatafile(
b.path, id, true,
b.config.MaxKeySize,
b.config.MaxValueSize,
b.config.FileFileModeBeforeUmask,
)
if err != nil {
return err
}
b.datafiles[id] = df
id = b.curr.FileID() + 1
curr, err := data.NewDatafile(
b.path, id, false,
b.config.MaxKeySize,
b.config.MaxValueSize,
b.config.FileFileModeBeforeUmask,
)
if err != nil {
return err
}
b.curr = curr
err = b.saveIndexes()
if err != nil {
return err
}
return nil
}
// put writes a new (key, value) entry without an expiry to the current
// datafile, calling maybeRotate first. Both key and value are valid inputs.
//
// It returns the values reported by the datafile write (callers use them as
// the entry's offset and size). If maybeRotate fails, it returns -1, 0 and a
// wrapped error.
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
	rotateErr := b.maybeRotate()
	if rotateErr != nil {
		return -1, 0, fmt.Errorf("error rotating active datafile: %w", rotateErr)
	}

	// A nil expiry marks the entry as having no TTL.
	entry := internal.NewEntry(key, value, nil)
	return b.curr.Write(entry)
}
// putWithExpiry writes a new (key, value) entry carrying the given expiry to
// the current datafile, calling maybeRotate first. Both key and value are
// valid inputs.
//
// It returns the values reported by the datafile write (callers use them as
// the entry's offset and size). If maybeRotate fails, it returns -1, 0 and a
// wrapped error.
func (b *Bitcask) putWithExpiry(key, value []byte, expiry time.Time) (int64, int64, error) {
	rotateErr := b.maybeRotate()
	if rotateErr != nil {
		return -1, 0, fmt.Errorf("error rotating active datafile: %w", rotateErr)
	}

	entry := internal.NewEntry(key, value, &expiry)
	return b.curr.Write(entry)
}
// closeCurrentFile closes current datafile and makes it read only.
func (b *Bitcask) closeCurrentFile() error {
if err := b.curr.Close(); err != nil {
return err
}
id := b.curr.FileID()
df, err := data.NewDatafile(
b.path, id, true,
b.config.MaxKeySize,
b.config.MaxValueSize,
b.config.FileFileModeBeforeUmask,
)
if err != nil {
return err
}
@ -434,7 +500,12 @@ func (b *Bitcask) closeCurrentFile() error {
// openNewWritableFile opens new datafile for writing data
func (b *Bitcask) openNewWritableFile() error {
id := b.curr.FileID() + 1
curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
curr, err := data.NewDatafile(
b.path, id, false,
b.config.MaxKeySize,
b.config.MaxValueSize,
b.config.FileFileModeBeforeUmask,
)
if err != nil {
return err
}
@ -442,6 +513,7 @@ func (b *Bitcask) openNewWritableFile() error {
return nil
}
// Reopen closes and reopens the database
func (b *Bitcask) Reopen() error {
b.mu.Lock()
defer b.mu.Unlock()
@ -452,7 +524,12 @@ func (b *Bitcask) Reopen() error {
// reopen reloads a bitcask object with index and datafiles
// caller of this method should take care of locking
func (b *Bitcask) reopen() error {
datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
datafiles, lastID, err := loadDatafiles(
b.path,
b.config.MaxKeySize,
b.config.MaxValueSize,
b.config.FileFileModeBeforeUmask,
)
if err != nil {
return err
}
@ -461,7 +538,12 @@ func (b *Bitcask) reopen() error {
return err
}
curr, err := data.NewDatafile(b.path, lastID, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask)
curr, err := data.NewDatafile(
b.path, lastID, false,
b.config.MaxKeySize,
b.config.MaxValueSize,
b.config.FileFileModeBeforeUmask,
)
if err != nil {
return err
}
@ -532,14 +614,15 @@ func (b *Bitcask) Merge() error {
if err != nil {
return err
}
// prepare entry options
var opts []PutOptions
if e.Expiry != nil {
opts = append(opts, WithExpiry(*(e.Expiry)))
}
if err := mdb.Put(key, e.Value, opts...); err != nil {
return err
if e.Expiry != nil {
if err := mdb.PutWithTTL(key, e.Value, time.Until(*e.Expiry)); err != nil {
return err
}
} else {
if err := mdb.Put(key, e.Value); err != nil {
return err
}
}
return nil
@ -751,7 +834,12 @@ func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileMode
datafiles = make(map[int]data.Datafile, len(ids))
for _, id := range ids {
datafiles[id], err = data.NewDatafile(path, id, true, maxKeySize, maxValueSize, fileModeBeforeUmask)
datafiles[id], err = data.NewDatafile(
path, id, true,
maxKeySize,
maxValueSize,
fileModeBeforeUmask,
)
if err != nil {
return
}

@ -85,8 +85,8 @@ func TestAll(t *testing.T) {
assert.Equal(1, db.Len())
})
t.Run("PutWithExpiry", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
t.Run("PutWithTTL", func(t *testing.T) {
err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0)
assert.NoError(err)
})
@ -102,14 +102,14 @@ func TestAll(t *testing.T) {
})
t.Run("HasWithExpired", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0)
assert.NoError(err)
time.Sleep(time.Millisecond)
assert.False(db.Has([]byte("bar")))
})
t.Run("RunGC", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0)
assert.NoError(err)
time.Sleep(time.Millisecond)
err = db.RunGC()
@ -232,8 +232,8 @@ func TestReopen(t *testing.T) {
assert.NoError(err)
})
t.Run("PutWithExpiry", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
t.Run("PutWithTTL", func(t *testing.T) {
err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0)
assert.NoError(err)
})
@ -526,16 +526,16 @@ func TestLoadIndexes(t *testing.T) {
t.Run("Setup", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
for i:=0; i<5; i++ {
for i := 0; i < 5; i++ {
key := fmt.Sprintf("key%d", i)
val := fmt.Sprintf("val%d", i)
err := db.Put([]byte(key), []byte(val))
assert.NoError(err)
}
for i:=0; i<5; i++ {
for i := 0; i < 5; i++ {
key := fmt.Sprintf("foo%d", i)
val := fmt.Sprintf("bar%d", i)
err := db.Put([]byte(key), []byte(val), WithExpiry(time.Now().Add(time.Duration(i)*time.Second)))
err := db.PutWithTTL([]byte(key), []byte(val), time.Duration(i)*time.Second)
assert.NoError(err)
}
err = db.Close()
@ -573,12 +573,12 @@ func TestReIndex(t *testing.T) {
})
t.Run("PutWithExpiry", func(t *testing.T) {
err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now()))
err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0)
assert.NoError(err)
})
t.Run("PutWithLargeExpiry", func(t *testing.T) {
err = db.Put([]byte("bar1"), []byte("baz1"), WithExpiry(time.Now().Add(time.Hour)))
err = db.PutWithTTL([]byte("bar1"), []byte("baz1"), time.Hour)
assert.NoError(err)
})

@ -38,13 +38,20 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
var ttl *time.Duration
key := cmd.Args[1]
value := cmd.Args[2]
var opts []bitcask.PutOptions
if len(cmd.Args) == 4 {
ttl, _ := binary.Varint(cmd.Args[3])
e := time.Now().UTC().Add(time.Duration(ttl)*time.Millisecond)
opts = append(opts, bitcask.WithExpiry(e))
val, n := binary.Varint(cmd.Args[3])
if n <= 0 {
conn.WriteError("ERR error parsing ttl")
return
}
d := time.Duration(val) * time.Millisecond
ttl = &d
}
err := s.db.Lock()
@ -54,11 +61,17 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
}
defer s.db.Unlock()
if err := s.db.Put(key, value, opts...); err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
if ttl != nil {
if err := s.db.PutWithTTL(key, value, *ttl); err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
}
} else {
conn.WriteString("OK")
if err := s.db.Put(key, value); err != nil {
conn.WriteString(fmt.Sprintf("ERR: %s", err))
}
}
conn.WriteString("OK")
}
func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {

@ -2,7 +2,6 @@ package bitcask
import (
"os"
"time"
"github.com/prologic/bitcask/internal/config"
)
@ -117,16 +116,3 @@ func newDefaultConfig() *config.Config {
DBVersion: CurrentDBVersion,
}
}
// Feature holds the optional per-entry settings that PutOptions functions
// fill in.
type Feature struct {
// Expiry is the optional expiry time of the entry; it is nil when no
// expiry was set.
Expiry *time.Time
}
// PutOptions is a function that configures a Feature. It returns an error if
// the option cannot be applied.
type PutOptions func(*Feature) error
// WithExpiry returns a PutOptions that sets the Feature's Expiry to the given
// time. The returned option never fails.
func WithExpiry(expiry time.Time) PutOptions {
return func(f *Feature) error {
// Point at the captured copy of expiry.
f.Expiry = &expiry
return nil
}
}