cmd/bitcask: recovery tool (#92)

* cmd/bitcask: recovery tool

* refactor configuration & use it in recover tool
Ignacio Hagopian 2019-09-06 18:57:30 -03:00 committed by James Mills
parent f4fb4972ee
commit 0d3a9213ed
9 changed files with 228 additions and 120 deletions

@ -14,6 +14,9 @@ import (
"github.com/gofrs/flock"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/data"
"github.com/prologic/bitcask/internal/index"
)
var (
@ -45,11 +48,11 @@ type Bitcask struct {
*flock.Flock
config *config
config *config.Config
options []Option
path string
curr *internal.Datafile
datafiles map[int]*internal.Datafile
curr *data.Datafile
datafiles map[int]*data.Datafile
trie art.Tree
}
@ -94,7 +97,7 @@ func (b *Bitcask) Close() error {
}
defer f.Close()
if err := internal.WriteIndex(b.trie, f); err != nil {
if err := index.WriteIndex(b.trie, f); err != nil {
return err
}
if err := f.Sync(); err != nil {
@ -118,7 +121,7 @@ func (b *Bitcask) Sync() error {
// Get retrieves the value of the given key. If the key is not found or an I/O
// error occurs, a nil byte slice is returned along with the error.
func (b *Bitcask) Get(key []byte) ([]byte, error) {
var df *internal.Datafile
var df *data.Datafile
b.mu.RLock()
value, found := b.trie.Search(key)
@ -158,10 +161,10 @@ func (b *Bitcask) Has(key []byte) bool {
// Put stores the key and value in the database.
func (b *Bitcask) Put(key, value []byte) error {
if len(key) > b.config.maxKeySize {
if len(key) > b.config.MaxKeySize {
return ErrKeyTooLarge
}
if len(value) > b.config.maxValueSize {
if len(value) > b.config.MaxValueSize {
return ErrValueTooLarge
}
@ -170,7 +173,7 @@ func (b *Bitcask) Put(key, value []byte) error {
return err
}
if b.config.sync {
if b.config.Sync {
if err := b.curr.Sync(); err != nil {
return err
}
@ -269,7 +272,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
defer b.mu.Unlock()
size := b.curr.Size()
if size >= int64(b.config.maxDatafileSize) {
if size >= int64(b.config.MaxDatafileSize) {
err := b.curr.Close()
if err != nil {
return -1, 0, err
@ -277,7 +280,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
id := b.curr.FileID()
df, err := internal.NewDatafile(b.path, id, true)
df, err := data.NewDatafile(b.path, id, true)
if err != nil {
return -1, 0, err
}
@ -285,7 +288,7 @@ func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
b.datafiles[id] = df
id = b.curr.FileID() + 1
curr, err := internal.NewDatafile(b.path, id, false)
curr, err := data.NewDatafile(b.path, id, false)
if err != nil {
return -1, 0, err
}
@ -318,29 +321,21 @@ func (b *Bitcask) reopen() error {
return err
}
datafiles := make(map[int]*internal.Datafile, len(ids))
datafiles := make(map[int]*data.Datafile, len(ids))
for _, id := range ids {
df, err := internal.NewDatafile(b.path, id, true)
df, err := data.NewDatafile(b.path, id, true)
if err != nil {
return err
}
datafiles[id] = df
}
t := art.New()
if internal.Exists(path.Join(b.path, "index")) {
f, err := os.Open(path.Join(b.path, "index"))
if err != nil {
return err
}
defer f.Close()
if err := internal.ReadIndex(f, t, b.config.maxKeySize); err != nil {
return err
}
} else {
t, found, err := index.ReadFromFile(b.path, b.config.MaxKeySize)
if err != nil {
return err
}
if !found {
for i, df := range datafiles {
var offset int64
for {
@ -358,7 +353,6 @@ func (b *Bitcask) reopen() error {
offset += n
continue
}
item := internal.Item{FileID: ids[i], Offset: offset, Size: n}
t.Insert(e.Key, item)
offset += n
@ -371,7 +365,7 @@ func (b *Bitcask) reopen() error {
id = ids[(len(ids) - 1)]
}
curr, err := internal.NewDatafile(b.path, id, false)
curr, err := data.NewDatafile(b.path, id, false)
if err != nil {
return err
}
@ -468,7 +462,7 @@ func (b *Bitcask) Merge() error {
// configuration options as functions.
func Open(path string, options ...Option) (*Bitcask, error) {
var (
cfg *config
cfg *config.Config
err error
)
@ -476,7 +470,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
return nil, err
}
cfg, err = getConfig(path)
cfg, err = config.Decode(path)
if err != nil {
cfg = newDefaultConfig()
}
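
With these changes, bitcask.go swaps the old unexported config struct for the new config.Config and routes index loading through the new index package. A minimal usage sketch of the refactored API, assuming a writable /tmp path; every call it makes (Open, WithMaxKeySize, WithSync, Put, Get, Close) appears in this diff:

package main

import (
	"log"

	"github.com/prologic/bitcask"
)

func main() {
	// Options now mutate the exported config.Config fields shown above.
	db, err := bitcask.Open("/tmp/bitcask-demo",
		bitcask.WithMaxKeySize(64),
		bitcask.WithSync(true),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Put checks len(key) against MaxKeySize and len(value) against MaxValueSize.
	if err := db.Put([]byte("hello"), []byte("world")); err != nil {
		log.Fatal(err)
	}
	value, err := db.Get([]byte("hello"))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("hello = %s", value)
}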

cmd/bitcask/recover.go Normal file (+84 lines)

@ -0,0 +1,84 @@
package main
import (
"os"
"github.com/prologic/bitcask"
"github.com/prologic/bitcask/internal/config"
"github.com/prologic/bitcask/internal/index"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var recoveryCmd = &cobra.Command{
Use: "recover",
Aliases: []string{"recovery"},
Short: "Analyzes and recovers the index file for corruption scenarios",
Long: `This analyze files to detect different forms of persistence corruption in
persisted files. It also allows to recover the files to the latest point of integrity.
Recovered files have the .recovered extension`,
Args: cobra.ExactArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlag("dry-run", cmd.Flags().Lookup("dry-run"))
},
Run: func(cmd *cobra.Command, args []string) {
path := viper.GetString("path")
dryRun := viper.GetBool("dry-run")
os.Exit(recover(path, dryRun))
},
}
func init() {
RootCmd.AddCommand(recoveryCmd)
recoveryCmd.Flags().BoolP("dry-run", "n", false, "Only check index health, without recovering it if unhealthy")
}
func recover(path string, dryRun bool) int {
maxKeySize := bitcask.DefaultMaxKeySize
if cfg, err := config.Decode(path); err == nil {
maxKeySize = cfg.MaxKeySize
}
t, found, err := index.ReadFromFile(path, maxKeySize)
if err != nil && !index.IsIndexCorruption(err) {
log.WithError(err).Info("error while opening the index file")
}
if !found {
log.Info("index file doesn't exist, will be recreated on next run.")
return 0
}
if err == nil {
log.Debug("index file is not corrupted")
return 0
}
log.Debugf("index file is corrupted: %v", err)
if dryRun {
log.Debug("dry-run mode, not writing to a file")
return 0
}
fi, err := os.OpenFile("index.recovered", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
log.WithError(err).Info("error while creating recovered index file")
return 1
}
// Leverage the fact that t still holds the partially read tree even for corrupted files
err = index.WriteIndex(t, fi)
if err != nil {
log.WithError(err).Info("error while writing the recovered index file")
fi.Close()
return 1
}
err = fi.Close()
if err != nil {
log.WithError(err).Info("the recovered file index coudn't be saved correctly")
}
log.Debug("the index was recovered in the index.recovered new file")
return 0
}
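
One thing recover() leaves to the operator is adopting the recovered file: as written, index.recovered is created relative to the working directory, while index.ReadFromFile (added later in this diff) opens <path>/index. A hypothetical helper sketching that adoption and a re-verification pass; the name adoptRecovered is illustrative only, and it assumes fmt and path/filepath are added to the imports:

func adoptRecovered(dbPath string, maxKeySize int) error {
	// Move the recovered file to the name the engine actually opens.
	if err := os.Rename("index.recovered", filepath.Join(dbPath, "index")); err != nil {
		return err
	}
	// Re-read it to confirm the recovered tree deserializes cleanly.
	_, found, err := index.ReadFromFile(dbPath, maxKeySize)
	if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("index missing after rename into %s", dbPath)
	}
	return nil
}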

internal/config/config.go Normal file (+36 lines)

@ -0,0 +1,36 @@
package config
import (
"encoding/json"
"io/ioutil"
"path/filepath"
)
// Config contains the bitcask configuration parameters
type Config struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
Sync bool `json:"sync"`
}
// Decode decodes the serialized configuration stored at path/config.json
func Decode(path string) (*Config, error) {
var cfg Config
data, err := ioutil.ReadFile(filepath.Join(path, "config.json"))
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
// Encode encodes the configuration for storage
func (c *Config) Encode() ([]byte, error) {
return json.Marshal(c)
}
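
A round-trip sketch for the new package: Encode produces the JSON that Decode expects to find at <path>/config.json. Since internal/ packages are only importable from inside the bitcask module, this example assumes it lives there; the temp directory and literal values are illustrative:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"path/filepath"

	"github.com/prologic/bitcask/internal/config"
)

func main() {
	dir, err := ioutil.TempDir("", "bitcask-config")
	if err != nil {
		log.Fatal(err)
	}
	cfg := &config.Config{
		MaxDatafileSize: 1 << 20,
		MaxKeySize:      64,
		MaxValueSize:    1 << 16,
		Sync:            true,
	}
	data, err := cfg.Encode()
	if err != nil {
		log.Fatal(err)
	}
	// Decode reads <path>/config.json, so write Encode's output there.
	if err := ioutil.WriteFile(filepath.Join(dir, "config.json"), data, 0600); err != nil {
		log.Fatal(err)
	}
	loaded, err := config.Decode(dir)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", *loaded)
}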

@ -1,4 +1,4 @@
package internal
package data
import (
"bufio"
@ -6,6 +6,7 @@ import (
"io"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
)
const (
@ -27,7 +28,7 @@ type Encoder struct {
// Encode takes any Entry and streams it to the underlying writer.
// Messages are framed with a key-length and value-length prefix.
func (e *Encoder) Encode(msg Entry) (int64, error) {
func (e *Encoder) Encode(msg internal.Entry) (int64, error) {
var bufKeyValue = make([]byte, KeySize+ValueSize)
binary.BigEndian.PutUint32(bufKeyValue[:KeySize], uint32(len(msg.Key)))
binary.BigEndian.PutUint64(bufKeyValue[KeySize:KeySize+ValueSize], uint64(len(msg.Value)))
@ -66,7 +67,7 @@ type Decoder struct {
r io.Reader
}
func (d *Decoder) Decode(v *Entry) (int64, error) {
func (d *Decoder) Decode(v *internal.Entry) (int64, error) {
prefixBuf := make([]byte, KeySize+ValueSize)
_, err := io.ReadFull(d.r, prefixBuf)
@ -91,7 +92,7 @@ func GetKeyValueSizes(buf []byte) (uint64, uint64) {
return uint64(actualKeySize), actualValueSize
}
func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *Entry) {
func DecodeWithoutPrefix(buf []byte, valueOffset uint64, v *internal.Entry) {
v.Key = buf[:valueOffset]
v.Value = buf[valueOffset : len(buf)-checksumSize]
v.Checksum = binary.BigEndian.Uint32(buf[len(buf)-checksumSize:])
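
The framing above implies fixed header sizes: the key length is written with PutUint32 (4 bytes), the value length with PutUint64 (8 bytes), and the checksum is read with Uint32 (4 bytes). A small sketch of the resulting on-disk entry size, assuming the KeySize, ValueSize, and checksumSize constants carry exactly those values:

// encodedSize sketches how many bytes one framed entry occupies on disk:
// 4-byte key length + 8-byte value length + key + value + 4-byte checksum.
func encodedSize(key, value []byte) int64 {
	const keyLenSize, valueLenSize, checksumSize = 4, 8, 4
	return int64(keyLenSize + valueLenSize + len(key) + len(value) + checksumSize)
}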

@ -1,4 +1,4 @@
package internal
package data
import (
"fmt"
@ -7,6 +7,7 @@ import (
"sync"
"github.com/pkg/errors"
"github.com/prologic/bitcask/internal"
"golang.org/x/exp/mmap"
)
@ -119,7 +120,7 @@ func (df *Datafile) Size() int64 {
return df.offset
}
func (df *Datafile) Read() (e Entry, n int64, err error) {
func (df *Datafile) Read() (e internal.Entry, n int64, err error) {
df.Lock()
defer df.Unlock()
@ -131,7 +132,7 @@ func (df *Datafile) Read() (e Entry, n int64, err error) {
return
}
func (df *Datafile) ReadAt(index, size int64) (e Entry, err error) {
func (df *Datafile) ReadAt(index, size int64) (e internal.Entry, err error) {
var n int
b := make([]byte, size)
@ -155,7 +156,7 @@ func (df *Datafile) ReadAt(index, size int64) (e Entry, err error) {
return
}
func (df *Datafile) Write(e Entry) (int64, int64, error) {
func (df *Datafile) Write(e internal.Entry) (int64, int64, error) {
if df.w == nil {
return -1, 0, ErrReadonly
}
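
With Datafile now returning internal.Entry, a sequential scan of one datafile looks roughly like the rebuild loop in reopen() above. A hedged sketch (it assumes imports of fmt, io, and the data package, and, like the config example, can only live inside the bitcask module); the read-only boolean follows the usage shown in this diff, and the io.EOF termination mirrors how the reopen() loop is expected to stop:

func scanDatafile(dir string, id int) error {
	df, err := data.NewDatafile(dir, id, true) // read-only, as in reopen()
	if err != nil {
		return err
	}
	defer df.Close()
	for {
		e, n, err := df.Read()
		if err != nil {
			if err == io.EOF {
				return nil // clean end of file
			}
			return err
		}
		fmt.Printf("key=%q occupies %d bytes on disk\n", e.Key, n)
	}
}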

@ -1,4 +1,4 @@
package internal
package index
import (
"encoding/binary"
@ -6,6 +6,7 @@ import (
"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
var (
@ -59,21 +60,21 @@ func writeBytes(b []byte, w io.Writer) error {
return nil
}
func readItem(r io.Reader) (Item, error) {
func readItem(r io.Reader) (internal.Item, error) {
buf := make([]byte, (fileIDSize + offsetSize + sizeSize))
_, err := io.ReadFull(r, buf)
if err != nil {
return Item{}, errors.Wrap(errTruncatedData, err.Error())
return internal.Item{}, errors.Wrap(errTruncatedData, err.Error())
}
return Item{
return internal.Item{
FileID: int(binary.BigEndian.Uint32(buf[:fileIDSize])),
Offset: int64(binary.BigEndian.Uint64(buf[fileIDSize:(fileIDSize + offsetSize)])),
Size: int64(binary.BigEndian.Uint64(buf[(fileIDSize + offsetSize):])),
}, nil
}
func writeItem(item Item, w io.Writer) error {
func writeItem(item internal.Item, w io.Writer) error {
buf := make([]byte, (fileIDSize + offsetSize + sizeSize))
binary.BigEndian.PutUint32(buf[:fileIDSize], uint32(item.FileID))
binary.BigEndian.PutUint64(buf[fileIDSize:(fileIDSize+offsetSize)], uint64(item.Offset))
@ -86,7 +87,7 @@ func writeItem(item Item, w io.Writer) error {
}
// readIndex reads a persisted index from an io.Reader into a Tree
func ReadIndex(r io.Reader, t art.Tree, maxKeySize int) error {
func readIndex(r io.Reader, t art.Tree, maxKeySize int) error {
for {
key, err := readKeyBytes(r, maxKeySize)
if err != nil {
@ -115,7 +116,7 @@ func WriteIndex(t art.Tree, w io.Writer) (err error) {
return false
}
item := node.Value().(Item)
item := node.Value().(internal.Item)
err := writeItem(item, w)
if err != nil {
return false
@ -125,3 +126,14 @@ func WriteIndex(t art.Tree, w io.Writer) (err error) {
})
return
}
// IsIndexCorruption returns a boolean indicating whether the error
// is known to report a data corruption issue
func IsIndexCorruption(err error) bool {
cause := errors.Cause(err)
switch cause {
case errKeySizeTooLarge, errTruncatedData, errTruncatedKeyData, errTruncatedKeySize:
return true
}
return false
}
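
Since the sentinel errors are unexported, IsIndexCorruption is the only way for callers to classify them; errors.Cause lets it see through pkg/errors wrapping such as the errors.Wrap call in readItem. A test-style sketch, assuming it lives in package index and that io is imported:

func TestIsIndexCorruptionWrapping(t *testing.T) {
	// readItem wraps the sentinel, and errors.Cause unwraps it again.
	wrapped := errors.Wrap(errTruncatedData, "while reading item")
	if !IsIndexCorruption(wrapped) {
		t.Fatal("wrapped truncation error should classify as corruption")
	}
	// Unrelated errors (e.g. plain I/O failures) are not corruption.
	if IsIndexCorruption(io.EOF) {
		t.Fatal("io.EOF should not classify as corruption")
	}
}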

@ -1,4 +1,4 @@
package internal
package index
import (
"bytes"
@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
const (
@ -36,7 +37,7 @@ func TestReadIndex(t *testing.T) {
b := bytes.NewBuffer(sampleTreeBytes)
at := art.New()
err := ReadIndex(b, at, 1024)
err := readIndex(b, at, 1024)
if err != nil {
t.Fatalf("error while deserializing correct sample tree: %v", err)
}
@ -74,7 +75,7 @@ func TestReadCorruptedData(t *testing.T) {
t.Run(table[i].name, func(t *testing.T) {
bf := bytes.NewBuffer(table[i].data)
if err := ReadIndex(bf, art.New(), 1024); errors.Cause(err) != table[i].err {
if err := readIndex(bf, art.New(), 1024); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err {
t.Fatalf("expected %v, got %v", table[i].err, err)
}
})
@ -103,7 +104,7 @@ func TestReadCorruptedData(t *testing.T) {
t.Run(table[i].name, func(t *testing.T) {
bf := bytes.NewBuffer(table[i].data)
if err := ReadIndex(bf, art.New(), table[i].maxKeySize); errors.Cause(err) != table[i].err {
if err := readIndex(bf, art.New(), table[i].maxKeySize); !IsIndexCorruption(err) || errors.Cause(err) != table[i].err {
t.Fatalf("expected %v, got %v", table[i].err, err)
}
})
@ -117,7 +118,7 @@ func getSampleTree() (art.Tree, int) {
keys := [][]byte{[]byte("abcd"), []byte("abce"), []byte("abcf"), []byte("abgd")}
expectedSerializedSize := 0
for i := range keys {
at.Insert(keys[i], Item{FileID: i, Offset: int64(i), Size: int64(i)})
at.Insert(keys[i], internal.Item{FileID: i, Offset: int64(i), Size: int64(i)})
expectedSerializedSize += int32Size + len(keys[i]) + fileIDSize + offsetSize + sizeSize
}

internal/index/index.go Normal file (+27 lines)

@ -0,0 +1,27 @@
package index
import (
"os"
"path"
art "github.com/plar/go-adaptive-radix-tree"
"github.com/prologic/bitcask/internal"
)
// ReadFromFile reads an index from a persisted file
func ReadFromFile(filePath string, maxKeySize int) (art.Tree, bool, error) {
t := art.New()
if !internal.Exists(path.Join(filePath, "index")) {
return t, false, nil
}
f, err := os.Open(path.Join(filePath, "index"))
if err != nil {
return t, true, err
}
defer f.Close()
if err := readIndex(f, t, maxKeySize); err != nil {
return t, true, err
}
return t, true, nil
}
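
ReadFromFile deliberately returns the partially built tree alongside the error, which is what lets the recovery tool salvage whatever deserialized before the corruption. A hedged sketch of consuming its three outcomes; describeIndex is a hypothetical name, it assumes fmt and the index package are imported, and t.Size() comes from the go-adaptive-radix-tree Tree interface:

func describeIndex(dbPath string, maxKeySize int) string {
	t, found, err := index.ReadFromFile(dbPath, maxKeySize)
	switch {
	case !found:
		return "no index file; it will be rebuilt from the datafiles on open"
	case err == nil:
		return fmt.Sprintf("healthy index holding %d keys", t.Size())
	case index.IsIndexCorruption(err):
		// t still holds every entry read before the corrupted record.
		return fmt.Sprintf("corrupted index (%v); %d keys salvageable", err, t.Size())
	default:
		return fmt.Sprintf("I/O error opening index: %v", err)
	}
}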

@ -1,10 +1,6 @@
package bitcask
import (
"encoding/json"
"io/ioutil"
"path/filepath"
)
import "github.com/prologic/bitcask/internal/config"
const (
// DefaultMaxDatafileSize is the default maximum datafile size in bytes
@ -15,87 +11,34 @@ const (
// DefaultMaxValueSize is the default value size in bytes
DefaultMaxValueSize = 1 << 16 // 65KB
// DefaultSync is the default file synchronization action
DefaultSync = false
)
// Option is a function that takes a config struct and modifies it
type Option func(*config) error
type config struct {
maxDatafileSize int
maxKeySize int
maxValueSize int
sync bool
}
func (c *config) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
Sync bool `json:"sync"`
}{
MaxDatafileSize: c.maxDatafileSize,
MaxKeySize: c.maxKeySize,
MaxValueSize: c.maxValueSize,
Sync: c.sync,
})
}
func getConfig(path string) (*config, error) {
type Config struct {
MaxDatafileSize int `json:"max_datafile_size"`
MaxKeySize int `json:"max_key_size"`
MaxValueSize int `json:"max_value_size"`
Sync bool `json:"sync"`
}
var cfg Config
data, err := ioutil.ReadFile(filepath.Join(path, "config.json"))
if err != nil {
return nil, err
}
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, err
}
return &config{
maxDatafileSize: cfg.MaxDatafileSize,
maxKeySize: cfg.MaxKeySize,
maxValueSize: cfg.MaxValueSize,
sync: cfg.Sync,
}, nil
}
func newDefaultConfig() *config {
return &config{
maxDatafileSize: DefaultMaxDatafileSize,
maxKeySize: DefaultMaxKeySize,
maxValueSize: DefaultMaxValueSize,
}
}
type Option func(*config.Config) error
// WithMaxDatafileSize sets the maximum datafile size option
func WithMaxDatafileSize(size int) Option {
return func(cfg *config) error {
cfg.maxDatafileSize = size
return func(cfg *config.Config) error {
cfg.MaxDatafileSize = size
return nil
}
}
// WithMaxKeySize sets the maximum key size option
func WithMaxKeySize(size int) Option {
return func(cfg *config) error {
cfg.maxKeySize = size
return func(cfg *config.Config) error {
cfg.MaxKeySize = size
return nil
}
}
// WithMaxValueSize sets the maximum value size option
func WithMaxValueSize(size int) Option {
return func(cfg *config) error {
cfg.maxValueSize = size
return func(cfg *config.Config) error {
cfg.MaxValueSize = size
return nil
}
}
@ -103,8 +46,17 @@ func WithMaxValueSize(size int) Option {
// WithSync causes Sync() to be called on every key/value written, increasing
// durability and safety at the expense of performance
func WithSync(sync bool) Option {
return func(cfg *config) error {
cfg.sync = sync
return func(cfg *config.Config) error {
cfg.Sync = sync
return nil
}
}
func newDefaultConfig() *config.Config {
return &config.Config{
MaxDatafileSize: DefaultMaxDatafileSize,
MaxKeySize: DefaultMaxKeySize,
MaxValueSize: DefaultMaxValueSize,
Sync: DefaultSync,
}
}
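
One consequence of the new Option signature is worth noting: because config.Config lives under internal/, only code inside the bitcask module can define new options; external callers are limited to composing the exported WithX helpers. A hypothetical in-module option as a sketch, with an illustrative name:

// WithLimits is a hypothetical convenience option combining both size
// caps; it could only be defined inside this module, since external
// packages cannot import internal/config.
func WithLimits(maxKey, maxValue int) Option {
	return func(cfg *config.Config) error {
		if maxKey <= 0 || maxValue <= 0 {
			return fmt.Errorf("limits must be positive: key=%d value=%d", maxKey, maxValue)
		}
		cfg.MaxKeySize = maxKey
		cfg.MaxValueSize = maxValue
		return nil
	}
}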