Improves Merge() operation by also pruning old key/value pairs (#29)

* Added new API Stats() and Prune()

* Improved Merge() logic to also prune old key/values and actually reclaim disk space

* Added backward compat for the old Merge() function

* Refactor indexing of keys to items (hints)

* Remove redundant TestOpenMerge

* Add unit test for Stats()

* Improve TestMerge()
This commit is contained in:
James Mills 2019-07-27 07:52:25 +10:00 committed by GitHub
parent b7ac95d66a
commit 51bac21c0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 326 additions and 254 deletions

@ -6,8 +6,8 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"sync"
"github.com/gofrs/flock"
@ -46,6 +46,7 @@ type Bitcask struct {
*flock.Flock
config *config
options []Option
path string
curr *internal.Datafile
keydir *internal.Keydir
@ -53,6 +54,30 @@ type Bitcask struct {
trie *trie.Trie
}
// Stats is a struct returned by Stats() on an open Bitcask instance
type Stats struct {
Datafiles int
Keys int
Size int64
}
// Stats returns statistics about the database including the number of
// data files, keys and overall size on disk of the data
func (b *Bitcask) Stats() (stats Stats, err error) {
var size int64
size, err = internal.DirSize(b.path)
if err != nil {
return
}
stats.Datafiles = len(b.datafiles)
stats.Keys = b.keydir.Len()
stats.Size = size
return
}
// Close closes the database and removes the lock. It is important to call
// Close() as this is the only way to cleanup the lock held by the open
// database.
@ -62,9 +87,16 @@ func (b *Bitcask) Close() error {
os.Remove(b.Flock.Path())
}()
for _, df := range b.datafiles {
df.Close()
if err := b.keydir.Save(path.Join(b.path, "index")); err != nil {
return err
}
for _, df := range b.datafiles {
if err := df.Close(); err != nil {
return err
}
}
return b.curr.Close()
}
@ -207,11 +239,11 @@ func (b *Bitcask) put(key string, value []byte) (int64, int64, error) {
return b.curr.Write(e)
}
// Merge merges all datafiles in the database creating hint files for faster
// startup. Old keys are squashed and deleted keys removes. Call this function
// periodically to reclaim disk space.
func Merge(path string, force bool) error {
fns, err := internal.GetDatafiles(path)
func (b *Bitcask) reopen() error {
b.mu.Lock()
defer b.mu.Unlock()
fns, err := internal.GetDatafiles(b.path)
if err != nil {
return err
}
@ -221,162 +253,36 @@ func Merge(path string, force bool) error {
return err
}
// Do not merge if we only have 1 Datafile
if len(ids) <= 1 {
return nil
}
// Don't merge the Active Datafile (the last one)
fns = fns[:len(fns)-1]
ids = ids[:len(ids)-1]
temp, err := ioutil.TempDir(path, "merge")
if err != nil {
return err
}
defer os.RemoveAll(temp)
for i, fn := range fns {
// Don't merge Datafiles whose .hint files we've already generated
// (they are already merged); unless we set the force flag to true
// (forcing a re-merge).
if filepath.Ext(fn) == ".hint" && !force {
// Already merged
continue
}
id := ids[i]
keydir := internal.NewKeydir()
df, err := internal.NewDatafile(path, id, true)
if err != nil {
return err
}
defer df.Close()
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}
// Tombstone value (deleted key)
if len(e.Value) == 0 {
keydir.Delete(e.Key)
continue
}
keydir.Add(e.Key, ids[i], e.Offset, n)
}
tempdf, err := internal.NewDatafile(temp, id, false)
if err != nil {
return err
}
defer tempdf.Close()
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
e, err := df.ReadAt(item.Offset, item.Size)
if err != nil {
return err
}
_, _, err = tempdf.Write(e)
if err != nil {
return err
}
}
err = tempdf.Close()
if err != nil {
return err
}
err = df.Close()
if err != nil {
return err
}
err = os.Rename(tempdf.Name(), df.Name())
if err != nil {
return err
}
hint := strings.TrimSuffix(df.Name(), ".data") + ".hint"
err = keydir.Save(hint)
if err != nil {
return err
}
}
return nil
}
// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions that provide
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
err := Merge(path, false)
if err != nil {
return nil, err
}
fns, err := internal.GetDatafiles(path)
if err != nil {
return nil, err
}
ids, err := internal.ParseIds(fns)
if err != nil {
return nil, err
}
var datafiles []*internal.Datafile
for _, id := range ids {
df, err := internal.NewDatafile(b.path, id, true)
if err != nil {
return err
}
datafiles = append(datafiles, df)
}
keydir := internal.NewKeydir()
trie := trie.New()
for i, fn := range fns {
df, err := internal.NewDatafile(path, ids[i], true)
if err != nil {
return nil, err
if internal.Exists(path.Join(b.path, "index")) {
if err := keydir.Load(path.Join(b.path, "index")); err != nil {
return err
}
datafiles = append(datafiles, df)
if filepath.Ext(fn) == ".hint" {
f, err := os.Open(filepath.Join(path, fn))
if err != nil {
return nil, err
}
defer f.Close()
hint, err := internal.NewKeydirFromBytes(f)
if err != nil {
return nil, err
}
for key := range hint.Keys() {
item, _ := hint.Get(key)
_ = keydir.Add(key, item.FileID, item.Offset, item.Size)
trie.Add(key, item)
}
} else {
for key := range keydir.Keys() {
item, _ := keydir.Get(key)
trie.Add(key, item)
}
} else {
for i, df := range datafiles {
for {
e, n, err := df.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
return err
}
// Tombstone value (deleted key)
@ -396,24 +302,118 @@ func Open(path string, options ...Option) (*Bitcask, error) {
id = ids[(len(ids) - 1)]
}
curr, err := internal.NewDatafile(path, id, false)
curr, err := internal.NewDatafile(b.path, id, false)
if err != nil {
return err
}
b.curr = curr
b.datafiles = datafiles
b.keydir = keydir
b.trie = trie
return nil
}
// Merge merges all datafiles in the database. Old keys are squashed
// and deleted keys removes. Duplicate key/value pairs are also removed.
// Call this function periodically to reclaim disk space.
func (b *Bitcask) Merge() error {
// Temporary merged database path
temp, err := ioutil.TempDir(b.path, "merge")
if err != nil {
return err
}
defer os.RemoveAll(temp)
// Create a merged database
mdb, err := Open(temp, b.options...)
if err != nil {
return err
}
// Rewrite all key/value pairs into merged database
// Doing this automatically strips deleted keys and
// old key/value pairs
err = b.Fold(func(key string) error {
value, err := b.Get(key)
if err != nil {
return err
}
if err := mdb.Put(key, value); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = mdb.Close()
if err != nil {
return err
}
// Close the database
err = b.Close()
if err != nil {
return err
}
// Remove all data files
files, err := ioutil.ReadDir(b.path)
if err != nil {
return err
}
for _, file := range files {
if !file.IsDir() {
err := os.RemoveAll(path.Join([]string{b.path, file.Name()}...))
if err != nil {
return err
}
}
}
// Rename all merged data files
files, err = ioutil.ReadDir(mdb.path)
if err != nil {
return err
}
for _, file := range files {
err := os.Rename(
path.Join([]string{mdb.path, file.Name()}...),
path.Join([]string{b.path, file.Name()}...),
)
if err != nil {
return err
}
}
// And finally reopen the database
return b.reopen()
}
// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions that provide
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
}
bitcask := &Bitcask{
Flock: flock.New(filepath.Join(path, "lock")),
config: newDefaultConfig(),
path: path,
curr: curr,
keydir: keydir,
datafiles: datafiles,
trie: trie,
Flock: flock.New(filepath.Join(path, "lock")),
config: newDefaultConfig(),
options: options,
path: path,
}
for _, opt := range options {
err = opt(bitcask.config)
if err != nil {
if err := opt(bitcask.config); err != nil {
return nil, err
}
}
@ -427,5 +427,22 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, ErrDatabaseLocked
}
if err := bitcask.reopen(); err != nil {
return nil, err
}
return bitcask, nil
}
// Merge calls Bitcask.Merge()
// XXX: Deprecated; Please use the `.Merge()` method
// XXX: This is only kept here for backwards compatibility
// it will be removed in future releases at some point
func Merge(path string, force bool) error {
db, err := Open(path)
if err != nil {
return err
}
return db.Merge()
}

@ -211,38 +211,39 @@ func TestMaxValueSize(t *testing.T) {
})
}
func TestOpenMerge(t *testing.T) {
func TestStats(t *testing.T) {
var (
db *Bitcask
err error
)
assert := assert.New(t)
testdir, err := ioutil.TempDir("", "bitcask")
assert.NoError(err)
t.Run("Setup", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir, WithMaxDatafileSize(32))
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Put", func(t *testing.T) {
for i := 0; i < 1024; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
}
err := db.Put("foo", []byte("bar"))
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
val, err := db.Get("foo")
assert.NoError(err)
assert.Equal([]byte("bar"), val)
})
t.Run("Stats", func(t *testing.T) {
stats, err := db.Stats()
assert.NoError(err)
assert.Equal(stats.Datafiles, 0)
assert.Equal(stats.Keys, 1)
})
t.Run("Sync", func(t *testing.T) {
@ -255,34 +256,9 @@ func TestOpenMerge(t *testing.T) {
assert.NoError(err)
})
})
t.Run("Merge", func(t *testing.T) {
var (
db *Bitcask
err error
)
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestMergeOpen(t *testing.T) {
func TestMerge(t *testing.T) {
var (
db *Bitcask
err error
@ -300,22 +276,40 @@ func TestMergeOpen(t *testing.T) {
})
t.Run("Put", func(t *testing.T) {
for i := 0; i < 1024; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
err := db.Put("foo", []byte("bar"))
assert.NoError(err)
})
s1, err := db.Stats()
assert.NoError(err)
assert.Equal(0, s1.Datafiles)
assert.Equal(1, s1.Keys)
t.Run("Put", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := db.Put("foo", []byte("bar"))
assert.NoError(err)
}
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
err = db.Put(string(i), []byte(strings.Repeat(" ", 1024)))
assert.NoError(err)
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
s2, err := db.Stats()
assert.NoError(err)
assert.Equal(5, s2.Datafiles)
assert.Equal(1, s2.Keys)
assert.True(s2.Size > s1.Size)
t.Run("Merge", func(t *testing.T) {
err := db.Merge()
assert.NoError(err)
})
s3, err := db.Stats()
assert.NoError(err)
assert.Equal(1, s3.Datafiles)
assert.Equal(1, s3.Keys)
assert.True(s3.Size > s1.Size)
assert.True(s3.Size < s2.Size)
t.Run("Sync", func(t *testing.T) {
err = db.Sync()
assert.NoError(err)
@ -326,31 +320,6 @@ func TestMergeOpen(t *testing.T) {
assert.NoError(err)
})
})
t.Run("Merge", func(t *testing.T) {
t.Run("Merge", func(t *testing.T) {
err = Merge(testdir, true)
assert.NoError(err)
})
t.Run("Open", func(t *testing.T) {
db, err = Open(testdir)
assert.NoError(err)
})
t.Run("Get", func(t *testing.T) {
for i := 0; i < 32; i++ {
val, err := db.Get(string(i))
assert.NoError(err)
assert.Equal([]byte(strings.Repeat(" ", 1024)), val)
}
})
t.Run("Close", func(t *testing.T) {
err = db.Close()
assert.NoError(err)
})
})
}
func TestConcurrent(t *testing.T) {

@ -20,28 +20,23 @@ keys.`,
Args: cobra.ExactArgs(0),
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
force, err := cmd.Flags().GetBool("force")
if err != nil {
log.WithError(err).Error("error parsing force flag")
os.Exit(1)
}
os.Exit(merge(path, force))
os.Exit(merge(path))
},
}
func init() {
RootCmd.AddCommand(mergeCmd)
mergeCmd.Flags().BoolP(
"force", "f", false,
"Force a re-merge even if .hint files exist",
)
}
func merge(path string, force bool) int {
err := bitcask.Merge(path, force)
func merge(path string) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
if err = db.Merge(); err != nil {
log.WithError(err).Error("error merging database")
return 1
}

55
cmd/bitcask/stats.go Normal file

@ -0,0 +1,55 @@
package main
import (
"encoding/json"
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/prologic/bitcask"
)
var statsCmd = &cobra.Command{
Use: "stats",
Aliases: []string{},
Short: "Display statis about the Database",
Long: `This displays statistics about the Database"`,
Args: cobra.ExactArgs(0),
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
os.Exit(stats(path))
},
}
func init() {
RootCmd.AddCommand(statsCmd)
}
func stats(path string) int {
db, err := bitcask.Open(path)
if err != nil {
log.WithError(err).Error("error opening database")
return 1
}
defer db.Close()
stats, err := db.Stats()
if err != nil {
log.WithError(err).Error("error getting stats")
return 1
}
data, err := json.MarshalIndent(stats, "", " ")
if err != nil {
log.WithError(err).Error("error marshalling stats")
return 1
}
fmt.Println(string(data))
return 0
}

@ -5,6 +5,7 @@ import (
"encoding/gob"
"io"
"io/ioutil"
"os"
"sync"
)
@ -81,6 +82,21 @@ func (k *Keydir) Bytes() ([]byte, error) {
return buf.Bytes(), nil
}
func (k *Keydir) Load(fn string) error {
f, err := os.Open(fn)
if err != nil {
return err
}
defer f.Close()
dec := gob.NewDecoder(f)
if err := dec.Decode(&k.kv); err != nil {
return err
}
return nil
}
func (k *Keydir) Save(fn string) error {
data, err := k.Bytes()
if err != nil {

@ -2,12 +2,32 @@ package internal
import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
)
func Exists(path string) bool {
_, err := os.Stat(path)
return err == nil
}
func DirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
}
return err
})
return size, err
}
func GetDatafiles(path string) ([]string, error) {
fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path))
if err != nil {