mirror of
https://git.mills.io/kayos/bitraft.git
synced 2024-06-25 00:09:04 +00:00
444 lines
10 KiB
Go
444 lines
10 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"compress/gzip"
|
|
"encoding/binary"
|
|
"errors"
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.tcp.direct/Mirrors/bitcask-mirror"
|
|
"github.com/rs/zerolog"
|
|
"github.com/tidwall/finn"
|
|
"github.com/tidwall/redcon"
|
|
)
|
|
|
|
const defaultTCPKeepAlive = time.Minute * 5
|
|
|
|
var (
|
|
errWrongNumberOfArguments = errors.New("wrong number of arguments")
|
|
)
|
|
|
|
func trimPattern(glob string) string {
|
|
escaped := false
|
|
pattern := ""
|
|
for _, char := range glob {
|
|
switch char {
|
|
case '\\':
|
|
escaped = !escaped
|
|
if !escaped {
|
|
pattern = pattern + string(char)
|
|
}
|
|
case '*':
|
|
if !escaped {
|
|
goto out
|
|
}
|
|
pattern = pattern + string(char)
|
|
case '?':
|
|
if !escaped {
|
|
goto out
|
|
}
|
|
pattern = pattern + string(char)
|
|
case '[':
|
|
if !escaped {
|
|
goto out
|
|
}
|
|
pattern = pattern + string(char)
|
|
default:
|
|
pattern = pattern + string(char)
|
|
}
|
|
}
|
|
out:
|
|
return pattern
|
|
}
|
|
|
|
func ListenAndServe(addr, join, dir, logdir string, consistency, durability finn.Level) error {
|
|
opts := finn.Options{
|
|
Backend: finn.FastLog,
|
|
Consistency: consistency,
|
|
Durability: durability,
|
|
ConnAccept: func(conn redcon.Conn) bool {
|
|
if tcp, ok := conn.NetConn().(*net.TCPConn); ok {
|
|
if err := tcp.SetKeepAlive(true); err != nil {
|
|
log.Warn().Err(err).Caller().Str("caller", tcp.RemoteAddr().String()).
|
|
Msg("could not set keepalive")
|
|
} else {
|
|
err := tcp.SetKeepAlivePeriod(defaultTCPKeepAlive)
|
|
if err != nil {
|
|
log.Warn().Err(err).Caller().Str("caller", tcp.RemoteAddr().String()).
|
|
Msg("could not set keepalive period")
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
},
|
|
}
|
|
m, err := NewMachine(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n, err := finn.Open(logdir, addr, join, m, &opts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer n.Close()
|
|
|
|
select {
|
|
// blocking, there's no way out
|
|
}
|
|
}
|
|
|
|
type cmdHandler func(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error)
|
|
|
|
// Machine is the FSM for the Raft consensus
|
|
type Machine struct {
|
|
mu sync.RWMutex
|
|
dir string
|
|
db *bitcask.Bitcask
|
|
dbPath string
|
|
// TODO: what was "addr" for?
|
|
// addr string
|
|
closed bool
|
|
cmdMapper map[string]cmdHandler
|
|
}
|
|
|
|
func NewMachine(dir string) (*Machine, error) {
|
|
kvm := &Machine{
|
|
dir: dir,
|
|
// addr: addr,
|
|
}
|
|
kvm.cmdMapper = map[string]cmdHandler{
|
|
"echo": kvm.cmdEcho, "set": kvm.cmdSet,
|
|
"get": kvm.cmdGet, "del": kvm.cmdDel,
|
|
"keys": kvm.cmdKeys, "flushdb": kvm.cmdFlushdb,
|
|
}
|
|
var err error
|
|
kvm.dbPath = filepath.Join(dir, "node.db")
|
|
kvm.db, err = bitcask.Open(kvm.dir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return kvm, nil
|
|
}
|
|
|
|
// Close shuts down our finite state machine and presumably, our database.
|
|
func (kvm *Machine) Close() (err error) {
|
|
kvm.mu.Lock()
|
|
defer kvm.mu.Unlock()
|
|
err = kvm.db.Close()
|
|
kvm.closed = true
|
|
return
|
|
}
|
|
|
|
func newSubLog(conn redcon.Conn, cmd redcon.Command) *zerolog.Logger {
|
|
slog := log.With().Logger()
|
|
if conn != nil {
|
|
slog = slog.With().Str("caller", conn.NetConn().RemoteAddr().String()).Logger()
|
|
}
|
|
if len(cmd.Raw) > 1 {
|
|
slog = slog.With().Str("cmd", string(cmd.Raw)).Logger()
|
|
}
|
|
return &slog
|
|
}
|
|
|
|
// Command is a callback for incoming Redis commands.
|
|
func (kvm *Machine) Command(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
|
|
slog := newSubLog(conn, cmd)
|
|
slog.Trace().Msg(string(cmd.Raw))
|
|
|
|
strCmd := strings.ToLower(string(cmd.Args[0]))
|
|
handler, ok := kvm.cmdMapper[strCmd]
|
|
switch {
|
|
case ok:
|
|
return handler(m, conn, cmd)
|
|
case strCmd == "shutdown":
|
|
slog.Warn().Msg("shutting down")
|
|
conn.WriteString("OK")
|
|
err := conn.Close()
|
|
if err != nil {
|
|
slog.Debug().Err(err).Caller().Msg("failed to close connection")
|
|
}
|
|
os.Exit(0)
|
|
return nil, nil
|
|
default:
|
|
// TODO: do we need to log here if we are returning the error type?
|
|
slog.Warn().Msg("unknown command")
|
|
return nil, finn.ErrUnknownCommand
|
|
}
|
|
}
|
|
|
|
// Restore attempts to restore a database from rd, which implements an io.Reader.
|
|
// This is meant to restore data exported by the Snapshot function.
|
|
func (kvm *Machine) Restore(rd io.Reader) error {
|
|
kvm.mu.Lock()
|
|
defer kvm.mu.Unlock()
|
|
var err error
|
|
if err := kvm.db.Close(); err != nil {
|
|
return err
|
|
}
|
|
if err := os.RemoveAll(kvm.dbPath); err != nil {
|
|
return err
|
|
}
|
|
kvm.db = nil
|
|
kvm.db, err = bitcask.Open(kvm.dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
num := make([]byte, 8)
|
|
gzr, err := gzip.NewReader(rd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r := bufio.NewReader(gzr)
|
|
for {
|
|
if _, err := io.ReadFull(r, num); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
key := make([]byte, int(binary.LittleEndian.Uint64(num)))
|
|
if _, err := io.ReadFull(r, key); err != nil {
|
|
return err
|
|
}
|
|
if _, err := io.ReadFull(r, num); err != nil {
|
|
return err
|
|
}
|
|
value := make([]byte, int(binary.LittleEndian.Uint64(num)))
|
|
if _, err := io.ReadFull(r, value); err != nil {
|
|
return err
|
|
}
|
|
if err := kvm.db.Put(key, value); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return gzr.Close()
|
|
}
|
|
|
|
// WriteRedisCommandsFromSnapshot will read a snapshot and write all the
|
|
// Redis SET commands needed to rebuild the entire database.
|
|
// The commands are written to wr.
|
|
func WriteRedisCommandsFromSnapshot(wr io.Writer, snapshotPath string) error {
|
|
f, err := os.Open(snapshotPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
var cmd []byte
|
|
num := make([]byte, 8)
|
|
var gzclosed bool
|
|
gzr, err := gzip.NewReader(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if !gzclosed {
|
|
gzr.Close()
|
|
}
|
|
}()
|
|
r := bufio.NewReader(gzr)
|
|
for {
|
|
if _, err := io.ReadFull(r, num); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
key := make([]byte, int(binary.LittleEndian.Uint64(num)))
|
|
if _, err := io.ReadFull(r, key); err != nil {
|
|
return err
|
|
}
|
|
if _, err := io.ReadFull(r, num); err != nil {
|
|
return err
|
|
}
|
|
value := make([]byte, int(binary.LittleEndian.Uint64(num)))
|
|
if _, err := io.ReadFull(r, value); err != nil {
|
|
return err
|
|
}
|
|
if len(key) == 0 || key[0] != 'k' {
|
|
// do not accept keys that do not start with 'k'
|
|
continue
|
|
}
|
|
key = key[1:]
|
|
cmd = cmd[:0]
|
|
cmd = append(cmd, "*3\r\n$3\r\nSET\r\n$"...)
|
|
cmd = strconv.AppendInt(cmd, int64(len(key)), 10)
|
|
cmd = append(cmd, '\r', '\n')
|
|
cmd = append(cmd, key...)
|
|
cmd = append(cmd, '\r', '\n', '$')
|
|
cmd = strconv.AppendInt(cmd, int64(len(value)), 10)
|
|
cmd = append(cmd, '\r', '\n')
|
|
cmd = append(cmd, value...)
|
|
cmd = append(cmd, '\r', '\n')
|
|
if _, err := wr.Write(cmd); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
err = gzr.Close()
|
|
gzclosed = true
|
|
return err
|
|
}
|
|
|
|
// Snapshot writes a snapshot of the database to wr, which implements io.Writer.
|
|
func (kvm *Machine) Snapshot(wr io.Writer) error {
|
|
kvm.mu.RLock()
|
|
defer kvm.mu.RUnlock()
|
|
gzw := gzip.NewWriter(wr)
|
|
|
|
err := kvm.db.Fold(func(key []byte) error {
|
|
var buf []byte
|
|
value, err := kvm.db.Get(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
num := make([]byte, 8)
|
|
binary.LittleEndian.PutUint64(num, uint64(len(key)))
|
|
buf = append(buf, num...)
|
|
buf = append(buf, key...)
|
|
binary.LittleEndian.PutUint64(num, uint64(len(value)))
|
|
buf = append(buf, num...)
|
|
buf = append(buf, value...)
|
|
if _, err := gzw.Write(buf); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return gzw.Close()
|
|
}
|
|
|
|
func (kvm *Machine) cmdSet(
|
|
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
|
|
) (interface{}, error) {
|
|
if len(cmd.Args) != 3 {
|
|
return nil, finn.ErrWrongNumberOfArguments
|
|
}
|
|
return m.Apply(conn, cmd,
|
|
func() (interface{}, error) {
|
|
kvm.mu.Lock()
|
|
defer kvm.mu.Unlock()
|
|
return nil, kvm.db.Put(cmd.Args[1], cmd.Args[2])
|
|
},
|
|
func(v interface{}) (interface{}, error) {
|
|
conn.WriteString("OK")
|
|
return nil, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
func (kvm *Machine) cmdEcho(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
|
|
if len(cmd.Args) != 2 {
|
|
return nil, finn.ErrWrongNumberOfArguments
|
|
}
|
|
conn.WriteBulk(cmd.Args[1])
|
|
return nil, nil
|
|
}
|
|
|
|
func (kvm *Machine) cmdGet(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
|
|
if len(cmd.Args) != 2 {
|
|
return nil, finn.ErrWrongNumberOfArguments
|
|
}
|
|
key := cmd.Args[1]
|
|
return m.Apply(conn, cmd, nil,
|
|
func(interface{}) (interface{}, error) {
|
|
kvm.mu.RLock()
|
|
defer kvm.mu.RUnlock()
|
|
value, err := kvm.db.Get(key)
|
|
if err != nil {
|
|
if err == bitcask.ErrKeyNotFound {
|
|
conn.WriteNull()
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
conn.WriteBulk(value)
|
|
return nil, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
func (kvm *Machine) cmdDel(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
|
|
var startIdx = 1
|
|
return m.Apply(conn, cmd,
|
|
func() (interface{}, error) {
|
|
kvm.mu.Lock()
|
|
defer kvm.mu.Unlock()
|
|
var n int
|
|
for i := startIdx; i < len(cmd.Args); i++ {
|
|
key := cmd.Args[i]
|
|
err := kvm.db.Delete(key)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
n++
|
|
}
|
|
return n, nil
|
|
},
|
|
func(v interface{}) (interface{}, error) {
|
|
n := v.(int)
|
|
conn.WriteInt(n)
|
|
return nil, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
func (kvm *Machine) cmdKeys(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
|
|
if len(cmd.Args) != 2 {
|
|
return nil, errWrongNumberOfArguments
|
|
}
|
|
pattern := string(cmd.Args[1])
|
|
scanPattern := trimPattern(pattern)
|
|
return m.Apply(conn, cmd, nil,
|
|
func(interface{}) (interface{}, error) {
|
|
kvm.mu.RLock()
|
|
defer kvm.mu.RUnlock()
|
|
var keys [][]byte
|
|
err := kvm.db.Scan([]byte(scanPattern), func(key []byte) error {
|
|
if ok, _ := filepath.Match(pattern, string(key)); ok {
|
|
keys = append(keys, []byte(key))
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn.WriteArray(len(keys))
|
|
for i := 0; i < len(keys); i++ {
|
|
conn.WriteBulk(keys[i])
|
|
}
|
|
return nil, nil
|
|
},
|
|
)
|
|
}
|
|
|
|
func (kvm *Machine) cmdFlushdb(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
|
|
if len(cmd.Args) != 1 {
|
|
return nil, finn.ErrWrongNumberOfArguments
|
|
}
|
|
return m.Apply(conn, cmd,
|
|
func() (interface{}, error) {
|
|
kvm.mu.Lock()
|
|
defer kvm.mu.Unlock()
|
|
if err := kvm.db.Sync(); err != nil {
|
|
panic(err.Error())
|
|
}
|
|
return nil, nil
|
|
},
|
|
func(v interface{}) (interface{}, error) {
|
|
conn.WriteString("OK")
|
|
return nil, nil
|
|
},
|
|
)
|
|
}
|