Use package github.com/gofrs/flock as flock implementation. (#224)

Supersedes #219 after rebasing on master following the migration off GitHub.

Co-authored-by: Nicolò Santamaria <nicolo.santamaria@protonmail.com>
Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Co-authored-by: Tai Groot <taigrr@noreply.mills.io>
Reviewed-on: https://git.mills.io/prologic/bitcask/pulls/224
Co-authored-by: James Mills <prologic@noreply.mills.io>
Co-committed-by: James Mills <prologic@noreply.mills.io>
James Mills 2021-07-15 21:33:20 +00:00
parent a49bbf666a
commit 5e4d863ab7
8 changed files with 16 additions and 579 deletions
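
For context, a minimal sketch of the gofrs/flock API this commit adopts (illustrative only: the lock path and error handling below are mine, not taken from this diff):

package main

import (
	"log"

	"github.com/gofrs/flock"
)

func main() {
	fl := flock.New("/tmp/bitcask.lock") // illustrative path
	ok, err := fl.TryLock()              // non-blocking attempt
	if err != nil {
		log.Fatal(err) // a real failure (I/O, permissions), not contention
	}
	if !ok {
		log.Fatal("lock held by another process")
	}
	defer fl.Unlock()
	// ... exclusive work ...
}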

@@ -13,8 +13,10 @@ import (
 	"sync"
 	"time"
+	"github.com/gofrs/flock"
 	art "github.com/plar/go-adaptive-radix-tree"
-	"git.mills.io/prologic/bitcask/flock"
+	log "github.com/sirupsen/logrus"
 	"git.mills.io/prologic/bitcask/internal"
 	"git.mills.io/prologic/bitcask/internal/config"
 	"git.mills.io/prologic/bitcask/internal/data"
@@ -22,7 +24,6 @@ import (
 	"git.mills.io/prologic/bitcask/internal/index"
 	"git.mills.io/prologic/bitcask/internal/metadata"
 	"git.mills.io/prologic/bitcask/scripts/migrations"
-	log "github.com/sirupsen/logrus"
 )

 const (
@@ -68,10 +69,8 @@ var (
 // and in-memory hash of key/value pairs as per the Bitcask paper and seen
 // in the Riak database.
 type Bitcask struct {
-	mu        sync.RWMutex
-	*flock.Flock
+	mu        sync.RWMutex
+	flock     *flock.Flock
 	config    *config.Config
 	options   []Option
 	path      string
@@ -114,7 +113,7 @@ func (b *Bitcask) Close() error {
 	b.mu.RLock()
 	defer func() {
 		b.mu.RUnlock()
-		b.Flock.Unlock()
+		b.flock.Unlock()
 	}()

 	return b.close()
@@ -669,6 +668,10 @@ func (b *Bitcask) Merge() error {
 		return err
 	}
 	for _, file := range files {
+		// see #225
+		if file.Name() == lockfile {
+			continue
+		}
 		err := os.Rename(
 			path.Join([]string{mdb.path, file.Name()}...),
 			path.Join([]string{b.path, file.Name()}...),
@@ -723,7 +726,7 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 	}

 	bitcask := &Bitcask{
-		Flock:    flock.New(filepath.Join(path, lockfile)),
+		flock:    flock.New(filepath.Join(path, lockfile)),
 		config:   cfg,
 		options:  options,
 		path:     path,
@@ -732,12 +735,12 @@ func Open(path string, options ...Option) (*Bitcask, error) {
 		metadata: meta,
 	}

-	locked, err := bitcask.Flock.TryLock()
+	ok, err := bitcask.flock.TryLock()
 	if err != nil {
 		return nil, err
 	}

-	if !locked {
+	if !ok {
 		return nil, ErrDatabaseLocked
 	}
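
Caller-facing behaviour is unchanged: a second Open on the same path still fails with ErrDatabaseLocked. A hedged usage sketch (the path is hypothetical; errors.Is is my choice, a plain == comparison also works for a sentinel error):

package main

import (
	"errors"
	"log"

	"git.mills.io/prologic/bitcask"
)

func main() {
	db, err := bitcask.Open("/tmp/db") // hypothetical path
	if errors.Is(err, bitcask.ErrDatabaseLocked) {
		log.Fatal("datastore already open in another process")
	}
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}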

@@ -54,13 +54,6 @@ func (s *server) handleSet(cmd redcon.Command, conn redcon.Conn) {
 		ttl = &d
 	}
-	err := s.db.Lock()
-	if err != nil {
-		conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
-		return
-	}
-	defer s.db.Unlock()
 	if ttl != nil {
 		if err := s.db.PutWithTTL(key, value, *ttl); err != nil {
 			conn.WriteString(fmt.Sprintf("ERR: %s", err))
@@ -82,13 +75,6 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
 	key := cmd.Args[1]
-	err := s.db.Lock()
-	if err != nil {
-		conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
-		return
-	}
-	defer s.db.Unlock()
 	value, err := s.db.Get(key)
 	if err != nil {
 		conn.WriteNull()
@@ -98,13 +84,6 @@ func (s *server) handleGet(cmd redcon.Command, conn redcon.Conn) {
 }

 func (s *server) handleKeys(cmd redcon.Command, conn redcon.Conn) {
-	err := s.db.Lock()
-	if err != nil {
-		conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
-		return
-	}
-	defer s.db.Unlock()
 	conn.WriteArray(s.db.Len())
 	for key := range s.db.Keys() {
 		conn.WriteBulk(key)
@@ -119,13 +98,6 @@ func (s *server) handleExists(cmd redcon.Command, conn redcon.Conn) {
 	key := cmd.Args[1]
-	err := s.db.Lock()
-	if err != nil {
-		conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
-		return
-	}
-	defer s.db.Unlock()
 	if s.db.Has(key) {
 		conn.WriteInt(1)
 	} else {
@@ -141,13 +113,6 @@ func (s *server) handleDel(cmd redcon.Command, conn redcon.Conn) {
 	key := cmd.Args[1]
-	err := s.db.Lock()
-	if err != nil {
-		conn.WriteError("ERR " + fmt.Errorf("failed to lock db: %v", err).Error() + "")
-		return
-	}
-	defer s.db.Unlock()
 	if err := s.db.Delete(key); err != nil {
 		conn.WriteInt(0)
 	} else {
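
Why these handlers lose their locking: an inference from the struct change above, not stated in the diff. The old Bitcask embedded *flock.Flock, and Go method promotion gave Bitcask public Lock/Unlock methods, which is what s.db.Lock()/s.db.Unlock() resolved to; with the new named, unexported field there is no promotion, so those calls would no longer compile and file locking stays inside the bitcask package. A small sketch of the mechanism (gofrs/flock stands in for the removed package; the type names are made up):

package main

import "github.com/gofrs/flock"

type embedded struct {
	*flock.Flock // promotion: embedded gets Lock/TryLock/Unlock for free
}

type named struct {
	flock *flock.Flock // no promotion: named.Lock() would not compile
}

func main() {
	db := embedded{flock.New("/tmp/promotion_demo")} // hypothetical path
	_ = db.Lock() // compiles only because of embedding
	_ = db.Unlock()
	_ = named{flock: flock.New("/tmp/promotion_demo")}
}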

@@ -1,97 +0,0 @@
package flock

import (
	"errors"
	"os"
	"sync"
)

type Flock struct {
	path string
	m    sync.Mutex
	fh   *os.File
}

var (
	ErrAlreadyLocked      = errors.New("Double lock: already own the lock")
	ErrLockFailed         = errors.New("Could not acquire lock")
	ErrLockNotHeld        = errors.New("Could not unlock, lock is not held")
	ErrInodeChangedAtPath = errors.New("Inode changed at path")
)

// New returns a new instance of *Flock. The only parameter
// it takes is the path to the desired lockfile.
func New(path string) *Flock {
	return &Flock{path: path}
}

// Path returns the file path linked to this lock.
func (f *Flock) Path() string {
	return f.path
}

// Lock will acquire the lock. This function may block indefinitely if some
// other process holds the lock. For a non-blocking version, see Flock.TryLock().
func (f *Flock) Lock() error {
	f.m.Lock()
	defer f.m.Unlock()

	if f.fh != nil {
		return ErrAlreadyLocked
	}

	var fh *os.File
	fh, err := lock_sys(f.path, false)
	// treat ErrInodeChangedAtPath as "some other process holds the lock, retry locking"
	for err == ErrInodeChangedAtPath {
		fh, err = lock_sys(f.path, false)
	}
	if err != nil {
		return err
	}
	if fh == nil {
		return ErrLockFailed
	}

	f.fh = fh
	return nil
}

// TryLock will try to acquire the lock, and returns immediately if the lock
// is already owned by another process.
func (f *Flock) TryLock() (bool, error) {
	f.m.Lock()
	defer f.m.Unlock()

	if f.fh != nil {
		return false, ErrAlreadyLocked
	}

	fh, err := lock_sys(f.path, true)
	if err != nil {
		return false, ErrLockFailed
	}

	f.fh = fh
	return true, nil
}

// Unlock removes the lock file from disk and releases the lock.
// Whatever the result of `.Unlock()`, the caller must assume
// that it does not hold the lock anymore.
func (f *Flock) Unlock() error {
	f.m.Lock()
	defer f.m.Unlock()

	if f.fh == nil {
		return ErrLockNotHeld
	}

	err1 := rm_if_match(f.fh, f.path)
	err2 := f.fh.Close()
	if err1 != nil {
		return err1
	}
	return err2
}
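
One behavioural difference worth flagging (my reading of the two implementations, not stated in the commit message): this removed TryLock reports contention as an error (it returns false together with ErrLockFailed), while gofrs/flock returns (false, nil) when another process holds the lock. That is why the new Open checks the boolean and maps it to ErrDatabaseLocked itself. A sketch of the gofrs/flock side:

package main

import (
	"fmt"

	"github.com/gofrs/flock"
)

func main() {
	a := flock.New("/tmp/contention_demo") // hypothetical path
	b := flock.New("/tmp/contention_demo") // second handle on the same file

	okA, errA := a.TryLock()
	okB, errB := b.TryLock() // contended: the lock is already held via a

	fmt.Println(okA, errA) // true <nil>
	fmt.Println(okB, errB) // false <nil>: contention is not an error here
	a.Unlock()
}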

@@ -1,121 +0,0 @@
package flock

import (
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// WARNING: this test will delete the file located at "testLockPath".
// Choose an adequate temporary file name.
const testLockPath = "/tmp/bitcask_unit_test_lock" // file path to use for the lock

func TestTryLock(t *testing.T) {
	// test that basic locking functionality is consistent

	// make sure there is no lock present when starting this test
	os.Remove(testLockPath)

	assert := assert.New(t)

	lock1 := New(testLockPath)
	lock2 := New(testLockPath)

	// 1- take the first lock
	locked1, err := lock1.TryLock()
	assert.True(locked1)
	assert.NoError(err)

	// 2- check that the second lock cannot acquire the lock
	locked2, err := lock2.TryLock()
	assert.False(locked2)
	assert.Error(err)

	// 3- release the first lock
	err = lock1.Unlock()
	assert.NoError(err)

	// 4- check that the second lock can acquire and then release the lock without error
	locked2, err = lock2.TryLock()
	assert.True(locked2)
	assert.NoError(err)

	err = lock2.Unlock()
	assert.NoError(err)
}

func TestLock(t *testing.T) {
	assert := assert.New(t)

	// make sure there is no lock present when starting this test
	os.Remove(testLockPath)

	syncChan := make(chan bool)

	// main goroutine: take the lock on testLockPath
	lock := New(testLockPath)
	err := lock.Lock()
	assert.NoError(err)

	go func() {
		// subroutine:
		lock := New(testLockPath)

		// before entering the blocking '.Lock()' call, signal we are about to do it;
		// see below: the main goroutine will wait for a small delay before releasing the lock
		syncChan <- true

		// '.Lock()' should ultimately return without error:
		err := lock.Lock()
		assert.NoError(err)
		err = lock.Unlock()
		assert.NoError(err)

		close(syncChan)
	}()

	// wait for the "ready" signal from the subroutine,
	<-syncChan

	// after that signal, wait for a small delay before releasing the lock
	<-time.After(100 * time.Microsecond)
	err = lock.Unlock()
	assert.NoError(err)

	// wait for the subroutine to finish
	<-syncChan
}

func TestErrorConditions(t *testing.T) {
	// error conditions implemented in this version:
	// - you can't release a lock you do not hold
	// - you can't lock the same lock twice

	// -- setup
	assert := assert.New(t)

	// make sure there is no lock present when starting this test
	os.Remove(testLockPath)

	lock := New(testLockPath)

	// -- run tests:
	err := lock.Unlock()
	assert.Error(err, "you can't release a lock you do not hold")

	// take the lock once:
	lock.TryLock()

	locked, err := lock.TryLock()
	assert.False(locked)
	assert.Error(err, "you can't lock the same lock twice (using .TryLock())")

	err = lock.Lock()
	assert.Error(err, "you can't lock the same lock twice (using .Lock())")

	// -- teardown
	lock.Unlock()
}
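
A related porting note (hedged: based on my reading of the gofrs/flock v0.8.0 source): TestErrorConditions above pins down double-locking as an error (ErrAlreadyLocked), whereas gofrs/flock appears to treat re-locking an already-held *Flock as a no-op success, so equivalents of these assertions would have to flip:

package main

import (
	"fmt"

	"github.com/gofrs/flock"
)

func main() {
	fl := flock.New("/tmp/double_lock_demo") // hypothetical path
	fmt.Println(fl.TryLock())                // true <nil>
	fmt.Println(fl.TryLock())                // true <nil>: no ErrAlreadyLocked equivalent
	fl.Unlock()
}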

@@ -1,79 +0,0 @@
// +build !aix,!windows

package flock

import (
	"os"

	"golang.org/x/sys/unix"
)

func lock_sys(path string, nonBlocking bool) (_ *os.File, err error) {
	var fh *os.File
	fh, err = os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			fh.Close()
		}
	}()

	flag := unix.LOCK_EX
	if nonBlocking {
		flag |= unix.LOCK_NB
	}

	err = unix.Flock(int(fh.Fd()), flag)
	if err != nil {
		return nil, err
	}

	if !sameInodes(fh, path) {
		return nil, ErrInodeChangedAtPath
	}

	return fh, nil
}

func rm_if_match(fh *os.File, path string) error {
	// Sanity check:
	// before running "rm", check that the file pointed at by the
	// filehandle has the same inode as the path in the filesystem.
	//
	// If this sanity check doesn't pass, return ErrInodeChangedAtPath;
	// if the check passes, run os.Remove and return its error, if any.
	//
	// note: this sanity check is in no way atomic, but:
	// - as long as only cooperative processes are involved, it will work as intended
	// - it avoids the major pitfall case ("root user forcefully removed
	//   the lockfile") 99.9% of the time
	if !sameInodes(fh, path) {
		return ErrInodeChangedAtPath
	}
	return os.Remove(path)
}

func sameInodes(f *os.File, path string) bool {
	// get the inode of the opened file f:
	var fstat unix.Stat_t
	err := unix.Fstat(int(f.Fd()), &fstat)
	if err != nil {
		return false
	}
	fileIno := fstat.Ino

	// get the inode of the path on disk:
	var dstat unix.Stat_t
	err = unix.Stat(path, &dstat)
	if err != nil {
		return false
	}
	pathIno := dstat.Ino

	return pathIno == fileIno
}
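
To make the sanity-check comment in rm_if_match concrete, a small self-contained demonstration of the race it guards against (hypothetical path; the inode comparison is inlined since sameInodes is unexported):

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	const path = "/tmp/inode_demo.lock" // hypothetical path

	fh, _ := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) // we "hold the lock"
	defer fh.Close()

	os.Remove(path)                                          // "root" forcibly removes the lockfile...
	nf, _ := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) // ...and another process recreates it
	defer nf.Close()

	var fstat, dstat unix.Stat_t
	unix.Fstat(int(fh.Fd()), &fstat) // inode our handle still points at
	unix.Stat(path, &dstat)          // inode now on disk at that path

	fmt.Println(fstat.Ino == dstat.Ino) // false: Unlock must not remove the new file
}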

@@ -1,236 +0,0 @@
package flock

// the "nd" in "nd_test.go" stands for non-deterministic

import (
	"errors"
	"os"
	"sync"
	"sync/atomic"
	"syscall"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// The two tests in this file exercise concurrency scenarios:
// 1- TestRaceLock() runs several goroutines racing for the same lock
// 2- TestShatteredLock() runs racing goroutines, along with another goroutine
//    which forcibly removes the lock file from disk
//
// Note that these tests are non-deterministic: the coverage produced by each test depends
// on how the runtime chooses to schedule the concurrent goroutines.

var lockerCount int64

// lockAndCount tries to take a lock on "lockpath".
// if it fails: it returns 0 and stops there.
// if it succeeds:
//  1- it sets a defer function to release the lock in the same fashion as "func (b *Bitcask) Close()"
//  2- it increments the shared "lockerCount" above
//  3- it waits for a short amount of time
//  4- it decrements "lockerCount"
//  5- it returns the value it has seen at step 2.
//
// If the locking and unlocking behave as we expect them to,
// instructions 1-5 should be in a critical section,
// and the only possible value at step 2 should be "1".
//
// Returning a value > 0 indicates this function successfully acquired the lock,
// returning a value == 0 indicates that TryLock failed.
func lockAndCount(lockpath string) int64 {
	lock := New(lockpath)
	ok, _ := lock.TryLock()
	if !ok {
		return 0
	}
	defer func() {
		lock.Unlock()
	}()

	x := atomic.AddInt64(&lockerCount, 1)
	// emulate a workload:
	<-time.After(1 * time.Microsecond)
	atomic.AddInt64(&lockerCount, -1)

	return x
}

// locker calls lockAndCount above in a loop, until one of the following holds:
// - reading from the "timeout" channel doesn't block
// - the number of calls to lockAndCount() that indicate the lock was
//   successfully taken reaches "successfulLockCount"
func locker(t *testing.T, id int, lockPath string, successfulLockCount int, timeout <-chan struct{}) {
	timedOut := false

	failCount := 0
	max := int64(0)

lockloop:
	for successfulLockCount > 0 {
		select {
		case <-timeout:
			timedOut = true
			break lockloop
		default:
		}

		x := lockAndCount(lockPath)

		if x > 0 {
			// if x indicates the lock was taken: decrement the counter
			successfulLockCount--
		}

		if x > 1 {
			// if x indicates an invalid value: increase the failCount and update max accordingly
			failCount++
			if x > max {
				max = x
			}
		}
	}

	// check failure cases:
	if timedOut {
		t.Fail()
		t.Logf("[runner %02d] timed out", id)
	}
	if failCount > 0 {
		t.Fail()
		t.Logf("[runner %02d] lockCounter was > 1 on %2.d occasions, max seen value was %2.d", id, failCount, max)
	}
}

// TestRaceLock checks that no error occurs when several concurrent actors
// (goroutines in this case) race for the same lock.
func TestRaceLock(t *testing.T) {
	// test parameters, written in code:
	// you may want to tweak these values for testing
	goroutines := 20          // number of concurrent "locker" goroutines to launch
	successfulLockCount := 50 // how many times a "locker" will successfully take the lock before halting

	// make sure there is no lock present when starting this test
	os.Remove(testLockPath)

	// timeout implemented in code
	// (the lock acquisition depends on the behavior of the filesystem;
	// avoid sending CI into an endless loop if something fishy happens on the test server ...)
	// tweak this value if needed; comment out the "close(ch)" instruction below
	timeout := 10 * time.Second
	ch := make(chan struct{})
	go func() {
		<-time.After(timeout)
		close(ch)
	}()

	wg := &sync.WaitGroup{}
	wg.Add(goroutines)

	for i := 0; i < goroutines; i++ {
		go func(id int) {
			locker(t, id, testLockPath, successfulLockCount, ch)
			wg.Done()
		}(i)
	}

	wg.Wait()
}

func isExpectedError(err error) bool {
	switch {
	case err == nil:
		return true
	case err == ErrInodeChangedAtPath:
		return true
	case errors.Is(err, syscall.ENOENT):
		return true
	default:
		return false
	}
}

// TestShatteredLock runs concurrent goroutines on one lock, with an extra goroutine
// which removes the lock file from disk without checking the locks
// (e.g. a user who would run 'rm lockfile' in a loop while the program is running).
//
// In this scenario, errors may occur on .Unlock(); this test checks that only errors
// relating to the file being deleted occur.
//
// This test additionally logs the number of errors that occurred, grouped by error message.
func TestShatteredLock(t *testing.T) {
	// test parameters, written in code:
	// you may want to tweak these values for testing
	goroutines := 4           // number of concurrent "locker" and "remover" goroutines to launch
	successfulLockCount := 10 // how many times a "locker" will successfully take the lock before halting

	// make sure there is no lock present when starting this test
	os.Remove(testLockPath)

	assert := assert.New(t)

	wg := &sync.WaitGroup{}
	wg.Add(goroutines)

	stopChan := make(chan struct{})
	errChan := make(chan error, 10)

	for i := 0; i < goroutines; i++ {
		go func(id int, count int) {
			for count > 0 {
				lock := New(testLockPath)
				ok, _ := lock.TryLock()
				if !ok {
					continue
				}
				count--

				err := lock.Unlock()
				if !isExpectedError(err) {
					assert.Fail("goroutine %d - unexpected error: %v", id, err)
				}
				if err != nil {
					errChan <- err
				}
			}
			wg.Done()
		}(i, successfulLockCount)
	}

	var wgCompanion = &sync.WaitGroup{}
	wgCompanion.Add(2)

	go func() {
		defer wgCompanion.Done()
		for {
			os.Remove(testLockPath)
			select {
			case <-stopChan:
				return
			default:
			}
		}
	}()

	var errs = make(map[string]int)
	go func() {
		for err := range errChan {
			errs[err.Error()]++
		}
		wgCompanion.Done()
	}()

	wg.Wait()
	close(stopChan)
	close(errChan)
	wgCompanion.Wait()

	for err, count := range errs {
		t.Logf(" seen %d times: %s", count, err)
	}
}

go.mod

@@ -3,6 +3,7 @@ module git.mills.io/prologic/bitcask
 go 1.13

 require (
+	github.com/gofrs/flock v0.8.0
 	github.com/pkg/errors v0.9.1
 	github.com/plar/go-adaptive-radix-tree v1.0.4
 	github.com/sirupsen/logrus v1.8.1
@@ -13,5 +14,4 @@ require (
 	github.com/stretchr/testify v1.7.0
 	github.com/tidwall/redcon v1.4.1
 	golang.org/x/exp v0.0.0-20200228211341-fcea875c7e85
-	golang.org/x/sys v0.0.0-20210510120138-977fb7262007
 )

go.sum

@@ -93,6 +93,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
+github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY=
+github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=