commit a42cd20ddf0ec00266189c1f87891d7a776d1ac8 Author: James Mills <1290234+prologic@users.noreply.github.com> Date: Sat Mar 9 22:41:59 2019 +1000 Initial Commit diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..d8fc6ae --- /dev/null +++ b/.drone.yml @@ -0,0 +1,22 @@ +kind: pipeline +name: default + +steps: + - name: build + image: golang:latest + commands: + - go test -v -short -cover -coverprofile=coverage.txt ./... + + - name: coverage + image: plugins/codecov + settings: + token: + from_secret: codecov-token + + - name: notify + image: plugins/webhook + urls: https://msgbus.mills.io/ci.mills.io + when: + status: + - success + - failure diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6cc7d39 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +*~* +*.bak + +/coverage.txt +/bitcask +/tmp diff --git a/.goreleaser.yml b/.goreleaser.yml new file mode 100644 index 0000000..eacfa03 --- /dev/null +++ b/.goreleaser.yml @@ -0,0 +1,25 @@ +builds: + - + flags: -tags "static_build" + ldflags: -w -X .Version={{.Version}} -X .Commit={{.Commit}} + env: + - CGO_ENABLED=0 +sign: + artifacts: checksum +archive: + replacements: + darwin: Darwin + linux: Linux + windows: Windows + 386: i386 + amd64: x86_64 +checksum: + name_template: 'checksums.txt' +snapshot: + name_template: "{{ .Tag }}-next" +changelog: + sort: asc + filters: + exclude: + - '^docs:' + - '^test:' diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e232a10 --- /dev/null +++ b/Makefile @@ -0,0 +1,39 @@ +.PHONY: dev build generate install image release profile bench test clean + +CGO_ENABLED=0 +COMMIT=$(shell git rev-parse --short HEAD) + +all: dev + +dev: build + @./bitcask --version + +build: clean generate + @go build \ + -tags "netgo static_build" -installsuffix netgo \ + -ldflags "-w -X $(shell go list)/.Commit=$(COMMIT)" \ + ./cmd/bitcask/... + +generate: + @go generate $(shell go list)/... + +install: build + @go install ./cmd/bitcask/... + +image: + @docker build -t prologic/bitcask . + +release: + @./tools/release.sh + +profile: build + @go test -cpuprofile cpu.prof -memprofile mem.prof -v -bench ./... + +bench: build + @go test -v -benchmem -bench=. ./... + +test: build + @go test -v -cover -coverprofile=coverage.txt -covermode=atomic -coverpkg=./... -race ./... + +clean: + @git clean -f -d -X diff --git a/README.md b/README.md new file mode 100644 index 0000000..ac20195 --- /dev/null +++ b/README.md @@ -0,0 +1,73 @@ +# bitcask + +[![Build Status](https://cloud.drone.io/api/badges/prologic/bitcask/status.svg)](https://cloud.drone.io/prologic/bitcask) +[![CodeCov](https://codecov.io/gh/prologic/bitcask/branch/master/graph/badge.svg)](https://codecov.io/gh/prologic/bitcask) +[![Go Report Card](https://goreportcard.com/badge/prologic/bitcask)](https://goreportcard.com/report/prologic/bitcask) +[![GoDoc](https://godoc.org/github.com/prologic/bitcask?status.svg)](https://godoc.org/github.com/prologic/bitcask) +[![Sourcegraph](https://sourcegraph.com/github.com/prologic/msgbus/-/badge.svg)](https://sourcegraph.com/github.com/prologic/msgbus?badge) + +A Bitcask (LSM+WAL) Key/Value Store written in Go. + +## Features + +* Embeddable +* Builtin CLI + +## Install + +```#!bash +$ go get github.com/prologic/bitcask +``` + +## Usage (library) + +Install the package into your project: + +```#!bash +$ go get github.com/prologic/bitcask +``` + +```#!go +package main + +import ( + "log" + + "github.com/prologic/bitcask" +) + +func main() { + db, _ := bitcask.Open("/tmp/db") + db.Set("Hello", []byte("World")) + db.Close() +} +``` + +See the [godoc](https://godoc.org/github.com/prologic/bitcask) for further +documentation and other examples. + +## Usage (tool) + +```#!bash +$ bitcask -p /tmp/db set Hello World +$ bitcask -p /tmp/db get Hello +World +``` + +## Performance + +Benchmarks run on a 11" Macbook with a 1.4Ghz Intel Core i7: + +``` +$ make bench +... +BenchmarkGet-4 50000 33185 ns/op 600 B/op 14 allocs/op +BenchmarkPut-4 100000 16757 ns/op 699 B/op 7 allocs/op +``` + +* ~30,000 reads/sec +* ~60,000 writes/sec + +## License + +bitcask is licensed under the [MIT License](https://github.com/prologic/msgbus/blob/master/LICENSE) diff --git a/bitcask.go b/bitcask.go new file mode 100644 index 0000000..4a8b923 --- /dev/null +++ b/bitcask.go @@ -0,0 +1,349 @@ +package bitcask + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" +) + +const ( + DefaultMaxDatafileSize = 1 << 20 // 1MB +) + +var ( + ErrKeyNotFound = errors.New("error: key not found") +) + +type Bitcask struct { + path string + curr *Datafile + keydir *Keydir + + maxDatafileSize int64 +} + +func (b *Bitcask) Close() error { + return b.curr.Close() +} + +func (b *Bitcask) Sync() error { + return b.curr.Sync() +} + +func (b *Bitcask) Get(key string) ([]byte, error) { + item, ok := b.keydir.Get(key) + if !ok { + return nil, ErrKeyNotFound + } + + df, err := NewDatafile(b.path, item.FileID, true) + if err != nil { + return nil, err + } + defer df.Close() + + e, err := df.ReadAt(item.Index) + if err != nil { + return nil, err + } + + return e.Value, nil +} + +func (b *Bitcask) Put(key string, value []byte) error { + index, err := b.put(key, value) + if err != nil { + return err + } + + b.keydir.Add(key, b.curr.id, index, time.Now().Unix()) + + return nil +} + +func (b *Bitcask) Delete(key string) error { + _, err := b.put(key, []byte{}) + if err != nil { + return err + } + + b.keydir.Delete(key) + + return nil +} + +func (b *Bitcask) Fold(f func(key string) error) error { + for key := range b.keydir.Keys() { + if err := f(key); err != nil { + return err + } + } + return nil +} + +func (b *Bitcask) put(key string, value []byte) (int64, error) { + size, err := b.curr.Size() + if err != nil { + return -1, err + } + + if size >= b.maxDatafileSize { + err := b.curr.Close() + if err != nil { + return -1, err + } + + id := b.curr.id + 1 + curr, err := NewDatafile(b.path, id, false) + if err != nil { + return -1, err + } + b.curr = curr + } + + e := NewEntry(key, value) + return b.curr.Write(e) +} + +func (b *Bitcask) setMaxDatafileSize(size int64) error { + b.maxDatafileSize = size + return nil +} + +func MaxDatafileSize(size int64) func(*Bitcask) error { + return func(b *Bitcask) error { + return b.setMaxDatafileSize(size) + } +} + +func getDatafiles(path string) ([]string, error) { + fns, err := filepath.Glob(fmt.Sprintf("%s/*.data", path)) + if err != nil { + return nil, err + } + sort.Strings(fns) + return fns, nil +} + +func parseIds(fns []string) ([]int, error) { + var ids []int + for _, fn := range fns { + fn = filepath.Base(fn) + ext := filepath.Ext(fn) + if ext != ".data" { + continue + } + id, err := strconv.ParseInt(strings.TrimSuffix(fn, ext), 10, 32) + if err != nil { + return nil, err + } + ids = append(ids, int(id)) + } + sort.Ints(ids) + return ids, nil +} + +func Merge(path string, force bool) error { + fns, err := getDatafiles(path) + if err != nil { + return err + } + + ids, err := parseIds(fns) + if err != nil { + return err + } + + // Do not merge if we only have 1 Datafile + if len(ids) <= 1 { + return nil + } + + // Don't merge the Active Datafile (the last one) + fns = fns[:len(fns)-1] + ids = ids[:len(ids)-1] + + temp, err := ioutil.TempDir("", "bitcask") + if err != nil { + return err + } + + for i, fn := range fns { + // Don't merge Datafiles whose .hint files we've already generated + // (they are already merged); unless we set the force flag to true + // (forcing a re-merge). + if filepath.Ext(fn) == ".hint" && !force { + // Already merged + continue + } + + id := ids[i] + + keydir := NewKeydir() + + df, err := NewDatafile(path, id, true) + if err != nil { + return err + } + defer df.Close() + + for { + e, err := df.Read() + if err != nil { + if err == io.EOF { + break + } + return err + } + + // Tombstone value (deleted key) + if len(e.Value) == 0 { + keydir.Delete(e.Key) + continue + } + + keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) + } + + tempdf, err := NewDatafile(temp, id, false) + if err != nil { + return err + } + defer tempdf.Close() + + for key := range keydir.Keys() { + item, _ := keydir.Get(key) + e, err := df.ReadAt(item.Index) + if err != nil { + return err + } + + _, err = tempdf.Write(e) + if err != nil { + return err + } + } + + err = tempdf.Close() + if err != nil { + return err + } + + err = df.Close() + if err != nil { + return err + } + + err = os.Rename(tempdf.Name(), df.Name()) + if err != nil { + return err + } + + hint := strings.TrimSuffix(df.Name(), ".data") + ".hint" + err = keydir.Save(hint) + if err != nil { + return err + } + } + + return nil +} + +func Open(path string, options ...func(*Bitcask) error) (*Bitcask, error) { + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + + err := Merge(path, false) + if err != nil { + return nil, err + } + + keydir := NewKeydir() + + fns, err := getDatafiles(path) + if err != nil { + return nil, err + } + + ids, err := parseIds(fns) + if err != nil { + return nil, err + } + + for i, fn := range fns { + if filepath.Ext(fn) == ".hint" { + f, err := os.Open(filepath.Join(path, fn)) + if err != nil { + return nil, err + } + defer f.Close() + + hint, err := NewKeydirFromBytes(f) + if err != nil { + return nil, err + } + + for key := range hint.Keys() { + item, _ := hint.Get(key) + keydir.Add(key, item.FileID, item.Index, item.Timestamp) + } + } else { + df, err := NewDatafile(path, ids[i], true) + if err != nil { + return nil, err + } + + for { + e, err := df.Read() + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + + // Tombstone value (deleted key) + if len(e.Value) == 0 { + keydir.Delete(e.Key) + continue + } + + keydir.Add(e.Key, ids[i], e.Index, e.Timestamp) + } + } + } + + var id int + if len(ids) > 0 { + id = ids[(len(ids) - 1)] + } + curr, err := NewDatafile(path, id, false) + if err != nil { + return nil, err + } + + bitcask := &Bitcask{ + path: path, + curr: curr, + keydir: keydir, + + maxDatafileSize: DefaultMaxDatafileSize, + } + + for _, option := range options { + err = option(bitcask) + if err != nil { + return nil, err + } + } + + return bitcask, nil +} diff --git a/bitcask_test.go b/bitcask_test.go new file mode 100644 index 0000000..b7bbf8e --- /dev/null +++ b/bitcask_test.go @@ -0,0 +1,249 @@ +package bitcask + +import ( + "fmt" + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAll(t *testing.T) { + var ( + db *Bitcask + testdir string + err error + ) + + assert := assert.New(t) + + testdir, err = ioutil.TempDir("", "bitcask") + assert.NoError(err) + + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + err = db.Put("foo", []byte("bar")) + assert.NoError(err) + }) + + t.Run("Get", func(t *testing.T) { + val, err := db.Get("foo") + assert.NoError(err) + assert.Equal([]byte("bar"), val) + }) + + t.Run("Delete", func(t *testing.T) { + err := db.Delete("foo") + assert.NoError(err) + _, err = db.Get("foo") + assert.Error(err) + assert.Equal(err.Error(), "error: key not found") + }) + + t.Run("Sync", func(t *testing.T) { + err = db.Sync() + assert.NoError(err) + }) + + t.Run("Close", func(t *testing.T) { + err = db.Close() + assert.NoError(err) + }) +} + +func TestDeletedKeys(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + t.Run("Setup", func(t *testing.T) { + var ( + db *Bitcask + err error + ) + + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + err = db.Put("foo", []byte("bar")) + assert.NoError(err) + }) + + t.Run("Get", func(t *testing.T) { + val, err := db.Get("foo") + assert.NoError(err) + assert.Equal([]byte("bar"), val) + }) + + t.Run("Delete", func(t *testing.T) { + err := db.Delete("foo") + assert.NoError(err) + _, err = db.Get("foo") + assert.Error(err) + assert.Equal("error: key not found", err.Error()) + }) + + t.Run("Sync", func(t *testing.T) { + err = db.Sync() + assert.NoError(err) + }) + + t.Run("Close", func(t *testing.T) { + err = db.Close() + assert.NoError(err) + }) + }) + + t.Run("Reopen", func(t *testing.T) { + var ( + db *Bitcask + err error + ) + + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Get", func(t *testing.T) { + _, err = db.Get("foo") + assert.Error(err) + assert.Equal("error: key not found", err.Error()) + }) + + t.Run("Close", func(t *testing.T) { + err = db.Close() + assert.NoError(err) + }) + }) +} + +func TestMerge(t *testing.T) { + assert := assert.New(t) + + testdir, err := ioutil.TempDir("", "bitcask") + assert.NoError(err) + + t.Run("Setup", func(t *testing.T) { + var ( + db *Bitcask + err error + ) + + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir, MaxDatafileSize(1024)) + assert.NoError(err) + }) + + t.Run("Put", func(t *testing.T) { + for i := 0; i < 1024; i++ { + err = db.Put(string(i), []byte(strings.Repeat(" ", 1024))) + assert.NoError(err) + } + }) + + t.Run("Get", func(t *testing.T) { + for i := 0; i < 32; i++ { + err = db.Put(string(i), []byte(strings.Repeat(" ", 1024))) + assert.NoError(err) + val, err := db.Get(string(i)) + assert.NoError(err) + assert.Equal([]byte(strings.Repeat(" ", 1024)), val) + } + }) + + t.Run("Sync", func(t *testing.T) { + err = db.Sync() + assert.NoError(err) + }) + + t.Run("Close", func(t *testing.T) { + err = db.Close() + assert.NoError(err) + }) + }) + + t.Run("Merge", func(t *testing.T) { + var ( + db *Bitcask + err error + ) + + t.Run("Open", func(t *testing.T) { + db, err = Open(testdir) + assert.NoError(err) + }) + + t.Run("Get", func(t *testing.T) { + for i := 0; i < 32; i++ { + val, err := db.Get(string(i)) + assert.NoError(err) + assert.Equal([]byte(strings.Repeat(" ", 1024)), val) + } + }) + + t.Run("Close", func(t *testing.T) { + err = db.Close() + assert.NoError(err) + }) + }) +} + +func BenchmarkGet(b *testing.B) { + testdir, err := ioutil.TempDir("", "bitcask") + if err != nil { + b.Fatal(err) + } + + db, err := Open(testdir) + if err != nil { + b.Fatal(err) + } + defer db.Close() + + err = db.Put("foo", []byte("bar")) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + val, err := db.Get("foo") + if err != nil { + b.Fatal(err) + } + if string(val) != "bar" { + b.Errorf("expected val=bar got=%s", val) + } + } +} + +func BenchmarkPut(b *testing.B) { + testdir, err := ioutil.TempDir("", "bitcask") + if err != nil { + b.Fatal(err) + } + + db, err := Open(testdir) + if err != nil { + b.Fatal(err) + } + defer db.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := db.Put(fmt.Sprintf("key%d", i), []byte("bar")) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/bitcask/del.go b/cmd/bitcask/del.go new file mode 100644 index 0000000..a6f98eb --- /dev/null +++ b/cmd/bitcask/del.go @@ -0,0 +1,46 @@ +package main + +import ( + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +var delCmd = &cobra.Command{ + Use: "del ", + Aliases: []string{"delete", "remove", "rm"}, + Short: "Delete a key and its value", + Long: `This deletes a key and its value`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + + key := args[0] + + os.Exit(del(path, key)) + }, +} + +func init() { + RootCmd.AddCommand(delCmd) +} + +func del(path, key string) int { + db, err := bitcask.Open(path) + if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + + err = db.Delete(key) + if err != nil { + log.WithError(err).Error("error deleting key") + return 1 + } + + return 0 +} diff --git a/cmd/bitcask/get.go b/cmd/bitcask/get.go new file mode 100644 index 0000000..97bd1a3 --- /dev/null +++ b/cmd/bitcask/get.go @@ -0,0 +1,50 @@ +package main + +import ( + "fmt" + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +var getCmd = &cobra.Command{ + Use: "get ", + Aliases: []string{"view"}, + Short: "Get a new Key and display its Value", + Long: `This retrieves a key and display its value`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + + key := args[0] + + os.Exit(get(path, key)) + }, +} + +func init() { + RootCmd.AddCommand(getCmd) +} + +func get(path, key string) int { + db, err := bitcask.Open(path) + if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + + value, err := db.Get(key) + if err != nil { + log.WithError(err).Error("error reading key") + return 1 + } + + fmt.Printf("%s\n", string(value)) + log.WithField("key", key).WithField("value", value).Debug("key/value") + + return 0 +} diff --git a/cmd/bitcask/keys.go b/cmd/bitcask/keys.go new file mode 100644 index 0000000..f3ccf08 --- /dev/null +++ b/cmd/bitcask/keys.go @@ -0,0 +1,48 @@ +package main + +import ( + "fmt" + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +var keysCmd = &cobra.Command{ + Use: "keys", + Aliases: []string{"list", "ls"}, + Short: "Display all keys in Database", + Long: `This displays all known keys in the Database"`, + Args: cobra.ExactArgs(0), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + + os.Exit(keys(path)) + }, +} + +func init() { + RootCmd.AddCommand(keysCmd) +} + +func keys(path string) int { + db, err := bitcask.Open(path) + if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + + err = db.Fold(func(key string) error { + fmt.Printf("%s\n", key) + return nil + }) + if err != nil { + log.WithError(err).Error("error listing keys") + return 1 + } + + return 0 +} diff --git a/cmd/bitcask/main.go b/cmd/bitcask/main.go new file mode 100644 index 0000000..736ef31 --- /dev/null +++ b/cmd/bitcask/main.go @@ -0,0 +1,5 @@ +package main + +func main() { + Execute() +} diff --git a/cmd/bitcask/merge.go b/cmd/bitcask/merge.go new file mode 100644 index 0000000..2090b9b --- /dev/null +++ b/cmd/bitcask/merge.go @@ -0,0 +1,50 @@ +package main + +import ( + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +var mergeCmd = &cobra.Command{ + Use: "merge", + Aliases: []string{"clean", "compact", "defrag"}, + Short: "Merges the Datafiles in the Database", + Long: `This merges all non-active Datafiles in the Database and +compacts the data stored on disk. Old values are removed as well as deleted +keys.`, + Args: cobra.ExactArgs(0), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + force, err := cmd.Flags().GetBool("force") + if err != nil { + log.WithError(err).Error("error parsing force flag") + os.Exit(1) + } + + os.Exit(merge(path, force)) + }, +} + +func init() { + RootCmd.AddCommand(mergeCmd) + + mergeCmd.Flags().BoolP( + "force", "f", false, + "Force a re-merge even if .hint files exist", + ) +} + +func merge(path string, force bool) int { + err := bitcask.Merge(path, force) + if err != nil { + log.WithError(err).Error("error merging database") + return 1 + } + + return 0 +} diff --git a/cmd/bitcask/root.go b/cmd/bitcask/root.go new file mode 100644 index 0000000..6fd1684 --- /dev/null +++ b/cmd/bitcask/root.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +// RootCmd represents the base command when called without any subcommands +var RootCmd = &cobra.Command{ + Use: "bitcask", + Version: bitcask.FullVersion(), + Short: "Command-line tools for bitcask", + Long: `This is the command-line tool to interact with a bitcask database. + +This lets you get, set and delete key/value pairs as well as perform merge +(or compaction) operations. This tool serves as an example implementation +however is also intended to be useful in shell scripts.`, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + // set logging level + if viper.GetBool("debug") { + log.SetLevel(log.DebugLevel) + } else { + log.SetLevel(log.InfoLevel) + } + }, +} + +// Execute adds all child commands to the root command +// and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. +func Execute() { + if err := RootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func init() { + RootCmd.PersistentFlags().BoolP( + "debug", "d", false, + "Enable debug logging", + ) + + RootCmd.PersistentFlags().StringP( + "path", "p", "/tmp/bitcask", + "Path to Bitcask database", + ) + + viper.BindPFlag("path", RootCmd.PersistentFlags().Lookup("path")) + viper.SetDefault("path", "/tmp/bitcask") + + viper.BindPFlag("debug", RootCmd.PersistentFlags().Lookup("debug")) + viper.SetDefault("debug", false) +} diff --git a/cmd/bitcask/set.go b/cmd/bitcask/set.go new file mode 100644 index 0000000..96ae3c6 --- /dev/null +++ b/cmd/bitcask/set.go @@ -0,0 +1,64 @@ +package main + +import ( + "bytes" + "io" + "io/ioutil" + "os" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "github.com/prologic/bitcask" +) + +var setCmd = &cobra.Command{ + Use: "set []", + Aliases: []string{"add"}, + Short: "Add/Set a new Key/Value pair", + Long: `This adds or sets a new key/value pair. + +If the value is not specified as an argument it is read from standard input.`, + Args: cobra.MinimumNArgs(1), + Run: func(cmd *cobra.Command, args []string) { + path := viper.GetString("path") + + key := args[0] + + var value io.Reader + if len(args) > 1 { + value = bytes.NewBufferString(args[1]) + } else { + value = os.Stdin + } + + os.Exit(set(path, key, value)) + }, +} + +func init() { + RootCmd.AddCommand(setCmd) +} + +func set(path, key string, value io.Reader) int { + db, err := bitcask.Open(path) + if err != nil { + log.WithError(err).Error("error opening database") + return 1 + } + + data, err := ioutil.ReadAll(value) + if err != nil { + log.WithError(err).Error("error writing key") + return 1 + } + + err = db.Put(key, data) + if err != nil { + log.WithError(err).Error("error writing key") + return 1 + } + + return 0 +} diff --git a/datafile.go b/datafile.go new file mode 100644 index 0000000..0ebabd0 --- /dev/null +++ b/datafile.go @@ -0,0 +1,139 @@ +package bitcask + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "time" + + pb "github.com/prologic/bitcask/proto" + "github.com/prologic/bitcask/streampb" +) + +const ( + DefaultDatafileFilename = "%09d.data" +) + +var ( + ErrReadonly = errors.New("error: read only datafile") +) + +type Datafile struct { + id int + r *os.File + w *os.File + dec *streampb.Decoder + enc *streampb.Encoder +} + +func NewDatafile(path string, id int, readonly bool) (*Datafile, error) { + var ( + r *os.File + w *os.File + err error + ) + + fn := filepath.Join(path, fmt.Sprintf(DefaultDatafileFilename, id)) + + if !readonly { + w, err = os.OpenFile(fn, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) + if err != nil { + return nil, err + } + } + + r, err = os.Open(fn) + if err != nil { + return nil, err + } + + dec := streampb.NewDecoder(r) + enc := streampb.NewEncoder(w) + + return &Datafile{ + id: id, + r: r, + w: w, + dec: dec, + enc: enc, + }, nil +} + +func (df *Datafile) Name() string { + return df.r.Name() +} + +func (df *Datafile) Close() error { + if df.w == nil { + return df.r.Close() + } + + err := df.Sync() + if err != nil { + return err + } + return df.w.Close() +} + +func (df *Datafile) Sync() error { + if df.w == nil { + return nil + } + return df.w.Sync() +} + +func (df *Datafile) Size() (int64, error) { + var ( + stat os.FileInfo + err error + ) + + if df.w == nil { + stat, err = df.r.Stat() + } else { + stat, err = df.w.Stat() + } + + if err != nil { + return -1, err + } + + return stat.Size(), nil +} + +func (df *Datafile) Read() (pb.Entry, error) { + var e pb.Entry + return e, df.dec.Decode(&e) +} + +func (df *Datafile) ReadAt(index int64) (e pb.Entry, err error) { + _, err = df.r.Seek(index, os.SEEK_SET) + if err != nil { + return + } + return df.Read() +} + +func (df *Datafile) Write(e pb.Entry) (int64, error) { + if df.w == nil { + return -1, ErrReadonly + } + + stat, err := df.w.Stat() + if err != nil { + return -1, err + } + + index := stat.Size() + + e.Index = index + e.Timestamp = time.Now().Unix() + + err = df.enc.Encode(&e) + if err != nil { + return -1, err + } + + return index, nil +} diff --git a/entry.go b/entry.go new file mode 100644 index 0000000..3d14597 --- /dev/null +++ b/entry.go @@ -0,0 +1,17 @@ +package bitcask + +import ( + "hash/crc32" + + pb "github.com/prologic/bitcask/proto" +) + +func NewEntry(key string, value []byte) pb.Entry { + crc := crc32.ChecksumIEEE(value) + + return pb.Entry{ + CRC: crc, + Key: key, + Value: value, + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..96256e1 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module github.com/prologic/bitcask + +require ( + github.com/gogo/protobuf v1.2.1 + github.com/golang/protobuf v1.2.0 + github.com/gorilla/websocket v1.4.0 // indirect + github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 // indirect + github.com/mitchellh/go-homedir v1.1.0 + github.com/pkg/errors v0.8.1 + github.com/prologic/msgbus v0.1.1 + github.com/prometheus/client_golang v0.9.2 // indirect + github.com/sirupsen/logrus v1.3.0 + github.com/spf13/cobra v0.0.3 + github.com/spf13/viper v1.3.1 + github.com/stretchr/testify v1.3.0 + gopkg.in/vmihailenco/msgpack.v2 v2.9.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..03ddc48 --- /dev/null +++ b/go.sum @@ -0,0 +1,86 @@ +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME= +github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prologic/msgbus v0.1.1 h1:aXfEYW+09k3cwY0z9RhITLM7JDn+b/mm/cxgbZOivS0= +github.com/prologic/msgbus v0.1.1/go.mod h1:B3Qu4/U2FP08x93jUzp9E8bl155+cIgDH2DUGRK6OZk= +github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= +github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME= +github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38= +github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/vmihailenco/msgpack.v2 v2.9.1 h1:kb0VV7NuIojvRfzwslQeP3yArBqJHW9tOl4t38VS1jM= +gopkg.in/vmihailenco/msgpack.v2 v2.9.1/go.mod h1:/3Dn1Npt9+MYyLpYYXjInO/5jvMLamn+AEGwNEOatn8= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/keydir.go b/keydir.go new file mode 100644 index 0000000..ed09ff6 --- /dev/null +++ b/keydir.go @@ -0,0 +1,92 @@ +package bitcask + +import ( + "bytes" + "encoding/gob" + "io" + "io/ioutil" + "sync" +) + +type Item struct { + FileID int + Index int64 + Timestamp int64 +} + +type Keydir struct { + sync.RWMutex + kv map[string]Item +} + +func NewKeydir() *Keydir { + return &Keydir{ + kv: make(map[string]Item), + } +} + +func (k *Keydir) Add(key string, fileid int, index, timestamp int64) { + k.Lock() + defer k.Unlock() + + k.kv[key] = Item{ + FileID: fileid, + Index: index, + Timestamp: timestamp, + } +} + +func (k *Keydir) Get(key string) (Item, bool) { + k.RLock() + defer k.RUnlock() + + item, ok := k.kv[key] + return item, ok +} + +func (k *Keydir) Delete(key string) { + k.Lock() + defer k.Unlock() + + delete(k.kv, key) +} + +func (k *Keydir) Keys() chan string { + ch := make(chan string) + go func() { + for k := range k.kv { + ch <- k + } + close(ch) + }() + return ch +} + +func (k *Keydir) Bytes() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(k.kv) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (k *Keydir) Save(fn string) error { + data, err := k.Bytes() + if err != nil { + return err + } + + return ioutil.WriteFile(fn, data, 0644) +} + +func NewKeydirFromBytes(r io.Reader) (*Keydir, error) { + k := NewKeydir() + dec := gob.NewDecoder(r) + err := dec.Decode(&k.kv) + if err != nil { + return nil, err + } + return k, nil +} diff --git a/proto/doc.go b/proto/doc.go new file mode 100644 index 0000000..3fe484d --- /dev/null +++ b/proto/doc.go @@ -0,0 +1,3 @@ +package proto + +//go:generate protoc --go_out=. entry.proto diff --git a/proto/entry.pb.go b/proto/entry.pb.go new file mode 100644 index 0000000..89f4b44 --- /dev/null +++ b/proto/entry.pb.go @@ -0,0 +1,108 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: entry.proto + +package proto + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Entry struct { + CRC uint32 `protobuf:"varint,1,opt,name=CRC,proto3" json:"CRC,omitempty"` + Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` + Index int64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` + Value []byte `protobuf:"bytes,4,opt,name=Value,proto3" json:"Value,omitempty"` + Timestamp int64 `protobuf:"varint,5,opt,name=Timestamp,proto3" json:"Timestamp,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Entry) Reset() { *m = Entry{} } +func (m *Entry) String() string { return proto.CompactTextString(m) } +func (*Entry) ProtoMessage() {} +func (*Entry) Descriptor() ([]byte, []int) { + return fileDescriptor_entry_4f5906245d08394f, []int{0} +} +func (m *Entry) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Entry.Unmarshal(m, b) +} +func (m *Entry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Entry.Marshal(b, m, deterministic) +} +func (dst *Entry) XXX_Merge(src proto.Message) { + xxx_messageInfo_Entry.Merge(dst, src) +} +func (m *Entry) XXX_Size() int { + return xxx_messageInfo_Entry.Size(m) +} +func (m *Entry) XXX_DiscardUnknown() { + xxx_messageInfo_Entry.DiscardUnknown(m) +} + +var xxx_messageInfo_Entry proto.InternalMessageInfo + +func (m *Entry) GetCRC() uint32 { + if m != nil { + return m.CRC + } + return 0 +} + +func (m *Entry) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *Entry) GetIndex() int64 { + if m != nil { + return m.Index + } + return 0 +} + +func (m *Entry) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *Entry) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +func init() { + proto.RegisterType((*Entry)(nil), "proto.Entry") +} + +func init() { proto.RegisterFile("entry.proto", fileDescriptor_entry_4f5906245d08394f) } + +var fileDescriptor_entry_4f5906245d08394f = []byte{ + // 134 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29, + 0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xa5, 0x5c, 0xac, 0xae, + 0x20, 0x51, 0x21, 0x01, 0x2e, 0x66, 0xe7, 0x20, 0x67, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xde, 0x20, + 0x10, 0x13, 0x24, 0xe2, 0x9d, 0x5a, 0x29, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a, + 0x89, 0x70, 0xb1, 0x7a, 0xe6, 0xa5, 0xa4, 0x56, 0x48, 0x30, 0x2b, 0x30, 0x6a, 0x30, 0x07, 0x41, + 0x38, 0x20, 0xd1, 0xb0, 0xc4, 0x9c, 0xd2, 0x54, 0x09, 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x08, + 0x47, 0x48, 0x86, 0x8b, 0x33, 0x24, 0x33, 0x37, 0xb5, 0xb8, 0x24, 0x31, 0xb7, 0x40, 0x82, 0x15, + 0xac, 0x1e, 0x21, 0x90, 0xc4, 0x06, 0xb6, 0xdd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x07, 0x99, + 0x47, 0xb9, 0x93, 0x00, 0x00, 0x00, +} diff --git a/proto/entry.proto b/proto/entry.proto new file mode 100644 index 0000000..2e59b48 --- /dev/null +++ b/proto/entry.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package proto; + +message Entry { + uint32 CRC = 1; + string Key = 2; + int64 Index = 3; + bytes Value = 4; + int64 Timestamp = 5; +} diff --git a/streampb/stream.go b/streampb/stream.go new file mode 100644 index 0000000..25d1602 --- /dev/null +++ b/streampb/stream.go @@ -0,0 +1,89 @@ +package streampb + +import ( + "encoding/binary" + "io" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" +) + +const ( + // prefixSize is the number of bytes we preallocate for storing + // our big endian lenth prefix buffer. + prefixSize = 8 +) + +// NewEncoder creates a streaming protobuf encoder. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w: w, prefixBuf: make([]byte, prefixSize)} +} + +// Encoder wraps an underlying io.Writer and allows you to stream +// proto encodings on it. +type Encoder struct { + w io.Writer + prefixBuf []byte +} + +// Encode takes any proto.Message and streams it to the underlying writer. +// Messages are framed with a length prefix. +func (e *Encoder) Encode(msg proto.Message) error { + buf, err := proto.Marshal(msg) + if err != nil { + return err + } + binary.BigEndian.PutUint64(e.prefixBuf, uint64(len(buf))) + + if _, err := e.w.Write(e.prefixBuf); err != nil { + return errors.Wrap(err, "failed writing length prefix") + } + + _, err = e.w.Write(buf) + return errors.Wrap(err, "failed writing marshaled data") +} + +// NewDecoder creates a streaming protobuf decoder. +func NewDecoder(r io.Reader) *Decoder { + return &Decoder{ + r: r, + prefixBuf: make([]byte, prefixSize), + } +} + +// Decoder wraps an underlying io.Reader and allows you to stream +// proto decodings on it. +type Decoder struct { + r io.Reader + prefixBuf []byte +} + +// Decode takes a proto.Message and unmarshals the next payload in the +// underlying io.Reader. It returns an EOF when it's done. +func (d *Decoder) Decode(v proto.Message) error { + _, err := io.ReadFull(d.r, d.prefixBuf) + if err != nil { + return err + } + + n := binary.BigEndian.Uint64(d.prefixBuf) + + buf := make([]byte, n) + + idx := uint64(0) + for idx < n { + m, err := d.r.Read(buf[idx:n]) + if err != nil { + return errors.Wrap(translateError(err), "failed reading marshaled data") + } + idx += uint64(m) + } + return proto.Unmarshal(buf[:n], v) +} + +func translateError(err error) error { + if err == io.EOF { + return io.ErrUnexpectedEOF + } + return err +} diff --git a/tools/release.sh b/tools/release.sh new file mode 100755 index 0000000..a916b43 --- /dev/null +++ b/tools/release.sh @@ -0,0 +1,25 @@ +#!/bin/sh + +# Get the highest tag number +VERSION="$(git describe --abbrev=0 --tags)" +VERSION=${VERSION:-'0.0.0'} + +# Get number parts +MAJOR="${VERSION%%.*}"; VERSION="${VERSION#*.}" +MINOR="${VERSION%%.*}"; VERSION="${VERSION#*.}" +PATCH="${VERSION%%.*}"; VERSION="${VERSION#*.}" + +# Increase version +PATCH=$((PATCH+1)) + +TAG="${1}" + +if [ "${TAG}" = "" ]; then + TAG="${MAJOR}.${MINOR}.${PATCH}" +fi + +echo "Releasing ${TAG} ..." + +git tag -a -s -m "Relase ${TAG}" "${TAG}" +git push --tags +goreleaser release --rm-dist diff --git a/version.go b/version.go new file mode 100644 index 0000000..d12297b --- /dev/null +++ b/version.go @@ -0,0 +1,18 @@ +package bitcask + +import ( + "fmt" +) + +var ( + // Version release version + Version = "0.0.1" + + // Commit will be overwritten automatically by the build system + Commit = "HEAD" +) + +// FullVersion returns the full version and commit hash +func FullVersion() string { + return fmt.Sprintf("%s@%s", Version, Commit) +} diff --git a/version_test.go b/version_test.go new file mode 100644 index 0000000..c263ffe --- /dev/null +++ b/version_test.go @@ -0,0 +1,15 @@ +package bitcask + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFullVersion(t *testing.T) { + assert := assert.New(t) + + expected := fmt.Sprintf("%s@%s", Version, Commit) + assert.Equal(expected, FullVersion()) +}