// bitcask-mirror/bitcask.go

package bitcask

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/abcum/lcp"
	"github.com/gofrs/flock"
	art "github.com/plar/go-adaptive-radix-tree"
	"github.com/rs/zerolog/log"

	"git.tcp.direct/Mirrors/bitcask-mirror/internal"
	"git.tcp.direct/Mirrors/bitcask-mirror/internal/config"
	"git.tcp.direct/Mirrors/bitcask-mirror/internal/data"
	"git.tcp.direct/Mirrors/bitcask-mirror/internal/data/codec"
	"git.tcp.direct/Mirrors/bitcask-mirror/internal/index"
	"git.tcp.direct/Mirrors/bitcask-mirror/internal/metadata"
	"git.tcp.direct/Mirrors/bitcask-mirror/scripts/migrations"
)

const (
	lockfile     = "lock"
	ttlIndexFile = "ttl_index"
)

// Bitcask is a struct that represents an on-disk LSM and WAL data structure
// and an in-memory hash of key/value pairs, as per the Bitcask paper and as
// seen in the Riak database.
type Bitcask struct {
	mu         sync.RWMutex
	flock      *flock.Flock
	config     *config.Config
	options    []Option
	path       string
	curr       data.Datafile
	datafiles  map[int]data.Datafile
	trie       art.Tree
	indexer    index.Indexer
	ttlIndexer index.Indexer
	ttlIndex   art.Tree
	metadata   *metadata.MetaData
	isMerging  bool
}

// Stats is a struct returned by Stats() on an open Bitcask instance.
type Stats struct {
	Datafiles int
	Keys      int
	Size      int64
}

// Stats returns statistics about the database, including the number of
// data files, the number of keys, and the overall size on disk of the data.
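//
// A minimal usage sketch (hypothetical; assumes an open *Bitcask named db):
//
//	stats, err := db.Stats()
//	if err != nil {
//		return err
//	}
//	fmt.Printf("keys=%d datafiles=%d size=%dB\n", stats.Keys, stats.Datafiles, stats.Size)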
func (b *Bitcask) Stats() (stats Stats, err error) {
	if stats.Size, err = internal.DirSize(b.path); err != nil {
		return
	}
	b.mu.RLock()
	stats.Datafiles = len(b.datafiles)
	stats.Keys = b.trie.Size()
	b.mu.RUnlock()
	return
}

// Close closes the database and removes the lock. It is important to call
// Close() as this is the only way to clean up the lock held by the open
// database.
func (b *Bitcask) Close() error {
	b.mu.RLock()
	defer func() {
		b.mu.RUnlock()
		b.flock.Unlock()
	}()
	return b.close()
}

func (b *Bitcask) close() error {
	if err := b.saveIndexes(); err != nil {
		return err
	}
	b.metadata.IndexUpToDate = true
	if err := b.saveMetadata(); err != nil {
		return err
	}
	for _, df := range b.datafiles {
		if err := df.Close(); err != nil {
			return err
		}
	}
	return b.curr.Close()
}

// Sync flushes all buffers to disk, ensuring all data is written.
func (b *Bitcask) Sync() error {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if err := b.saveMetadata(); err != nil {
		return err
	}
	return b.curr.Sync()
}

// Get fetches the value for the given key.
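//
// A minimal lookup sketch (hypothetical; db is an open *Bitcask):
//
//	value, err := db.Get([]byte("hello"))
//	switch {
//	case errors.Is(err, bitcask.ErrKeyNotFound):
//		// key absent (or expired, which returns ErrKeyExpired)
//	case err != nil:
//		return err
//	default:
//		fmt.Printf("hello=%s\n", value)
//	}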
func (b *Bitcask) Get(key []byte) ([]byte, error) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	e, err := b.get(key)
	if err != nil {
		return nil, err
	}
	return e.Value, nil
}

// Has returns true if the key exists in the database, false otherwise.
func (b *Bitcask) Has(key []byte) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	_, found := b.trie.Search(key)
	return found && !b.isExpired(key)
}

// Put stores the key and value in the database.
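//
// A minimal write sketch (hypothetical; db is an open *Bitcask):
//
//	if err := db.Put([]byte("hello"), []byte("world")); err != nil {
//		return err
//	}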
func (b *Bitcask) Put(key, value []byte) error {
	if len(key) == 0 {
		return ErrEmptyKey
	}
	if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize {
		return ErrKeyTooLarge
	}
	if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
		return ErrValueTooLarge
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	offset, n, err := b.put(key, value)
	if err != nil {
		return err
	}
	if b.config.Sync {
		if err := b.curr.Sync(); err != nil {
			return err
		}
	}
	// in case of a successful `put`, IndexUpToDate will always be false
	b.metadata.IndexUpToDate = false
	if oldItem, found := b.trie.Search(key); found {
		b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
	}
	item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
	b.trie.Insert(key, item)
	return nil
}

// PutWithTTL stores the key and value in the database with the given TTL.
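//
// A minimal TTL sketch (hypothetical; db is an open *Bitcask). Once the TTL
// has elapsed the key is treated as absent:
//
//	if err := db.PutWithTTL([]byte("session"), []byte("token"), 10*time.Minute); err != nil {
//		return err
//	}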
func (b *Bitcask) PutWithTTL(key, value []byte, ttl time.Duration) error {
	if len(key) == 0 {
		return ErrEmptyKey
	}
	if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize {
		return ErrKeyTooLarge
	}
	if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
		return ErrValueTooLarge
	}
	expiry := time.Now().Add(ttl)
	b.mu.Lock()
	defer b.mu.Unlock()
	offset, n, err := b.putWithExpiry(key, value, expiry)
	if err != nil {
		return err
	}
	if b.config.Sync {
		if err := b.curr.Sync(); err != nil {
			return err
		}
	}
	// in case of a successful `put`, IndexUpToDate will always be false
	b.metadata.IndexUpToDate = false
	if oldItem, found := b.trie.Search(key); found {
		b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
	}
	item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
	b.trie.Insert(key, item)
	b.ttlIndex.Insert(key, expiry)
	return nil
}

// Delete deletes the named key.
func (b *Bitcask) Delete(key []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.delete(key)
}

// delete deletes the named key. If the key doesn't exist or an I/O error
// occurs the error is returned.
func (b *Bitcask) delete(key []byte) error {
	_, _, err := b.put(key, []byte{})
	if err != nil {
		return err
	}
	if item, found := b.trie.Search(key); found {
		b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(key))
	}
	b.trie.Delete(key)
	b.ttlIndex.Delete(key)
	return nil
}

// Sift iterates over all keys in the database calling the function `f` for
// each key. If the KV pair is expired or the function returns true, that key is
// deleted from the database.
// If the function returns an error on any key, no further keys are processed, no
// keys are deleted, and the first error is returned.
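//
// A hypothetical sketch that deletes every key ending in ".tmp":
//
//	err := db.Sift(func(key []byte) (bool, error) {
//		return bytes.HasSuffix(key, []byte(".tmp")), nil
//	})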
func (b *Bitcask) Sift(f func(key []byte) (bool, error)) (err error) {
	keysToDelete := art.New()
	b.mu.RLock()
	b.trie.ForEach(func(node art.Node) bool {
		if b.isExpired(node.Key()) {
			keysToDelete.Insert(node.Key(), true)
			return true
		}
		var shouldDelete bool
		if shouldDelete, err = f(node.Key()); err != nil {
			return false
		} else if shouldDelete {
			keysToDelete.Insert(node.Key(), true)
		}
		return true
	})
	b.mu.RUnlock()
	if err != nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	keysToDelete.ForEach(func(node art.Node) (cont bool) {
		b.delete(node.Key())
		return true
	})
	return
}

// DeleteAll deletes all the keys. If an I/O error occurs the error is returned.
func (b *Bitcask) DeleteAll() (err error) {
	// take the write lock: this writes tombstones and replaces the index
	b.mu.Lock()
	defer b.mu.Unlock()
	b.trie.ForEach(func(node art.Node) bool {
		_, _, err = b.put(node.Key(), []byte{})
		if err != nil {
			return false
		}
		item, _ := b.trie.Search(node.Key())
		b.metadata.ReclaimableSpace += item.(internal.Item).Size + codec.MetaInfoSize + int64(len(node.Key()))
		return true
	})
	b.trie = art.New()
	b.ttlIndex = art.New()
	return
}

// Scan performs a prefix scan of keys matching the given prefix, calling
// the function `f` with the keys found. If the function returns an error
// no further keys are processed and the first error is returned.
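//
// A hypothetical sketch that collects all keys under the "user:" prefix:
//
//	var users [][]byte
//	err := db.Scan([]byte("user:"), func(key []byte) error {
//		users = append(users, key)
//		return nil
//	})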
func (b *Bitcask) Scan(prefix []byte, f func(key []byte) error) (err error) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
		// Skip the root node
		if len(node.Key()) == 0 {
			return true
		}
		if err = f(node.Key()); err != nil {
			return false
		}
		return true
	})
	return
}

// SiftScan iterates over all keys in the database beginning with the given
// prefix, calling the function `f` for each key. If the KV pair is expired or
// the function returns true, that key is deleted from the database.
// If the function returns an error on any key, no further keys are processed,
// no keys are deleted, and the first error is returned.
func (b *Bitcask) SiftScan(prefix []byte, f func(key []byte) (bool, error)) (err error) {
	keysToDelete := art.New()
	b.mu.RLock()
	b.trie.ForEachPrefix(prefix, func(node art.Node) bool {
		// Skip the root node
		if len(node.Key()) == 0 {
			return true
		}
		if b.isExpired(node.Key()) {
			keysToDelete.Insert(node.Key(), true)
			return true
		}
		var shouldDelete bool
		if shouldDelete, err = f(node.Key()); err != nil {
			return false
		} else if shouldDelete {
			keysToDelete.Insert(node.Key(), true)
		}
		return true
	})
	b.mu.RUnlock()
	if err != nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	keysToDelete.ForEach(func(node art.Node) (cont bool) {
		b.delete(node.Key())
		return true
	})
	return
}

// Range performs a range scan over keys between the start key and the end
// key (inclusive), calling the function `f` with the keys found. If the
// function returns an error no further keys are processed and the first
// error is returned.
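//
// A hypothetical sketch; note that start and end must share a common byte
// prefix (here "2021-") for the range to be valid:
//
//	err := db.Range([]byte("2021-01"), []byte("2021-12"), func(key []byte) error {
//		fmt.Printf("%s\n", key)
//		return nil
//	})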
func (b *Bitcask) Range(start, end []byte, f func(key []byte) error) (err error) {
	if bytes.Compare(start, end) == 1 {
		return ErrInvalidRange
	}
	commonPrefix := lcp.LCP(start, end)
	if commonPrefix == nil {
		return ErrInvalidRange
	}
	b.mu.RLock()
	defer b.mu.RUnlock()
	b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
		if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
			if err = f(node.Key()); err != nil {
				return false
			}
			return true
		} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
			return false
		}
		return true
	})
	return
}

// SiftRange performs a range scan over keys between the start key and the
// end key (inclusive), calling the function `f` with the keys found.
// If the KV pair is expired or the function returns true, that key is deleted
// from the database.
// If the function returns an error on any key, no further keys are processed, no
// keys are deleted, and the first error is returned.
func (b *Bitcask) SiftRange(start, end []byte, f func(key []byte) (bool, error)) (err error) {
	if bytes.Compare(start, end) == 1 {
		return ErrInvalidRange
	}
	commonPrefix := lcp.LCP(start, end)
	if commonPrefix == nil {
		return ErrInvalidRange
	}
	keysToDelete := art.New()
	b.mu.RLock()
	b.trie.ForEachPrefix(commonPrefix, func(node art.Node) bool {
		if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) <= 0 {
			if b.isExpired(node.Key()) {
				keysToDelete.Insert(node.Key(), true)
				return true
			}
			var shouldDelete bool
			if shouldDelete, err = f(node.Key()); err != nil {
				return false
			} else if shouldDelete {
				keysToDelete.Insert(node.Key(), true)
			}
			return true
		} else if bytes.Compare(node.Key(), start) >= 0 && bytes.Compare(node.Key(), end) > 0 {
			return false
		}
		return true
	})
	b.mu.RUnlock()
	if err != nil {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	keysToDelete.ForEach(func(node art.Node) (cont bool) {
		b.delete(node.Key())
		return true
	})
	return
}

// Len returns the total number of keys in the database.
func (b *Bitcask) Len() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.trie.Size()
}

// Keys returns all keys in the database as a channel of keys.
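//
// A hypothetical sketch; the channel must be drained to completion, since
// the producing goroutine holds a read lock until all keys have been sent:
//
//	for key := range db.Keys() {
//		fmt.Printf("%s\n", key)
//	}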
func (b *Bitcask) Keys() chan []byte {
	ch := make(chan []byte)
	go func() {
		b.mu.RLock()
		defer b.mu.RUnlock()
		for it := b.trie.Iterator(); it.HasNext(); {
			node, _ := it.Next()
			if b.isExpired(node.Key()) {
				continue
			}
			ch <- node.Key()
		}
		close(ch)
	}()
	return ch
}

// RunGC deletes all expired keys.
func (b *Bitcask) RunGC() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.runGC()
}

// runGC deletes all keys that are expired.
// The caller of this method should take care of locking.
func (b *Bitcask) runGC() (err error) {
	keysToDelete := art.New()
	b.ttlIndex.ForEach(func(node art.Node) (cont bool) {
		if !b.isExpired(node.Key()) {
			// later, return false here once the ttlIndex is sorted
			return true
		}
		keysToDelete.Insert(node.Key(), true)
		return true
	})
	keysToDelete.ForEach(func(node art.Node) (cont bool) {
		b.delete(node.Key())
		return true
	})
	return nil
}

// Fold iterates over all keys in the database calling the function `f` for
// each key. If the function returns an error, no further keys are processed
// and the error is returned.
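//
// A hypothetical sketch that counts keys via Fold:
//
//	var n int
//	err := db.Fold(func(key []byte) error {
//		n++
//		return nil
//	})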
func (b *Bitcask) Fold(f func(key []byte) error) (err error) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	b.trie.ForEach(func(node art.Node) bool {
		if err = f(node.Key()); err != nil {
			return false
		}
		return true
	})
	return
}

// get retrieves the value of the given key.
func (b *Bitcask) get(key []byte) (internal.Entry, error) {
	var df data.Datafile
	value, found := b.trie.Search(key)
	if !found {
		return internal.Entry{}, ErrKeyNotFound
	}
	if b.isExpired(key) {
		return internal.Entry{}, ErrKeyExpired
	}
	item := value.(internal.Item)
	if item.FileID == b.curr.FileID() {
		df = b.curr
	} else {
		df = b.datafiles[item.FileID]
	}
	e, err := df.ReadAt(item.Offset, item.Size)
	if err != nil {
		return internal.Entry{}, err
	}
	checksum := crc32.ChecksumIEEE(e.Value)
	if checksum != e.Checksum {
		return internal.Entry{}, ErrChecksumFailed
	}
	return e, nil
}

// maybeRotate rotates the active datafile once it reaches the configured
// maximum size: the current file is reopened read-only and a new writable
// datafile becomes the active one.
func (b *Bitcask) maybeRotate() error {
	size := b.curr.Size()
	if size < int64(b.config.MaxDatafileSize) {
		return nil
	}
	err := b.curr.Close()
	if err != nil {
		return err
	}
	id := b.curr.FileID()
	df, err := data.NewDatafile(
		b.path, id, true,
		b.config.MaxKeySize,
		b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}
	b.datafiles[id] = df
	id = b.curr.FileID() + 1
	curr, err := data.NewDatafile(
		b.path, id, false,
		b.config.MaxKeySize,
		b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}
	b.curr = curr
	err = b.saveIndexes()
	if err != nil {
		return err
	}
	return nil
}

// put inserts a new (key, value) pair. Both key and value are assumed to be
// valid inputs; the exported callers perform validation.
func (b *Bitcask) put(key, value []byte) (int64, int64, error) {
	if err := b.maybeRotate(); err != nil {
		return -1, 0, fmt.Errorf("error rotating active datafile: %w", err)
	}
	return b.curr.Write(internal.NewEntry(key, value, nil))
}

// putWithExpiry inserts a new (key, value, expiry) triple. Both key and
// value are assumed to be valid inputs; the exported callers perform
// validation.
func (b *Bitcask) putWithExpiry(key, value []byte, expiry time.Time) (int64, int64, error) {
	if err := b.maybeRotate(); err != nil {
		return -1, 0, fmt.Errorf("error rotating active datafile: %w", err)
	}
	return b.curr.Write(internal.NewEntry(key, value, &expiry))
}

// closeCurrentFile closes the current datafile and reopens it as read-only.
func (b *Bitcask) closeCurrentFile() error {
	if err := b.curr.Close(); err != nil {
		return err
	}
	id := b.curr.FileID()
	df, err := data.NewDatafile(
		b.path, id, true,
		b.config.MaxKeySize,
		b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}
	b.datafiles[id] = df
	return nil
}

// openNewWritableFile opens a new datafile for writing data.
func (b *Bitcask) openNewWritableFile() error {
	id := b.curr.FileID() + 1
	curr, err := data.NewDatafile(
		b.path, id, false,
		b.config.MaxKeySize,
		b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}
	b.curr = curr
	return nil
}

// Reopen closes and reopens the database.
func (b *Bitcask) Reopen() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.reopen()
}

// reopen reloads the bitcask object with its index and datafiles.
// The caller of this method should take care of locking.
func (b *Bitcask) reopen() error {
	datafiles, lastID, err := loadDatafiles(
		b.path,
		b.config.MaxKeySize,
		b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}
	t, ttlIndex, err := loadIndexes(b, datafiles, lastID)
	if err != nil {
		return err
	}
	curr, err := data.NewDatafile(
		b.path, lastID, false,
		b.config.MaxKeySize,
		b.config.MaxValueSize,
		b.config.FileFileModeBeforeUmask,
	)
	if err != nil {
		return err
	}
	b.trie = t
	b.curr = curr
	b.ttlIndex = ttlIndex
	b.datafiles = datafiles
	return nil
}

// Merge merges all datafiles in the database. Old keys are squashed and
// deleted keys are removed. Duplicate key/value pairs are also removed.
// Call this function periodically to reclaim disk space.
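//
// A hypothetical sketch; Merge fails with ErrMergeInProgress if another
// merge is already running:
//
//	if err := db.Merge(); err != nil && !errors.Is(err, bitcask.ErrMergeInProgress) {
//		return err
//	}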
func (b *Bitcask) Merge() error {
	b.mu.Lock()
	if b.isMerging {
		b.mu.Unlock()
		return ErrMergeInProgress
	}
	b.isMerging = true
	b.mu.Unlock()
	defer func() {
		b.isMerging = false
	}()
	b.mu.RLock()
	err := b.closeCurrentFile()
	if err != nil {
		b.mu.RUnlock()
		return err
	}
	filesToMerge := make([]int, 0, len(b.datafiles))
	for k := range b.datafiles {
		filesToMerge = append(filesToMerge, k)
	}
	err = b.openNewWritableFile()
	if err != nil {
		b.mu.RUnlock()
		return err
	}
	b.mu.RUnlock()
	sort.Ints(filesToMerge)
	// Temporary merged database path
	temp, err := ioutil.TempDir(b.path, "merge")
	if err != nil {
		return err
	}
	defer os.RemoveAll(temp)
	// Create a merged database
	mdb, err := Open(temp, withConfig(b.config))
	if err != nil {
		return err
	}
	// Rewrite all key/value pairs into the merged database.
	// Doing this automatically strips deleted keys and
	// old key/value pairs.
	err = b.Fold(func(key []byte) error {
		item, _ := b.trie.Search(key)
		// if the key was updated after the start of the merge operation, there is nothing to do
		if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
			return nil
		}
		e, err := b.get(key)
		if err != nil {
			return err
		}
		if e.Expiry != nil {
			if err := mdb.PutWithTTL(key, e.Value, time.Until(*e.Expiry)); err != nil {
				return err
			}
		} else {
			if err := mdb.Put(key, e.Value); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		return err
	}
	if err = mdb.Close(); err != nil {
		return err
	}
	// no reads and writes until we reopen
	b.mu.Lock()
	defer b.mu.Unlock()
	if err = b.close(); err != nil {
		return err
	}
	// Remove data files
	files, err := ioutil.ReadDir(b.path)
	if err != nil {
		return err
	}
	for _, file := range files {
		if file.IsDir() || file.Name() == lockfile {
			continue
		}
		ids, err := internal.ParseIds([]string{file.Name()})
		if err != nil {
			return err
		}
		// if the datafile was created after the start of the merge, skip it
		if len(ids) > 0 && ids[0] > filesToMerge[len(filesToMerge)-1] {
			continue
		}
		err = os.RemoveAll(path.Join(b.path, file.Name()))
		if err != nil {
			return err
		}
	}
	// Rename all merged data files
	files, err = ioutil.ReadDir(mdb.path)
	if err != nil {
		return err
	}
	for _, file := range files {
		// see #225
		if file.Name() == lockfile {
			continue
		}
		err := os.Rename(
			path.Join([]string{mdb.path, file.Name()}...),
			path.Join([]string{b.path, file.Name()}...),
		)
		if err != nil {
			return err
		}
	}
	b.metadata.ReclaimableSpace = 0
	// And finally reopen the database
	return b.reopen()
}

// Open opens the database at the given path with optional options.
// Options can be provided with the `WithXXX` functions that provide
// configuration options as functions.
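//
// A minimal sketch (hypothetical; WithSync is assumed from this package's
// WithXXX option family):
//
//	db, err := bitcask.Open("/tmp/db", bitcask.WithSync(true))
//	if err != nil {
//		return err
//	}
//	defer db.Close()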
func Open(path string, options ...Option) (*Bitcask, error) {
	var (
		cfg  *config.Config
		err  error
		meta *metadata.MetaData
	)
	configPath := filepath.Join(path, "config.json")
	if internal.Exists(configPath) {
		cfg, err = config.Load(configPath)
		if err != nil {
			return nil, &ErrBadConfig{err}
		}
	} else {
		cfg = newDefaultConfig()
	}
	if err := checkAndUpgrade(cfg, configPath); err != nil {
		return nil, err
	}
	for _, opt := range options {
		if err := opt(cfg); err != nil {
			return nil, err
		}
	}
	if err := os.MkdirAll(path, cfg.DirFileModeBeforeUmask); err != nil {
		return nil, err
	}
	meta, err = loadMetadata(path)
	if err != nil {
		return nil, &ErrBadMetadata{err}
	}
	bitcask := &Bitcask{
		flock:      flock.New(filepath.Join(path, lockfile)),
		config:     cfg,
		options:    options,
		path:       path,
		indexer:    index.NewIndexer(),
		ttlIndexer: index.NewTTLIndexer(),
		metadata:   meta,
	}
	ok, err := bitcask.flock.TryLock()
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, ErrDatabaseLocked
	}
	if err := cfg.Save(configPath); err != nil {
		return nil, err
	}
	if cfg.AutoRecovery {
		if err := data.CheckAndRecover(path, cfg); err != nil {
			return nil, fmt.Errorf("recovering database: %w", err)
		}
	}
	if err := bitcask.Reopen(); err != nil {
		return nil, err
	}
	return bitcask, nil
}

// checkAndUpgrade checks whether a DB upgrade is required and, if so,
// applies the version upgrade and saves the updated config.
func checkAndUpgrade(cfg *config.Config, configPath string) error {
	if cfg.DBVersion == CurrentDBVersion {
		return nil
	}
	if cfg.DBVersion > CurrentDBVersion {
		return ErrInvalidVersion
	}
	// for the v0 to v1 upgrade, we need to append 8 null bytes after each encoded entry in the datafiles
	if cfg.DBVersion == uint32(0) && CurrentDBVersion == uint32(1) {
		log.Warn().Msg("upgrading db version, might take some time....")
		cfg.DBVersion = CurrentDBVersion
		return migrations.ApplyV0ToV1(filepath.Dir(configPath), cfg.MaxDatafileSize)
	}
	return nil
}

// Backup copies the database directory to the given path;
// it creates the path if it does not exist.
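//
// A hypothetical sketch:
//
//	if err := db.Backup("/tmp/db-backup"); err != nil {
//		return err
//	}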
func (b *Bitcask) Backup(path string) error {
	if !internal.Exists(path) {
		if err := os.MkdirAll(path, b.config.DirFileModeBeforeUmask); err != nil {
			return err
		}
	}
	return internal.Copy(b.path, path, []string{lockfile})
}

// saveIndexes saves the index and ttl_index currently in RAM to disk.
func (b *Bitcask) saveIndexes() error {
	tempIdx := "temp_index"
	if err := b.indexer.Save(b.trie, filepath.Join(b.path, tempIdx)); err != nil {
		return err
	}
	if err := os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, "index")); err != nil {
		return err
	}
	if err := b.ttlIndexer.Save(b.ttlIndex, filepath.Join(b.path, tempIdx)); err != nil {
		return err
	}
	return os.Rename(filepath.Join(b.path, tempIdx), filepath.Join(b.path, ttlIndexFile))
}

// saveMetadata saves metadata to disk.
func (b *Bitcask) saveMetadata() error {
	return b.metadata.Save(filepath.Join(b.path, "meta.json"), b.config.DirFileModeBeforeUmask)
}

// Reclaimable returns the space occupied by stale data that can be reclaimed by a merge.
func (b *Bitcask) Reclaimable() int64 {
	return b.metadata.ReclaimableSpace
}

// isExpired returns true if a key has expired.
// It returns false if the key does not exist in the ttl index.
func (b *Bitcask) isExpired(key []byte) bool {
	expiry, found := b.ttlIndex.Search(key)
	if !found {
		return false
	}
	return expiry.(time.Time).Before(time.Now().UTC())
}

// loadDatafiles opens all existing datafiles in path read-only and returns
// them keyed by file ID, along with the highest (most recent) file ID.
func loadDatafiles(path string, maxKeySize uint32, maxValueSize uint64, fileModeBeforeUmask os.FileMode) (datafiles map[int]data.Datafile, lastID int, err error) {
	fns, err := internal.GetDatafiles(path)
	if err != nil {
		return nil, 0, err
	}
	ids, err := internal.ParseIds(fns)
	if err != nil {
		return nil, 0, err
	}
	datafiles = make(map[int]data.Datafile, len(ids))
	for _, id := range ids {
		datafiles[id], err = data.NewDatafile(
			path, id, true,
			maxKeySize,
			maxValueSize,
			fileModeBeforeUmask,
		)
		if err != nil {
			return
		}
	}
	if len(ids) > 0 {
		lastID = ids[len(ids)-1]
	}
	return
}

// getSortedDatafiles returns the datafiles sorted by file ID, oldest first.
func getSortedDatafiles(datafiles map[int]data.Datafile) []data.Datafile {
	out := make([]data.Datafile, len(datafiles))
	idx := 0
	for _, df := range datafiles {
		out[idx] = df
		idx++
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].FileID() < out[j].FileID()
	})
	return out
}

// loadIndexes loads the index from disk into memory. If the index is missing
// or only partially up to date (the last bitcask process crashed), the index
// is reconstructed by replaying the last datafile, or all datafiles when no
// index was found on disk.
// The ttl_index is constructed here as well, alongside the normal index.
func loadIndexes(b *Bitcask, datafiles map[int]data.Datafile, lastID int) (art.Tree, art.Tree, error) {
	t, found, err := b.indexer.Load(filepath.Join(b.path, "index"), b.config.MaxKeySize)
	if err != nil {
		return nil, nil, err
	}
	ttlIndex, _, err := b.ttlIndexer.Load(filepath.Join(b.path, ttlIndexFile), b.config.MaxKeySize)
	if err != nil {
		return nil, nil, err
	}
	if found && b.metadata.IndexUpToDate {
		return t, ttlIndex, nil
	}
	if found {
		if err := loadIndexFromDatafile(t, ttlIndex, datafiles[lastID]); err != nil {
			return nil, ttlIndex, err
		}
		return t, ttlIndex, nil
	}
	sortedDatafiles := getSortedDatafiles(datafiles)
	for _, df := range sortedDatafiles {
		if err := loadIndexFromDatafile(t, ttlIndex, df); err != nil {
			return nil, ttlIndex, err
		}
	}
	return t, ttlIndex, nil
}

// loadIndexFromDatafile replays a single datafile, inserting live entries
// into the index (and ttl index) and removing tombstoned keys.
func loadIndexFromDatafile(t art.Tree, ttlIndex art.Tree, df data.Datafile) error {
	var offset int64
	for {
		e, n, err := df.Read()
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		// Tombstone value (deleted key)
		if len(e.Value) == 0 {
			t.Delete(e.Key)
			offset += n
			continue
		}
		item := internal.Item{FileID: df.FileID(), Offset: offset, Size: n}
		t.Insert(e.Key, item)
		if e.Expiry != nil {
			ttlIndex.Insert(e.Key, *e.Expiry)
		}
		offset += n
	}
	return nil
}

// loadMetadata loads the metadata file from disk, returning fresh metadata
// when none exists yet.
func loadMetadata(path string) (*metadata.MetaData, error) {
	if !internal.Exists(filepath.Join(path, "meta.json")) {
		meta := new(metadata.MetaData)
		return meta, nil
	}
	return metadata.Load(filepath.Join(path, "meta.json"))
}