From 51bac21c0a1f7bab8672baab81d143918b5ca2df Mon Sep 17 00:00:00 2001 From: James Mills <1290234+prologic@users.noreply.github.com> Date: Sat, 27 Jul 2019 07:52:25 +1000 Subject: [PATCH] Improves Merge() operation by also pruning old key/value pairs (#29) * Added new API Stats() and Prune() * Improved Merge() logic to also prune old key/values and actually reclaim disk space * Added backward compat for the old Merge() function * Refactor indexing of keys to items (hints) * Remove redundant TestOpenMerge * Add unit test for Stats() * Improve TestMerge() --- bitcask.go | 341 +++++++++++++++++++++++-------------------- bitcask_test.go | 127 ++++++---------- cmd/bitcask/merge.go | 21 +-- cmd/bitcask/stats.go | 55 +++++++ internal/keydir.go | 16 ++ internal/utils.go | 20 +++ 6 files changed, 326 insertions(+), 254 deletions(-) create mode 100644 cmd/bitcask/stats.go diff --git a/bitcask.go b/bitcask.go index a01c50b..b9903b5 100644 --- a/bitcask.go +++ b/bitcask.go @@ -6,8 +6,8 @@ import ( "io" "io/ioutil" "os" + "path" "path/filepath" - "strings" "sync" "github.com/gofrs/flock" @@ -46,6 +46,7 @@ type Bitcask struct { *flock.Flock config *config + options []Option path string curr *internal.Datafile keydir *internal.Keydir @@ -53,6 +54,30 @@ type Bitcask struct { trie *trie.Trie } +// Stats is a struct returned by Stats() on an open Bitcask instance +type Stats struct { + Datafiles int + Keys int + Size int64 +} + +// Stats returns statistics about the database including the number of +// data files, keys and overall size on disk of the data +func (b *Bitcask) Stats() (stats Stats, err error) { + var size int64 + + size, err = internal.DirSize(b.path) + if err != nil { + return + } + + stats.Datafiles = len(b.datafiles) + stats.Keys = b.keydir.Len() + stats.Size = size + + return +} + // Close closes the database and removes the lock. It is important to call // Close() as this is the only way to cleanup the lock held by the open // database. 
@@ -62,9 +87,16 @@ func (b *Bitcask) Close() error { os.Remove(b.Flock.Path()) }() - for _, df := range b.datafiles { - df.Close() + if err := b.keydir.Save(path.Join(b.path, "index")); err != nil { + return err } + + for _, df := range b.datafiles { + if err := df.Close(); err != nil { + return err + } + } + return b.curr.Close() } @@ -207,11 +239,11 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) { return b.curr.Write(e) } -// Merge merges all datafiles in the database creating hint files for faster -// startup. Old keys are squashed and deleted keys removes. Call this function -// periodically to reclaim disk space. -func Merge(path string, force bool) error { - fns, err := internal.GetDatafiles(path) +func (b *Bitcask) reopen() error { + b.mu.Lock() + defer b.mu.Unlock() + + fns, err := internal.GetDatafiles(b.path) if err != nil { return err } @@ -221,162 +253,36 @@ func Merge(path string, force bool) error { return err } - // Do not merge if we only have 1 Datafile - if len(ids) <= 1 { - return nil - } - - // Don't merge the Active Datafile (the last one) - fns = fns[:len(fns)-1] - ids = ids[:len(ids)-1] - - temp, err := ioutil.TempDir(path, "merge") - if err != nil { - return err - } - defer os.RemoveAll(temp) - - for i, fn := range fns { - // Don't merge Datafiles whose .hint files we've already generated - // (they are already merged); unless we set the force flag to true - // (forcing a re-merge). 
- if filepath.Ext(fn) == ".hint" && !force { - // Already merged - continue - } - - id := ids[i] - - keydir := internal.NewKeydir() - - df, err := internal.NewDatafile(path, id, true) - if err != nil { - return err - } - defer df.Close() - - for { - e, n, err := df.Read() - if err != nil { - if err == io.EOF { - break - } - return err - } - - // Tombstone value (deleted key) - if len(e.Value) == 0 { - keydir.Delete(e.Key) - continue - } - - keydir.Add(e.Key, ids[i], e.Offset, n) - } - - tempdf, err := internal.NewDatafile(temp, id, false) - if err != nil { - return err - } - defer tempdf.Close() - - for key := range keydir.Keys() { - item, _ := keydir.Get(key) - e, err := df.ReadAt(item.Offset, item.Size) - if err != nil { - return err - } - - _, _, err = tempdf.Write(e) - if err != nil { - return err - } - } - - err = tempdf.Close() - if err != nil { - return err - } - - err = df.Close() - if err != nil { - return err - } - - err = os.Rename(tempdf.Name(), df.Name()) - if err != nil { - return err - } - - hint := strings.TrimSuffix(df.Name(), ".data") + ".hint" - err = keydir.Save(hint) - if err != nil { - return err - } - } - - return nil -} - -// Open opens the database at the given path with optional options. -// Options can be provided with the `WithXXX` functions that provide -// configuration options as functions. 
-func Open(path string, options ...Option) (*Bitcask, error) { - if err := os.MkdirAll(path, 0755); err != nil { - return nil, err - } - - err := Merge(path, false) - if err != nil { - return nil, err - } - - fns, err := internal.GetDatafiles(path) - if err != nil { - return nil, err - } - - ids, err := internal.ParseIds(fns) - if err != nil { - return nil, err - } - var datafiles []*internal.Datafile + for _, id := range ids { + df, err := internal.NewDatafile(b.path, id, true) + if err != nil { + return err + } + datafiles = append(datafiles, df) + } + keydir := internal.NewKeydir() trie := trie.New() - for i, fn := range fns { - df, err := internal.NewDatafile(path, ids[i], true) - if err != nil { - return nil, err + if internal.Exists(path.Join(b.path, "index")) { + if err := keydir.Load(path.Join(b.path, "index")); err != nil { + return err } - datafiles = append(datafiles, df) - - if filepath.Ext(fn) == ".hint" { - f, err := os.Open(filepath.Join(path, fn)) - if err != nil { - return nil, err - } - defer f.Close() - - hint, err := internal.NewKeydirFromBytes(f) - if err != nil { - return nil, err - } - - for key := range hint.Keys() { - item, _ := hint.Get(key) - _ = keydir.Add(key, item.FileID, item.Offset, item.Size) - trie.Add(key, item) - } - } else { + for key := range keydir.Keys() { + item, _ := keydir.Get(key) + trie.Add(key, item) + } + } else { + for i, df := range datafiles { for { e, n, err := df.Read() if err != nil { if err == io.EOF { break } - return nil, err + return err } // Tombstone value (deleted key) @@ -396,24 +302,118 @@ func Open(path string, options ...Option) (*Bitcask, error) { id = ids[(len(ids) - 1)] } - curr, err := internal.NewDatafile(path, id, false) + curr, err := internal.NewDatafile(b.path, id, false) if err != nil { + return err + } + + b.curr = curr + b.datafiles = datafiles + + b.keydir = keydir + + b.trie = trie + + return nil +} + +// Merge merges all datafiles in the database. 
Old keys are squashed +// and deleted keys removed. Duplicate key/value pairs are also removed. +// Call this function periodically to reclaim disk space. +func (b *Bitcask) Merge() error { + // Temporary merged database path + temp, err := ioutil.TempDir(b.path, "merge") + if err != nil { + return err + } + defer os.RemoveAll(temp) + + // Create a merged database + mdb, err := Open(temp, b.options...) + if err != nil { + return err + } + + // Rewrite all key/value pairs into merged database + // Doing this automatically strips deleted keys and + // old key/value pairs + err = b.Fold(func(key string) error { + value, err := b.Get(key) + if err != nil { + return err + } + + if err := mdb.Put(key, value); err != nil { + return err + } + + return nil + }) + if err != nil { + return err + } + + err = mdb.Close() + if err != nil { + return err + } + + // Close the database + err = b.Close() + if err != nil { + return err + } + + // Remove all data files + files, err := ioutil.ReadDir(b.path) + if err != nil { + return err + } + for _, file := range files { + if !file.IsDir() { + err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...)) + if err != nil { + return err + } + } + } + + // Rename all merged data files + files, err = ioutil.ReadDir(mdb.path) + if err != nil { + return err + } + for _, file := range files { + err := os.Rename( + path.Join([]string{mdb.path, file.Name()}...), + path.Join([]string{b.path, file.Name()}...), + ) + if err != nil { + return err + } + } + + // And finally reopen the database + return b.reopen() +} + +// Open opens the database at the given path with optional options. +// Options can be provided with the `WithXXX` functions that provide +// configuration options as functions. 
+func Open(path string, options ...Option) (*Bitcask, error) { + if err := os.MkdirAll(path, 0755); err != nil { return nil, err } bitcask := &Bitcask{ - Flock: flock.New(filepath.Join(path, "lock")), - config: newDefaultConfig(), - path: path, - curr: curr, - keydir: keydir, - datafiles: datafiles, - trie: trie, + Flock: flock.New(filepath.Join(path, "lock")), + config: newDefaultConfig(), + options: options, + path: path, } for _, opt := range options { - err = opt(bitcask.config) - if err != nil { + if err := opt(bitcask.config); err != nil { return nil, err } } @@ -427,5 +427,22 @@ func Open(path string, options ...Option) (*Bitcask, error) { return nil, ErrDatabaseLocked } + if err := bitcask.reopen(); err != nil { + return nil, err + } + return bitcask, nil } + +// Merge calls Bitcask.Merge() +// XXX: Deprecated; Please use the `.Merge()` method +// XXX: This is only kept here for backwards compatibility +// it will be removed in future releases at some point +func Merge(path string, force bool) error { + db, err := Open(path) + if err != nil { + return err + } + + return db.Merge() +} diff --git a/bitcask_test.go b/bitcask_test.go index 9079378..0a6d587 100644 --- a/bitcask_test.go +++ b/bitcask_test.go @@ -211,38 +211,39 @@ func TestMaxValueSize(t *testing.T) { }) } -func TestOpenMerge(t *testing.T) { +func TestStats(t *testing.T) { + var ( + db *Bitcask + err error + ) + assert := assert.New(t) testdir, err := ioutil.TempDir("", "bitcask") assert.NoError(err) t.Run("Setup", func(t *testing.T) { - var ( - db *Bitcask - err error - ) - t.Run("Open", func(t *testing.T) { - db, err = Open(testdir, WithMaxDatafileSize(32)) + db, err = Open(testdir) assert.NoError(err) }) t.Run("Put", func(t *testing.T) { - for i := 0; i < 1024; i++ { - err = db.Put(string(i), []byte(strings.Repeat(" ", 1024))) - assert.NoError(err) - } + err := db.Put("foo", []byte("bar")) + assert.NoError(err) }) t.Run("Get", func(t *testing.T) { - for i := 0; i < 32; i++ { - err = 
db.Put(string(i), []byte(strings.Repeat(" ", 1024))) - assert.NoError(err) - val, err := db.Get(string(i)) - assert.NoError(err) - assert.Equal([]byte(strings.Repeat(" ", 1024)), val) - } + val, err := db.Get("foo") + assert.NoError(err) + assert.Equal([]byte("bar"), val) + }) + + t.Run("Stats", func(t *testing.T) { + stats, err := db.Stats() + assert.NoError(err) + assert.Equal(stats.Datafiles, 0) + assert.Equal(stats.Keys, 1) }) t.Run("Sync", func(t *testing.T) { @@ -255,34 +256,9 @@ func TestOpenMerge(t *testing.T) { assert.NoError(err) }) }) - - t.Run("Merge", func(t *testing.T) { - var ( - db *Bitcask - err error - ) - - t.Run("Open", func(t *testing.T) { - db, err = Open(testdir) - assert.NoError(err) - }) - - t.Run("Get", func(t *testing.T) { - for i := 0; i < 32; i++ { - val, err := db.Get(string(i)) - assert.NoError(err) - assert.Equal([]byte(strings.Repeat(" ", 1024)), val) - } - }) - - t.Run("Close", func(t *testing.T) { - err = db.Close() - assert.NoError(err) - }) - }) } -func TestMergeOpen(t *testing.T) { +func TestMerge(t *testing.T) { var ( db *Bitcask err error @@ -300,22 +276,40 @@ func TestMergeOpen(t *testing.T) { }) t.Run("Put", func(t *testing.T) { - for i := 0; i < 1024; i++ { - err = db.Put(string(i), []byte(strings.Repeat(" ", 1024))) + err := db.Put("foo", []byte("bar")) + assert.NoError(err) + }) + + s1, err := db.Stats() + assert.NoError(err) + assert.Equal(0, s1.Datafiles) + assert.Equal(1, s1.Keys) + + t.Run("Put", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := db.Put("foo", []byte("bar")) assert.NoError(err) } }) - t.Run("Get", func(t *testing.T) { - for i := 0; i < 32; i++ { - err = db.Put(string(i), []byte(strings.Repeat(" ", 1024))) - assert.NoError(err) - val, err := db.Get(string(i)) - assert.NoError(err) - assert.Equal([]byte(strings.Repeat(" ", 1024)), val) - } + s2, err := db.Stats() + assert.NoError(err) + assert.Equal(5, s2.Datafiles) + assert.Equal(1, s2.Keys) + assert.True(s2.Size > s1.Size) + + t.Run("Merge", 
func(t *testing.T) { + err := db.Merge() + assert.NoError(err) }) + s3, err := db.Stats() + assert.NoError(err) + assert.Equal(1, s3.Datafiles) + assert.Equal(1, s3.Keys) + assert.True(s3.Size > s1.Size) + assert.True(s3.Size < s2.Size) + t.Run("Sync", func(t *testing.T) { err = db.Sync() assert.NoError(err) @@ -326,31 +320,6 @@ func TestMergeOpen(t *testing.T) { assert.NoError(err) }) }) - - t.Run("Merge", func(t *testing.T) { - t.Run("Merge", func(t *testing.T) { - err = Merge(testdir, true) - assert.NoError(err) - }) - - t.Run("Open", func(t *testing.T) { - db, err = Open(testdir) - assert.NoError(err) - }) - - t.Run("Get", func(t *testing.T) { - for i := 0; i < 32; i++ { - val, err := db.Get(string(i)) - assert.NoError(err) - assert.Equal([]byte(strings.Repeat(" ", 1024)), val) - } - }) - - t.Run("Close", func(t *testing.T) { - err = db.Close() - assert.NoError(err) - }) - }) } func TestConcurrent(t *testing.T) { diff --git a/cmd/bitcask/merge.go b/cmd/bitcask/merge.go index 2090b9b..4485529 100644 --- a/cmd/bitcask/merge.go +++ b/cmd/bitcask/merge.go @@ -20,28 +20,23 @@ keys.`, Args: cobra.ExactArgs(0), Run: func(cmd *cobra.Command, args []string) { path := viper.GetString("path") - force, err := cmd.Flags().GetBool("force") - if err != nil { - log.WithError(err).Error("error parsing force flag") - os.Exit(1) - } - os.Exit(merge(path, force)) + os.Exit(merge(path)) }, } func init() { RootCmd.AddCommand(mergeCmd) - - mergeCmd.Flags().BoolP( - "force", "f", false, - "Force a re-merge even if .hint files exist", - ) } -func merge(path string, force bool) int { - err := bitcask.Merge(path, force) +func merge(path string) int { + db, err := bitcask.Open(path) if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + + if err = db.Merge(); err != nil { log.WithError(err).Error("error merging database") return 1 } diff --git a/cmd/bitcask/stats.go b/cmd/bitcask/stats.go new file mode 100644 index 0000000..5b8708a --- /dev/null +++ 
b/cmd/bitcask/stats.go @@ -0,0 +1,55 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +var statsCmd = &cobra.Command{ + Use: "stats", + Aliases: []string{}, + Short: "Display stats about the Database", + Long: `This displays statistics about the Database`, + Args: cobra.ExactArgs(0), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + + os.Exit(stats(path)) + }, +} + +func init() { + RootCmd.AddCommand(statsCmd) +} + +func stats(path string) int { + db, err := bitcask.Open(path) + if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + defer db.Close() + + stats, err := db.Stats() + if err != nil { + log.WithError(err).Error("error getting stats") + return 1 + } + + data, err := json.MarshalIndent(stats, "", " ") + if err != nil { + log.WithError(err).Error("error marshalling stats") + return 1 + } + + fmt.Println(string(data)) + + return 0 +} diff --git a/internal/keydir.go b/internal/keydir.go index 6e94fb3..be92d74 100644 --- a/internal/keydir.go +++ b/internal/keydir.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "io" "io/ioutil" + "os" "sync" ) @@ -81,6 +82,21 @@ func (k *Keydir) Bytes() ([]byte, error) { return buf.Bytes(), nil } +func (k *Keydir) Load(fn string) error { + f, err := os.Open(fn) + if err != nil { + return err + } + defer f.Close() + + dec := gob.NewDecoder(f) + if err := dec.Decode(&k.kv); err != nil { + return err + } + + return nil +} + func (k *Keydir) Save(fn string) error { data, err := k.Bytes() if err != nil { return err diff --git a/internal/utils.go b/internal/utils.go index f174715..158c0d1 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -2,12 +2,32 @@ package internal import ( "fmt" + "os" "path/filepath" "sort" "strconv" "strings" ) +func Exists(path string) bool { + _, err := os.Stat(path) + return err == nil +} + +func 
DirSize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + size += info.Size() + } + return err + }) + return size, err +} + func GetDatafiles(path string) ([]string, error) { fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path)) if err != nil {