From b98b684bb46fff4a666d9908456583526f6fc696 Mon Sep 17 00:00:00 2001 From: James Mills Date: Fri, 9 Jul 2021 17:21:35 +1000 Subject: [PATCH] Refactor TTL with a new API PutWithTTL() and reduce memory allocs (#220) --- bitcask.go | 202 +++++++++++++++++++++++++++++------------ bitcask_test.go | 22 ++--- cmd/bitcaskd/server.go | 27 ++++-- options.go | 14 --- 4 files changed, 176 insertions(+), 89 deletions(-) diff --git a/bitcask.go b/bitcask.go index 8b46df0..e303189 100644 --- a/bitcask.go +++ b/bitcask.go @@ -174,7 +174,7 @@ func (b *Bitcask) Has(key []byte) bool { } // Put stores the key and value in the database. -func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error { +func (b *Bitcask) Put(key, value []byte) error { if len(key) == 0 { return ErrEmptyKey } @@ -184,16 +184,10 @@ func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error { if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize { return ErrValueTooLarge } - var feature Feature - for _, opt := range options { - if err := opt(&feature); err != nil { - return err - } - } b.mu.Lock() defer b.mu.Unlock() - offset, n, err := b.put(key, value, feature) + offset, n, err := b.put(key, value) if err != nil { return err } @@ -213,9 +207,47 @@ func (b *Bitcask) Put(key, value []byte, options ...PutOptions) error { item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n} b.trie.Insert(key, item) - if feature.Expiry != nil { - b.ttlIndex.Insert(key, *feature.Expiry) + + return nil +} + +// PutWithTTL stores the key and value in the database with the given TTL +func (b *Bitcask) PutWithTTL(key, value []byte, ttl time.Duration) error { + if len(key) == 0 { + return ErrEmptyKey } + if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize { + return ErrKeyTooLarge + } + if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize { + return ErrValueTooLarge + } + + expiry := time.Now().Add(ttl) + + b.mu.Lock() + defer b.mu.Unlock() + offset, n, err := b.putWithExpiry(key, value, expiry) + if err != nil { + return err + } + + if b.config.Sync { + if err := b.curr.Sync(); err != nil { + return err + } + } + + // in case of successful `put`, IndexUpToDate will be always be false + b.metadata.IndexUpToDate = false + + if oldItem, found := b.trie.Search(key); found { + b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size + } + + item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n} + b.trie.Insert(key, item) + b.ttlIndex.Insert(key, expiry) return nil } @@ -230,7 +262,7 @@ func (b *Bitcask) Delete(key []byte) error { // delete deletes the named key. If the key doesn't exist or an I/O error // occurs the error is returned. func (b *Bitcask) delete(key []byte) error { - _, _, err := b.put(key, []byte{}, Feature{}) + _, _, err := b.put(key, []byte{}) if err != nil { return err } @@ -249,7 +281,7 @@ func (b *Bitcask) DeleteAll() (err error) { defer b.mu.RUnlock() b.trie.ForEach(func(node art.Node) bool { - _, _, err = b.put(node.Key(), []byte{}, Feature{}) + _, _, err = b.put(node.Key(), []byte{}) if err != nil { return false } @@ -381,48 +413,82 @@ func (b *Bitcask) get(key []byte) (internal.Entry, error) { return e, nil } -// put inserts a new (key, value). Both key and value are valid inputs. -func (b *Bitcask) put(key, value []byte, feature Feature) (int64, int64, error) { +func (b *Bitcask) maybeRotate() error { size := b.curr.Size() - if size >= int64(b.config.MaxDatafileSize) { - err := b.curr.Close() - if err != nil { - return -1, 0, err - } - - id := b.curr.FileID() - - df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask) - if err != nil { - return -1, 0, err - } - - b.datafiles[id] = df - - id = b.curr.FileID() + 1 - curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask) - if err != nil { - return -1, 0, err - } - b.curr = curr - err = b.saveIndexes() - if err != nil { - return -1, 0, err - } + if size < int64(b.config.MaxDatafileSize) { + return nil } - e := internal.NewEntry(key, value, feature.Expiry) - return b.curr.Write(e) -} - -// closeCurrentFile closes current datafile and makes it read only. -func (b *Bitcask) closeCurrentFile() error { err := b.curr.Close() if err != nil { return err } + id := b.curr.FileID() - df, err := data.NewDatafile(b.path, id, true, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask) + + df, err := data.NewDatafile( + b.path, id, true, + b.config.MaxKeySize, + b.config.MaxValueSize, + b.config.FileFileModeBeforeUmask, + ) + if err != nil { + return err + } + + b.datafiles[id] = df + + id = b.curr.FileID() + 1 + curr, err := data.NewDatafile( + b.path, id, false, + b.config.MaxKeySize, + b.config.MaxValueSize, + b.config.FileFileModeBeforeUmask, + ) + if err != nil { + return err + } + b.curr = curr + err = b.saveIndexes() + if err != nil { + return err + } + + return nil +} + +// put inserts a new (key, value). Both key and value are valid inputs. +func (b *Bitcask) put(key, value []byte) (int64, int64, error) { + if err := b.maybeRotate(); err != nil { + return -1, 0, fmt.Errorf("error rotating active datafile: %w", err) + } + + return b.curr.Write(internal.NewEntry(key, value, nil)) +} + +// putWithExpiry inserts a new (key, value, expiry). +// Both key and value are valid inputs. +func (b *Bitcask) putWithExpiry(key, value []byte, expiry time.Time) (int64, int64, error) { + if err := b.maybeRotate(); err != nil { + return -1, 0, fmt.Errorf("error rotating active datafile: %w", err) + } + + return b.curr.Write(internal.NewEntry(key, value, &expiry)) +} + +// closeCurrentFile closes current datafile and makes it read only. +func (b *Bitcask) closeCurrentFile() error { + if err := b.curr.Close(); err != nil { + return err + } + + id := b.curr.FileID() + df, err := data.NewDatafile( + b.path, id, true, + b.config.MaxKeySize, + b.config.MaxValueSize, + b.config.FileFileModeBeforeUmask, + ) if err != nil { return err } @@ -434,7 +500,12 @@ func (b *Bitcask) closeCurrentFile() error { // openNewWritableFile opens new datafile for writing data func (b *Bitcask) openNewWritableFile() error { id := b.curr.FileID() + 1 - curr, err := data.NewDatafile(b.path, id, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask) + curr, err := data.NewDatafile( + b.path, id, false, + b.config.MaxKeySize, + b.config.MaxValueSize, + b.config.FileFileModeBeforeUmask, + ) if err != nil { return err } @@ -442,6 +513,7 @@ func (b *Bitcask) openNewWritableFile() error { return nil } +// Reopen closes and reopsns the database func (b *Bitcask) Reopen() error { b.mu.Lock() defer b.mu.Unlock() @@ -452,7 +524,12 @@ func (b *Bitcask) Reopen() error { // reopen reloads a bitcask object with index and datafiles // caller of this method should take care of locking func (b *Bitcask) reopen() error { - datafiles, lastID, err := loadDatafiles(b.path, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask) + datafiles, lastID, err := loadDatafiles( + b.path, + b.config.MaxKeySize, + b.config.MaxValueSize, + b.config.FileFileModeBeforeUmask, + ) if err != nil { return err } @@ -461,7 +538,12 @@ func (b *Bitcask) reopen() error { return err } - curr, err := data.NewDatafile(b.path, lastID, false, b.config.MaxKeySize, b.config.MaxValueSize, b.config.FileFileModeBeforeUmask) + curr, err := data.NewDatafile( + b.path, lastID, false, + b.config.MaxKeySize, + b.config.MaxValueSize, + b.config.FileFileModeBeforeUmask, + ) if err != nil { return err } @@ -532,14 +614,15 @@ func (b *Bitcask) Merge() error { if err != nil { return err } - // prepare entry options - var opts []PutOptions - if e.Expiry != nil { - opts = append(opts, WithExpiry(*(e.Expiry))) - } - if err := mdb.Put(key, e.Value, opts...); err != nil { - return err + if e.Expiry != nil { + if err := mdb.PutWithTTL(key, e.Value, time.Until(*e.Expiry)); err != nil { + return err + } + } else { + if err := mdb.Put(key, e.Value); err != nil { + return err + } } return nil @@ -751,7 +834,12 @@ func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileMode datafiles = make(map[int]data.Datafile, len(ids)) for _, id := range ids { - datafiles[id], err = data.NewDatafile(path, id, true, maxKeySize, maxValueSize, fileModeBeforeUmask) + datafiles[id], err = data.NewDatafile( + path, id, true, + maxKeySize, + maxValueSize, + fileModeBeforeUmask, + ) if err != nil { return } diff --git a/bitcask_test.go b/bitcask_test.go index 631454d..f1a2825 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -85,8 +85,8 @@ func TestAll(t *testing.T) { assert.Equal(1, db.Len()) }) - t.Run("PutWithExpiry", func(t *testing.T) { - err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now())) + t.Run("PutWithTTL", func(t *testing.T) { + err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0) assert.NoError(err) }) @@ -102,14 +102,14 @@ func TestAll(t *testing.T) { }) t.Run("HasWithExpired", func(t *testing.T) { - err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now())) + err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0) assert.NoError(err) time.Sleep(time.Millisecond) assert.False(db.Has([]byte("bar"))) }) t.Run("RunGC", func(t *testing.T) { - err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now())) + err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0) assert.NoError(err) time.Sleep(time.Millisecond) err = db.RunGC() @@ -232,8 +232,8 @@ func TestReopen(t *testing.T) { assert.NoError(err) }) - t.Run("PutWithExpiry", func(t *testing.T) { - err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now())) + t.Run("PutWithTTL", func(t *testing.T) { + err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0) assert.NoError(err) }) @@ -526,16 +526,16 @@ func TestLoadIndexes(t *testing.T) { t.Run("Setup", func(t *testing.T) { db, err = Open(testdir) assert.NoError(err) - for i:=0; i<5; i++ { + for i := 0; i < 5; i++ { key := fmt.Sprintf("key%d", i) val := fmt.Sprintf("val%d", i) err := db.Put([]byte(key), []byte(val)) assert.NoError(err) } - for i:=0; i<5; i++ { + for i := 0; i < 5; i++ { key := fmt.Sprintf("foo%d", i) val := fmt.Sprintf("bar%d", i) - err := db.Put([]byte(key), []byte(val), WithExpiry(time.Now().Add(time.Duration(i)*time.Second))) + err := db.PutWithTTL([]byte(key), []byte(val), time.Duration(i)*time.Second) assert.NoError(err) } err = db.Close() @@ -573,12 +573,12 @@ func TestReIndex(t *testing.T) { }) t.Run("PutWithExpiry", func(t *testing.T) { - err = db.Put([]byte("bar"), []byte("baz"), WithExpiry(time.Now())) + err = db.PutWithTTL([]byte("bar"), []byte("baz"), 0) assert.NoError(err) }) t.Run("PutWithLargeExpiry", func(t *testing.T) { - err = db.Put([]byte("bar1"), []byte("baz1"), WithExpiry(time.Now().Add(time.Hour))) + err = db.PutWithTTL([]byte("bar1"), []byte("baz1"), time.Hour) assert.NoError(err) }) diff --git a/cmd/bitcaskd/server.go b/cmd/bitcaskd/server.go index 39aadc4..d580064 100644 --- a/cmd/bitcaskd/server.go +++ b/cmd/bitcaskd/server.go @@ -38,13 +38,20 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) { conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } + + var ttl *time.Duration + key := cmd.Args[1] value := cmd.Args[2] - var opts []bitcask.PutOptions + if len(cmd.Args) == 4 { - ttl, _ := binary.Varint(cmd.Args[3]) - e := time.Now().UTC().Add(time.Duration(ttl)*time.Millisecond) - opts = append(opts, bitcask.WithExpiry(e)) + val, n := binary.Varint(cmd.Args[3]) + if n <= 0 { + conn.WriteError("ERR error parsing ttl") + return + } + d := time.Duration(val) * time.Millisecond + ttl = &d } err := s.db.Lock() @@ -54,11 +61,17 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) { } defer s.db.Unlock() - if err := s.db.Put(key, value, opts...); err != nil { - conn.WriteString(fmt.Sprintf("ERR: %s", err)) + if ttl != nil { + if err := s.db.PutWithTTL(key, value, *ttl); err != nil { + conn.WriteString(fmt.Sprintf("ERR: %s", err)) + } } else { - conn.WriteString("OK") + if err := s.db.Put(key, value); err != nil { + conn.WriteString(fmt.Sprintf("ERR: %s", err)) + } } + + conn.WriteString("OK") } func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) { diff --git a/options.go b/options.go index 701df80..ad8f494 100644 --- a/options.go +++ b/options.go @@ -2,7 +2,6 @@ package bitcask import ( "os" - "time" "github.com/prologic/bitcask/internal/config" ) @@ -117,16 +116,3 @@ func newDefaultConfig() *config.Config { DBVersion: CurrentDBVersion, } } - -type Feature struct { - Expiry *time.Time -} - -type PutOptions func(*Feature) error - -func WithExpiry(expiry time.Time) PutOptions { - return func(f *Feature) error { - f.Expiry = &expiry - return nil - } -}