1
0
mirror of https://git.mills.io/kayos/bitraft.git synced 2024-06-25 00:09:04 +00:00
bitraft/server.go

444 lines
10 KiB
Go

package main
import (
"bufio"
"compress/gzip"
"encoding/binary"
"errors"
"io"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"git.tcp.direct/Mirrors/bitcask-mirror"
"github.com/rs/zerolog"
"github.com/tidwall/finn"
"github.com/tidwall/redcon"
)
const defaultTCPKeepAlive = time.Minute * 5
var (
errWrongNumberOfArguments = errors.New("wrong number of arguments")
)
func trimPattern(glob string) string {
escaped := false
pattern := ""
for _, char := range glob {
switch char {
case '\\':
escaped = !escaped
if !escaped {
pattern = pattern + string(char)
}
case '*':
if !escaped {
goto out
}
pattern = pattern + string(char)
case '?':
if !escaped {
goto out
}
pattern = pattern + string(char)
case '[':
if !escaped {
goto out
}
pattern = pattern + string(char)
default:
pattern = pattern + string(char)
}
}
out:
return pattern
}
func ListenAndServe(addr, join, dir, logdir string, consistency, durability finn.Level) error {
opts := finn.Options{
Backend: finn.FastLog,
Consistency: consistency,
Durability: durability,
ConnAccept: func(conn redcon.Conn) bool {
if tcp, ok := conn.NetConn().(*net.TCPConn); ok {
if err := tcp.SetKeepAlive(true); err != nil {
log.Warn().Err(err).Caller().Str("caller", tcp.RemoteAddr().String()).
Msg("could not set keepalive")
} else {
err := tcp.SetKeepAlivePeriod(defaultTCPKeepAlive)
if err != nil {
log.Warn().Err(err).Caller().Str("caller", tcp.RemoteAddr().String()).
Msg("could not set keepalive period")
}
}
}
return true
},
}
m, err := NewMachine(dir)
if err != nil {
return err
}
n, err := finn.Open(logdir, addr, join, m, &opts)
if err != nil {
return err
}
defer n.Close()
select {
// blocking, there's no way out
}
}
type cmdHandler func(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error)
// Machine is the FSM for the Raft consensus
type Machine struct {
mu sync.RWMutex
dir string
db *bitcask.Bitcask
dbPath string
// TODO: what was "addr" for?
// addr string
closed bool
cmdMapper map[string]cmdHandler
}
func NewMachine(dir string) (*Machine, error) {
kvm := &Machine{
dir: dir,
// addr: addr,
}
kvm.cmdMapper = map[string]cmdHandler{
"echo": kvm.cmdEcho, "set": kvm.cmdSet,
"get": kvm.cmdGet, "del": kvm.cmdDel,
"keys": kvm.cmdKeys, "flushdb": kvm.cmdFlushdb,
}
var err error
kvm.dbPath = filepath.Join(dir, "node.db")
kvm.db, err = bitcask.Open(kvm.dir)
if err != nil {
return nil, err
}
return kvm, nil
}
// Close shuts down our finite state machine and presumably, our database.
func (kvm *Machine) Close() (err error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
err = kvm.db.Close()
kvm.closed = true
return
}
func newSubLog(conn redcon.Conn, cmd redcon.Command) *zerolog.Logger {
slog := log.With().Logger()
if conn != nil {
slog = slog.With().Str("caller", conn.NetConn().RemoteAddr().String()).Logger()
}
if len(cmd.Raw) > 1 {
slog = slog.With().Str("cmd", string(cmd.Raw)).Logger()
}
return &slog
}
// Command is a callback for incoming Redis commands.
func (kvm *Machine) Command(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
slog := newSubLog(conn, cmd)
slog.Trace().Msg(string(cmd.Raw))
strCmd := strings.ToLower(string(cmd.Args[0]))
handler, ok := kvm.cmdMapper[strCmd]
switch {
case ok:
return handler(m, conn, cmd)
case strCmd == "shutdown":
slog.Warn().Msg("shutting down")
conn.WriteString("OK")
err := conn.Close()
if err != nil {
slog.Debug().Err(err).Caller().Msg("failed to close connection")
}
os.Exit(0)
return nil, nil
default:
// TODO: do we need to log here if we are returning the error type?
slog.Warn().Msg("unknown command")
return nil, finn.ErrUnknownCommand
}
}
// Restore attempts to restore a database from rd, which implements an io.Reader.
// This is meant to restore data exported by the Snapshot function.
func (kvm *Machine) Restore(rd io.Reader) error {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var err error
if err := kvm.db.Close(); err != nil {
return err
}
if err := os.RemoveAll(kvm.dbPath); err != nil {
return err
}
kvm.db = nil
kvm.db, err = bitcask.Open(kvm.dir)
if err != nil {
return err
}
num := make([]byte, 8)
gzr, err := gzip.NewReader(rd)
if err != nil {
return err
}
r := bufio.NewReader(gzr)
for {
if _, err := io.ReadFull(r, num); err != nil {
if err == io.EOF {
break
}
return err
}
key := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, key); err != nil {
return err
}
if _, err := io.ReadFull(r, num); err != nil {
return err
}
value := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, value); err != nil {
return err
}
if err := kvm.db.Put(key, value); err != nil {
return err
}
}
return gzr.Close()
}
// WriteRedisCommandsFromSnapshot will read a snapshot and write all the
// Redis SET commands needed to rebuild the entire database.
// The commands are written to wr.
func WriteRedisCommandsFromSnapshot(wr io.Writer, snapshotPath string) error {
f, err := os.Open(snapshotPath)
if err != nil {
return err
}
defer f.Close()
var cmd []byte
num := make([]byte, 8)
var gzclosed bool
gzr, err := gzip.NewReader(f)
if err != nil {
return err
}
defer func() {
if !gzclosed {
gzr.Close()
}
}()
r := bufio.NewReader(gzr)
for {
if _, err := io.ReadFull(r, num); err != nil {
if err == io.EOF {
break
}
return err
}
key := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, key); err != nil {
return err
}
if _, err := io.ReadFull(r, num); err != nil {
return err
}
value := make([]byte, int(binary.LittleEndian.Uint64(num)))
if _, err := io.ReadFull(r, value); err != nil {
return err
}
if len(key) == 0 || key[0] != 'k' {
// do not accept keys that do not start with 'k'
continue
}
key = key[1:]
cmd = cmd[:0]
cmd = append(cmd, "*3\r\n$3\r\nSET\r\n$"...)
cmd = strconv.AppendInt(cmd, int64(len(key)), 10)
cmd = append(cmd, '\r', '\n')
cmd = append(cmd, key...)
cmd = append(cmd, '\r', '\n', '$')
cmd = strconv.AppendInt(cmd, int64(len(value)), 10)
cmd = append(cmd, '\r', '\n')
cmd = append(cmd, value...)
cmd = append(cmd, '\r', '\n')
if _, err := wr.Write(cmd); err != nil {
return err
}
}
err = gzr.Close()
gzclosed = true
return err
}
// Snapshot writes a snapshot of the database to wr, which implements io.Writer.
func (kvm *Machine) Snapshot(wr io.Writer) error {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
gzw := gzip.NewWriter(wr)
err := kvm.db.Fold(func(key []byte) error {
var buf []byte
value, err := kvm.db.Get(key)
if err != nil {
return err
}
num := make([]byte, 8)
binary.LittleEndian.PutUint64(num, uint64(len(key)))
buf = append(buf, num...)
buf = append(buf, key...)
binary.LittleEndian.PutUint64(num, uint64(len(value)))
buf = append(buf, num...)
buf = append(buf, value...)
if _, err := gzw.Write(buf); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return gzw.Close()
}
func (kvm *Machine) cmdSet(
m finn.Applier, conn redcon.Conn, cmd redcon.Command,
) (interface{}, error) {
if len(cmd.Args) != 3 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
return nil, kvm.db.Put(cmd.Args[1], cmd.Args[2])
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}
func (kvm *Machine) cmdEcho(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
conn.WriteBulk(cmd.Args[1])
return nil, nil
}
func (kvm *Machine) cmdGet(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, finn.ErrWrongNumberOfArguments
}
key := cmd.Args[1]
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
value, err := kvm.db.Get(key)
if err != nil {
if err == bitcask.ErrKeyNotFound {
conn.WriteNull()
return nil, nil
}
return nil, err
}
conn.WriteBulk(value)
return nil, nil
},
)
}
func (kvm *Machine) cmdDel(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
var startIdx = 1
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
var n int
for i := startIdx; i < len(cmd.Args); i++ {
key := cmd.Args[i]
err := kvm.db.Delete(key)
if err != nil {
return 0, err
}
n++
}
return n, nil
},
func(v interface{}) (interface{}, error) {
n := v.(int)
conn.WriteInt(n)
return nil, nil
},
)
}
func (kvm *Machine) cmdKeys(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 2 {
return nil, errWrongNumberOfArguments
}
pattern := string(cmd.Args[1])
scanPattern := trimPattern(pattern)
return m.Apply(conn, cmd, nil,
func(interface{}) (interface{}, error) {
kvm.mu.RLock()
defer kvm.mu.RUnlock()
var keys [][]byte
err := kvm.db.Scan([]byte(scanPattern), func(key []byte) error {
if ok, _ := filepath.Match(pattern, string(key)); ok {
keys = append(keys, []byte(key))
}
return nil
})
if err != nil {
return nil, err
}
conn.WriteArray(len(keys))
for i := 0; i < len(keys); i++ {
conn.WriteBulk(keys[i])
}
return nil, nil
},
)
}
func (kvm *Machine) cmdFlushdb(m finn.Applier, conn redcon.Conn, cmd redcon.Command) (interface{}, error) {
if len(cmd.Args) != 1 {
return nil, finn.ErrWrongNumberOfArguments
}
return m.Apply(conn, cmd,
func() (interface{}, error) {
kvm.mu.Lock()
defer kvm.mu.Unlock()
if err := kvm.db.Sync(); err != nil {
panic(err.Error())
}
return nil, nil
},
func(v interface{}) (interface{}, error) {
conn.WriteString("OK")
return nil, nil
},
)
}