// bitcask-mirror/scripts/migrations/v0_to_v1.go
//
// 160 lines
// 3.3 KiB
// Go

package migrations
import (
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"git.tcp.direct/Mirrors/bitcask-mirror/internal"
)
// Byte widths used when parsing and rewriting datafile entries, plus the
// naming pattern for the datafiles this migration creates.
const (
// keySize is the width in bytes of the key-length field at the start of an
// entry; getKeyValueSize decodes it as a big-endian uint32.
keySize = 4
// valueSize is the width in bytes of the value-length field that follows the
// key-length field; getKeyValueSize decodes it as a big-endian uint64.
valueSize = 8
// checksumSize is counted in addition to the key and value bytes when
// getSingleEntry sizes the remainder of an entry.
checksumSize = 4
// ttlSize is the number of zero bytes apply appends to every migrated entry.
ttlSize = 8
// defaultDatafileFilename is the fmt pattern for datafile names: the numeric
// id zero-padded to nine digits, followed by ".data".
defaultDatafileFilename = "%09d.data"
)
// ApplyV0ToV1 rewrites the datafiles found in dir into a temporary directory
// created inside dir, then replaces the original files with the rewritten
// ones. maxDatafileSize is the size threshold used to decide when the
// rewritten output rolls over to a new datafile.
//
// The temporary directory is removed before returning, whether or not the
// migration succeeded. The first error from any step is returned unchanged.
func ApplyV0ToV1(dir string, maxDatafileSize int) error {
	temp, err := prepare(dir)
	if err != nil {
		return err
	}
	// Registered only after prepare succeeded, so there is always a real
	// directory to remove.
	defer os.RemoveAll(temp)

	if applyErr := apply(dir, temp, maxDatafileSize); applyErr != nil {
		return applyErr
	}
	return cleanup(dir, temp)
}
// prepare creates a fresh working directory inside dir whose name starts with
// "migration" and returns its path.
func prepare(dir string) (string, error) {
	temp, err := ioutil.TempDir(dir, "migration")
	return temp, err
}
// apply copies every entry from the datafiles that internal.GetDatafiles
// reports for dir into newly created datafiles under temp, appending ttlSize
// zero bytes to each entry.
//
// Output files are numbered from 0 using getNewDatafile. A new output file is
// started whenever the bytes already written to the current one plus the
// length of the next source entry would exceed maxDatafileSize.
// NOTE(review): that check uses the source entry length, without the ttlSize
// bytes that are appended, so an output file can exceed maxDatafileSize by up
// to ttlSize bytes — confirm this is acceptable.
//
// Every file handle opened here is closed before apply returns. Each output
// file is synced before it is rotated out, and the last one is synced before
// returning. The first error encountered is returned unchanged.
func apply(dir, temp string, maxDatafileSize int) error {
	datafilesPath, err := internal.GetDatafiles(dir)
	if err != nil {
		return err
	}

	var id, newOffset int
	datafile, err := getNewDatafile(temp, id)
	if err != nil {
		return err
	}
	id++
	// Release the output handle on every return path. The closure reads
	// datafile when it runs, so it closes whichever file is current after any
	// rotation. The Close error is ignored: on the success path the file has
	// already been synced, and on failure paths an earlier error is returned.
	defer func() { _ = datafile.Close() }()

	// migrate copies all entries of one source datafile into the current
	// output file, rotating the output when the size threshold is crossed.
	// It is a closure so the source handle can be closed per file via defer
	// while sharing datafile, id and newOffset across source files.
	migrate := func(p string) error {
		df, err := os.Open(p)
		if err != nil {
			return err
		}
		// Read-only handle; a Close error cannot lose data.
		defer df.Close()

		var off int64
		for {
			entry, err := getSingleEntry(df, off)
			if err == io.EOF {
				// No further complete entry in this source file.
				return nil
			}
			if err != nil {
				return err
			}

			if newOffset+len(entry) > maxDatafileSize {
				if err := datafile.Sync(); err != nil {
					return err
				}
				next, err := getNewDatafile(temp, id)
				if err != nil {
					return err
				}
				// Swap first so the deferred Close always targets a valid,
				// still-open file, then close the one just rotated out.
				full := datafile
				datafile = next
				id++
				newOffset = 0
				if err := full.Close(); err != nil {
					return err
				}
			}

			// make zero-fills the buffer, so the ttlSize bytes after the
			// copied entry are all zero.
			newEntry := make([]byte, len(entry)+ttlSize)
			copy(newEntry, entry)
			n, err := datafile.Write(newEntry)
			if err != nil {
				return err
			}
			newOffset += n
			off += int64(len(entry))
		}
	}

	for _, p := range datafilesPath {
		if err := migrate(p); err != nil {
			return err
		}
	}
	return datafile.Sync()
}
// cleanup replaces the contents of dir with the contents of temp. It first
// removes every non-directory entry directly inside dir (sub-directories are
// left alone), then moves every entry of temp into dir under the same name.
// It stops at, and returns, the first error.
func cleanup(dir, temp string) error {
	stale, err := ioutil.ReadDir(dir)
	if err != nil {
		return err
	}
	for _, info := range stale {
		if info.IsDir() {
			continue
		}
		if err := os.RemoveAll(path.Join(dir, info.Name())); err != nil {
			return err
		}
	}

	migrated, err := ioutil.ReadDir(temp)
	if err != nil {
		return err
	}
	for _, info := range migrated {
		src := path.Join(temp, info.Name())
		dst := path.Join(dir, info.Name())
		if err := os.Rename(src, dst); err != nil {
			return err
		}
	}
	return nil
}
// getNewDatafile opens the datafile for id inside the directory path, creating
// it with mode 0640 if it does not exist. The file is opened write-only in
// append mode. The file name is id formatted with defaultDatafileFilename.
func getNewDatafile(path string, id int) (*os.File, error) {
	name := fmt.Sprintf(defaultDatafileFilename, id)
	const flags = os.O_WRONLY | os.O_APPEND | os.O_CREATE
	return os.OpenFile(filepath.Join(path, name), flags, 0640)
}
// getSingleEntry reads the complete entry that starts at offset in f and
// returns its raw bytes: the fixed-size length prefix followed by the
// remainder of the entry, whose length is the key length plus the value
// length (both decoded from the prefix) plus checksumSize.
//
// Any read error, including io.EOF when the file ends before the entry is
// complete, is returned unchanged together with a nil slice.
func getSingleEntry(f *os.File, offset int64) ([]byte, error) {
	header, err := readPrefix(f, offset)
	if err != nil {
		return nil, err
	}

	kLen, vLen := getKeyValueSize(header)
	bodyLen := uint64(kLen) + vLen + checksumSize
	bodyOff := offset + keySize + valueSize

	body, err := read(f, bodyLen, bodyOff)
	if err != nil {
		return nil, err
	}
	return append(header, body...), nil
}
// readPrefix reads the keySize+valueSize bytes located at offset in f. It
// returns nil and the error if the read fails or comes up short.
func readPrefix(f *os.File, offset int64) ([]byte, error) {
	header := make([]byte, keySize+valueSize)
	if _, err := f.ReadAt(header, offset); err != nil {
		return nil, err
	}
	return header, nil
}
// read returns exactly bufSize bytes of f starting at offset. If the read
// fails, or the file ends before bufSize bytes are available, it returns nil
// and the error reported by ReadAt.
func read(f *os.File, bufSize uint64, offset int64) ([]byte, error) {
	data := make([]byte, bufSize)
	if _, err := f.ReadAt(data, offset); err != nil {
		return nil, err
	}
	return data, nil
}
// getKeyValueSize decodes the two big-endian length fields of an entry prefix:
// a uint32 taken from the first keySize bytes of buf and a uint64 taken from
// the bytes that follow. They are returned in that order.
func getKeyValueSize(buf []byte) (uint32, uint64) {
	keyField, valueField := buf[:keySize], buf[keySize:]
	return binary.BigEndian.Uint32(keyField), binary.BigEndian.Uint64(valueField)
}