diff --git a/bitcask.go b/bitcask.go index e37ce9c..61cb557 100644 --- a/bitcask.go +++ b/bitcask.go @@ -14,6 +14,9 @@ import ( "github.com/gofrs/flock" art "github.com/plar/go-adaptive-radix-tree" "github.com/prologic/bitcask/internal" + "github.com/prologic/bitcask/internal/config" + "github.com/prologic/bitcask/internal/data" + "github.com/prologic/bitcask/internal/index" ) var ( @@ -45,11 +48,11 @@ type Bitcask struct { *flock.Flock - config *config + config *config.Config options []Option path string - curr *internal.Datafile - datafiles map[int]*internal.Datafile + curr *data.Datafile + datafiles map[int]*data.Datafile trie art.Tree } @@ -94,7 +97,7 @@ func (b *Bitcask) Close() error { } defer f.Close() - if err := internal.WriteIndex(b.trie, f); err != nil { + if err := index.WriteIndex(b.trie, f); err != nil { return err } if err := f.Sync(); err != nil { @@ -118,7 +121,7 @@ func (b *Bitcask) Sync() error { // Get retrieves the value of the given key. If the key is not found or an/I/O // error occurs a null byte slice is returned along with the error. func (b *Bitcask) Get(key []byte) ([]byte, error) { - var df *internal.Datafile + var df *data.Datafile b.mu.RLock() value, found := b.trie.Search(key) @@ -158,10 +161,10 @@ func (b *Bitcask) Has(key []byte) bool { // Put stores the key and value in the database. 
func (b *Bitcask) Put(key, value []byte) error { - if len(key) > b.config.maxKeySize { + if len(key) > b.config.MaxKeySize { return ErrKeyTooLarge } - if len(value) > b.config.maxValueSize { + if len(value) > b.config.MaxValueSize { return ErrValueTooLarge } @@ -170,7 +173,7 @@ func (b *Bitcask) Put(key, value []byte) error { return err } - if b.config.sync { + if b.config.Sync { if err := b.curr.Sync(); err != nil { return err } @@ -269,7 +272,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) { defer b.mu.Unlock() size := b.curr.Size() - if size >= int64(b.config.maxDatafileSize) { + if size >= int64(b.config.MaxDatafileSize) { err := b.curr.Close() if err != nil { return -1, 0, err @@ -277,7 +280,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) { id := b.curr.FileID() - df, err := internal.NewDatafile(b.path, id, true) + df, err := data.NewDatafile(b.path, id, true) if err != nil { return -1, 0, err } @@ -285,7 +288,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) { b.datafiles[id] = df id = b.curr.FileID() + 1 - curr, err := internal.NewDatafile(b.path, id, false) + curr, err := data.NewDatafile(b.path, id, false) if err != nil { return -1, 0, err } @@ -318,29 +321,21 @@ func (b *Bitcask) reopen() error { return err } - datafiles := make(map[int]*internal.Datafile, len(ids)) + datafiles := make(map[int]*data.Datafile, len(ids)) for _, id := range ids { - df, err := internal.NewDatafile(b.path, id, true) + df, err := data.NewDatafile(b.path, id, true) if err != nil { return err } datafiles[id] = df } - t := art.New() - - if internal.Exists(path.Join(b.path, "index")) { - f, err := os.Open(path.Join(b.path, "index")) - if err != nil { - return err - } - defer f.Close() - - if err := internal.ReadIndex(f, t, b.config.maxKeySize); err != nil { - return err - } - } else { + t, found, err := index.ReadFromFile(b.path, b.config.MaxKeySize) + if err != nil { + return err + } + if !found { for i, df := range 
datafiles { var offset int64 for { @@ -358,7 +353,6 @@ func (b *Bitcask) reopen() error { offset += n continue } - item := internal.Item{FileID: ids[i], Offset: offset, Size: n} t.Insert(e.Key, item) offset += n @@ -371,7 +365,7 @@ func (b *Bitcask) reopen() error { id = ids[(len(ids) - 1)] } - curr, err := internal.NewDatafile(b.path, id, false) + curr, err := data.NewDatafile(b.path, id, false) if err != nil { return err } @@ -468,7 +462,7 @@ func (b *Bitcask) Merge() error { // configuration options as functions. func Open(path string, options ...Option) (*Bitcask, error) { var ( - cfg *config + cfg *config.Config err error ) @@ -476,7 +470,7 @@ func Open(path string, options ...Option) (*Bitcask, error) { return nil, err } - cfg, err = getConfig(path) + cfg, err = config.Decode(path) if err != nil { cfg = newDefaultConfig() } diff --git a/cmd/bitcask/recover.go b/cmd/bitcask/recover.go new file mode 100644 index 0000000..a905495 --- /dev/null +++ b/cmd/bitcask/recover.go @@ -0,0 +1,84 @@ +package main + +import ( + "os" + + "github.com/prologic/bitcask" + "github.com/prologic/bitcask/internal/config" + "github.com/prologic/bitcask/internal/index" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var recoveryCmd = &cobra.Command{ + Use: "recover", + Aliases: []string{"recovery"}, + Short: "Analyzes and recovers the index file for corruption scenarios", + Long: `This analyzes files to detect different forms of persistence corruption in +persisted files. It also allows recovering the files to the latest point of integrity. 
+Recovered files have the .recovered extension`, + Args: cobra.ExactArgs(0), + PreRun: func(cmd *cobra.Command, args []string) { + viper.BindPFlag("dry-run", cmd.Flags().Lookup("dry-run")) + }, + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + dryRun := viper.GetBool("dry-run") + os.Exit(recover(path, dryRun)) + }, +} + +func init() { + RootCmd.AddCommand(recoveryCmd) + recoveryCmd.Flags().BoolP("dry-run", "n", false, "Will only check files health without applying recovery if unhealthy") +} + +func recover(path string, dryRun bool) int { + maxKeySize := bitcask.DefaultMaxKeySize + if cfg, err := config.Decode(path); err == nil { + maxKeySize = cfg.MaxKeySize + } + + t, found, err := index.ReadFromFile(path, maxKeySize) + if err != nil && !index.IsIndexCorruption(err) { + log.WithError(err).Info("error while opening the index file") + } + if !found { + log.Info("index file doesn't exist, will be recreated on next run.") + return 0 + } + + if err == nil { + log.Debug("index file is not corrupted") + return 0 + } + log.Debugf("index file is corrupted: %v", err) + + if dryRun { + log.Debug("dry-run mode, not writing to a file") + return 0 + } + + fi, err := os.OpenFile("index.recovered", os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + log.WithError(err).Info("error while creating recovered index file") + return 1 + } + + // Leverage that t has the partially read tree even on corrupted files + err = index.WriteIndex(t, fi) + if err != nil { + log.WithError(err).Info("error while writing the recovered index file") + + fi.Close() + return 1 + } + err = fi.Close() + if err != nil { + log.WithError(err).Info("the recovered index file couldn't be saved correctly") + } + log.Debug("the index was recovered in the index.recovered new file") + + return 0 +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..3fd2f63 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,36 @@ +package 
config + +import ( + "encoding/json" + "io/ioutil" + "path/filepath" +) + +// Config contains the bitcask configuration parameters +type Config struct { + MaxDatafileSize int `json:"max_datafile_size"` + MaxKeySize int `json:"max_key_size"` + MaxValueSize int `json:"max_value_size"` + Sync bool `json:"sync"` +} + +// Decode decodes a serialized configuration +func Decode(path string) (*Config, error) { + var cfg Config + + data, err := ioutil.ReadFile(filepath.Join(path, "config.json")) + if err != nil { + return nil, err + } + + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, err + } + + return &cfg, nil +} + +// Encode encodes the configuration for storage +func (c *Config) Encode() ([]byte, error) { + return json.Marshal(c) +} diff --git a/internal/codec.go b/internal/data/codec.go similarity index 91% rename from internal/codec.go rename to internal/data/codec.go index e34373f..a49f45c 100644 --- a/internal/codec.go +++ b/internal/data/codec.go @@ -1,4 +1,4 @@ -package internal +package data import ( "bufio" @@ -6,6 +6,7 @@ import ( "io" "github.com/pkg/errors" + "github.com/prologic/bitcask/internal" ) const ( @@ -27,7 +28,7 @@ type Encoder struct { // Encode takes any Entry and streams it to the underlying writer. // Messages are framed with a key-length and value-length prefix. 
-func (e *Encoder) Encode(msg Entry) (int64, error) { +func (e *Encoder) Encode(msg internal.Entry) (int64, error) { var bufKeyValue = make([]byte, KeySize+ValueSize) binary.BigEndian.PutUint32(bufKeyValue[:KeySize], uint32(len(msg.Key))) binary.BigEndian.PutUint64(bufKeyValue[KeySize:KeySize+ValueSize], uint64(len(msg.Value))) @@ -66,7 +67,7 @@ type Decoder struct { r io.Reader } -func (d *Decoder) Decode(v *Entry) (int64, error) { +func (d *Decoder) Decode(v *internal.Entry) (int64, error) { prefixBuf := make([]byte, KeySize+ValueSize) _, err := io.ReadFull(d.r, prefixBuf) @@ -91,7 +92,7 @@ func GetKeyValueSizes(buf []byte) (uint64, uint64) { return uint64(actualKeySize), actualValueSize } -func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *Entry) { +func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *internal.Entry) { v.Key = buf[:valueOffset] v.Value = buf[valueOffset : len(buf)-checksumSize] v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:]) diff --git a/internal/datafile.go b/internal/data/datafile.go similarity index 90% rename from internal/datafile.go rename to internal/data/datafile.go index b7df731..9990f5c 100644 --- a/internal/datafile.go +++ b/internal/data/datafile.go @@ -1,4 +1,4 @@ -package internal +package data import ( "fmt" @@ -7,6 +7,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/prologic/bitcask/internal" "golang.org/x/exp/mmap" ) @@ -119,7 +120,7 @@ func (df *Datafile) Size() int64 { return df.offset } -func (df *Datafile) Read() (e Entry, n int64, err error) { +func (df *Datafile) Read() (e internal.Entry, n int64, err error) { df.Lock() defer df.Unlock() @@ -131,7 +132,7 @@ func (df *Datafile) Read() (e Entry, n int64, err error) { return } -func (df *Datafile) ReadAt(index, size int64) (e Entry, err error) { +func (df *Datafile) ReadAt(index, size int64) (e internal.Entry, err error) { var n int b := make([]byte, size) @@ -155,7 +156,7 @@ func (df *Datafile) ReadAt(index, size int64) (e Entry, 
err error) { return } -func (df *Datafile) Write(e Entry) (int64, int64, error) { +func (df *Datafile) Write(e internal.Entry) (int64, int64, error) { if df.w == nil { return -1, 0, ErrReadonly } diff --git a/internal/codec_index.go b/internal/index/codec_index.go similarity index 78% rename from internal/codec_index.go rename to internal/index/codec_index.go index f163005..2a60a45 100644 --- a/internal/codec_index.go +++ b/internal/index/codec_index.go @@ -1,4 +1,4 @@ -package internal +package index import ( "encoding/binary" @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" art "github.com/plar/go-adaptive-radix-tree" + "github.com/prologic/bitcask/internal" ) var ( @@ -59,21 +60,21 @@ func writeBytes(b []byte, w io.Writer) error { return nil } -func readItem(r io.Reader) (Item, error) { +func readItem(r io.Reader) (internal.Item, error) { buf := make([]byte, (fileIDSize + offsetSize + sizeSize)) _, err := io.ReadFull(r, buf) if err != nil { - return Item{}, errors.Wrap(errTruncatedData, err.Error()) + return internal.Item{}, errors.Wrap(errTruncatedData, err.Error()) } - return Item{ + return internal.Item{ FileID: int(binary.BigEndian.Uint32(buf[:fileIDSize])), Offset: int64(binary.BigEndian.Uint64(buf[fileIDSize:(fileIDSize + offsetSize)])), Size: int64(binary.BigEndian.Uint64(buf[(fileIDSize + offsetSize):])), }, nil } -func writeItem(item Item, w io.Writer) error { +func writeItem(item internal.Item, w io.Writer) error { buf := make([]byte, (fileIDSize + offsetSize + sizeSize)) binary.BigEndian.PutUint32(buf[:fileIDSize], uint32(item.FileID)) binary.BigEndian.PutUint64(buf[fileIDSize:(fileIDSize+offsetSize)], uint64(item.Offset)) @@ -86,7 +87,7 @@ func writeItem(item Item, w io.Writer) error { } // ReadIndex reads a persisted from a io.Reader into a Tree -func ReadIndex(r io.Reader, t art.Tree, maxKeySize int) error { +func readIndex(r io.Reader, t art.Tree, maxKeySize int) error { for { key, err := readKeyBytes(r, maxKeySize) if err != nil { @@ -115,7 
+116,7 @@ func WriteIndex(t art.Tree, w io.Writer) (err error) { return false } - item := node.Value().(Item) + item := node.Value().(internal.Item) err := writeItem(item, w) if err != nil { return false @@ -125,3 +126,14 @@ func WriteIndex(t art.Tree, w io.Writer) (err error) { }) return } + +// IsIndexCorruption returns a boolean indicating whether the error +// is known to report a corruption data issue +func IsIndexCorruption(err error) bool { + cause := errors.Cause(err) + switch cause { + case errKeySizeTooLarge, errTruncatedData, errTruncatedKeyData, errTruncatedKeySize: + return true + } + return false +} diff --git a/internal/codec_index_test.go b/internal/index/codec_index_test.go similarity index 89% rename from internal/codec_index_test.go rename to internal/index/codec_index_test.go index f78cc19..5fbb64a 100644 --- a/internal/codec_index_test.go +++ b/internal/index/codec_index_test.go @@ -1,4 +1,4 @@ -package internal +package index import ( "bytes" @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" art "github.com/plar/go-adaptive-radix-tree" + "github.com/prologic/bitcask/internal" ) const ( @@ -36,7 +37,7 @@ func TestReadIndex(t *testing.T) { b := bytes.NewBuffer(sampleTreeBytes) at := art.New() - err := ReadIndex(b, at, 1024) + err := readIndex(b, at, 1024) if err != nil { t.Fatalf("error while deserializing correct sample tree: %v", err) } @@ -74,7 +75,7 @@ func TestReadCorruptedData(t *testing.T) { t.Run(table[i].name, func(t *testing.T) { bf := bytes.NewBuffer(table[i].data) - if err := ReadIndex(bf, art.New(), 1024); errors.Cause(err) != table[i].err { + if err := readIndex(bf, art.New(), 1024); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err { t.Fatalf("expected %v, got %v", table[i].err, err) } }) @@ -103,7 +104,7 @@ func TestReadCorruptedData(t *testing.T) { t.Run(table[i].name, func(t *testing.T) { bf := bytes.NewBuffer(table[i].data) - if err := ReadIndex(bf, art.New(), table[i].maxKeySize); errors.Cause(err) != table[i].err { 
+ if err := readIndex(bf, art.New(), table[i].maxKeySize); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err { t.Fatalf("expected %v, got %v", table[i].err, err) } }) @@ -117,7 +118,7 @@ func getSampleTree() (art.Tree, int) { keys := [][]byte{[]byte("abcd"), []byte("abce"), []byte("abcf"), []byte("abgd")} expectedSerializedSize := 0 for i := range keys { - at.Insert(keys[i], Item{FileID: i, Offset: int64(i), Size: int64(i)}) + at.Insert(keys[i], internal.Item{FileID: i, Offset: int64(i), Size: int64(i)}) expectedSerializedSize += int32Size + len(keys[i]) + fileIDSize + offsetSize + sizeSize } diff --git a/internal/index/index.go b/internal/index/index.go new file mode 100644 index 0000000..fb23edd --- /dev/null +++ b/internal/index/index.go @@ -0,0 +1,27 @@ +package index + +import ( + "os" + "path" + + art "github.com/plar/go-adaptive-radix-tree" + "github.com/prologic/bitcask/internal" +) + +// ReadFromFile reads an index from a persisted file +func ReadFromFile(filePath string, maxKeySize int) (art.Tree, bool, error) { + t := art.New() + if !internal.Exists(path.Join(filePath, "index")) { + return t, false, nil + } + + f, err := os.Open(path.Join(filePath, "index")) + if err != nil { + return t, true, err + } + defer f.Close() + if err := readIndex(f, t, maxKeySize); err != nil { + return t, true, err + } + return t, true, nil +} diff --git a/options.go b/options.go index edd7f67..1c32400 100644 --- a/options.go +++ b/options.go @@ -1,10 +1,6 @@ package bitcask -import ( - "encoding/json" - "io/ioutil" - "path/filepath" -) +import "github.com/prologic/bitcask/internal/config" const ( // DefaultMaxDatafileSize is the default maximum datafile size in bytes @@ -15,87 +11,34 @@ const ( // DefaultMaxValueSize is the default value size in bytes DefaultMaxValueSize = 1 << 16 // 65KB + + // DefaultSync is the default file synchronization action + DefaultSync = false ) // Option is a function that takes a config struct and modifies it -type Option 
func(*config) error - -type config struct { - maxDatafileSize int - maxKeySize int - maxValueSize int - sync bool -} - -func (c *config) MarshalJSON() ([]byte, error) { - return json.Marshal(struct { - MaxDatafileSize int `json:"max_datafile_size"` - MaxKeySize int `json:"max_key_size"` - MaxValueSize int `json:"max_value_size"` - Sync bool `json:"sync"` - }{ - MaxDatafileSize: c.maxDatafileSize, - MaxKeySize: c.maxKeySize, - MaxValueSize: c.maxValueSize, - Sync: c.sync, - }) -} - -func getConfig(path string) (*config, error) { - type Config struct { - MaxDatafileSize int `json:"max_datafile_size"` - MaxKeySize int `json:"max_key_size"` - MaxValueSize int `json:"max_value_size"` - Sync bool `json:"sync"` - } - - var cfg Config - - data, err := ioutil.ReadFile(filepath.Join(path, "config.json")) - if err != nil { - return nil, err - } - - if err := json.Unmarshal(data, &cfg); err != nil { - return nil, err - } - - return &config{ - maxDatafileSize: cfg.MaxDatafileSize, - maxKeySize: cfg.MaxKeySize, - maxValueSize: cfg.MaxValueSize, - sync: cfg.Sync, - }, nil -} - -func newDefaultConfig() *config { - return &config{ - maxDatafileSize: DefaultMaxDatafileSize, - maxKeySize: DefaultMaxKeySize, - maxValueSize: DefaultMaxValueSize, - } -} +type Option func(*config.Config) error // WithMaxDatafileSize sets the maximum datafile size option func WithMaxDatafileSize(size int) Option { - return func(cfg *config) error { - cfg.maxDatafileSize = size + return func(cfg *config.Config) error { + cfg.MaxDatafileSize = size return nil } } // WithMaxKeySize sets the maximum key size option func WithMaxKeySize(size int) Option { - return func(cfg *config) error { - cfg.maxKeySize = size + return func(cfg *config.Config) error { + cfg.MaxKeySize = size return nil } } // WithMaxValueSize sets the maximum value size option func WithMaxValueSize(size int) Option { - return func(cfg *config) error { - cfg.maxValueSize = size + return func(cfg *config.Config) error { + cfg.MaxValueSize = 
size return nil } } @@ -103,8 +46,17 @@ func WithMaxValueSize(size int) Option { // WithSync causes Sync() to be called on every key/value written increasing // durability and safety at the expense of performance func WithSync(sync bool) Option { - return func(cfg *config) error { - cfg.sync = sync + return func(cfg *config.Config) error { + cfg.Sync = sync return nil } } + +func newDefaultConfig() *config.Config { + return &config.Config{ + MaxDatafileSize: DefaultMaxDatafileSize, + MaxKeySize: DefaultMaxKeySize, + MaxValueSize: DefaultMaxValueSize, + Sync: DefaultSync, + } +}