Skip to content

Commit

Permalink
Refactor x/lockfile into dbnode/server (#2862)
Browse files Browse the repository at this point in the history
* Refactor x/lockfile into dbnode/server

* Remove scripts/lockfile in favor of test case

* Convert lockfile APIs / struct to private

* Linting fixes + feedback

* Add Lockfile suffix to lockfile APIs

* Revert go mod changes
  • Loading branch information
wesleyk authored Nov 9, 2020
1 parent aa361f4 commit 5a40a30
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 98 deletions.
65 changes: 0 additions & 65 deletions scripts/lockfile/lockfile.go

This file was deleted.

24 changes: 12 additions & 12 deletions src/x/lockfile/lockfile.go → src/dbnode/server/lockfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package lockfile
package server

import (
"os"
Expand All @@ -28,15 +28,15 @@ import (
"golang.org/x/sys/unix"
)

// Lockfile represents an acquired lockfile.
type Lockfile struct {
// lockfile represents an acquired lockfile.
type lockfile struct {
file os.File
}

// Acquire creates the given file path if it doesn't exist and
// acquireLockfile creates the given file path if it doesn't exist and
// obtains an exclusive lock on it. An error is returned if the lock
// has been obtained by another process.
func Acquire(path string) (*Lockfile, error) {
func acquireLockfile(path string) (*lockfile, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
return nil, errors.Wrap(err, "failed opening lock path")
Expand All @@ -51,23 +51,23 @@ func Acquire(path string) (*Lockfile, error) {
return nil, errors.Wrap(err, "failed obtaining lock")
}

lf := Lockfile{*file}
lf := lockfile{*file}

return &lf, nil
}

// CreateAndAcquire creates any non-existing directories needed to
// create the lock file, then acquires a lock on it
func CreateAndAcquire(path string, newDirMode os.FileMode) (*Lockfile, error) {
// createAndAcquireLockfile creates any non-existing directories needed to
// create the lock file, then acquires a lock on it.
func createAndAcquireLockfile(path string, newDirMode os.FileMode) (*lockfile, error) {
if err := os.MkdirAll(paths.Dir(path), newDirMode); err != nil {
return nil, err
}

return Acquire(path)
return acquireLockfile(path)
}

// Release releases the lock on the file and removes the file.
func (lf Lockfile) Release() error {
// releaseLockfile releases the lock on the file and removes the file.
func (lf lockfile) releaseLockfile() error {
ft := &unix.Flock_t{
Pid: int32(os.Getpid()),
Type: unix.F_UNLCK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package lockfile
package server

import (
"fmt"
"io/ioutil"
"math/rand"
"os"
Expand All @@ -29,39 +30,38 @@ import (
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestAcquire(t *testing.T) {
t.Run("process B can obtain the lock after A exits", func(t *testing.T) {
path := tempPath()
assert.NoError(t, newLockfileCommand(path, 0, true).Run())
assert.NoError(t, newLockfileCommand(path, "", true).Run())
_, err := os.Stat(path)
assert.True(t, os.IsNotExist(err)) // check temp file was removed
assert.NoError(t, newLockfileCommand(path, 0, true).Run())
assert.NoError(t, newLockfileCommand(path, "", true).Run())
})

t.Run("process B can obtain the lock after A exits, even if A didn't remove the lock file", func(t *testing.T) {
path := tempPath()
assert.NoError(t, newLockfileCommand(path, 0, false).Run())
assert.NoError(t, newLockfileCommand(path, "", false).Run())
_, err := os.Stat(path)
assert.False(t, os.IsNotExist(err)) // check temp file was *not* removed
assert.NoError(t, newLockfileCommand(path, 0, true).Run())
assert.NoError(t, newLockfileCommand(path, "", true).Run())
})

t.Run("if process A holds the lock, B must not be able to obtain it", func(t *testing.T) {
path := tempPath()

procA := newLockfileCommand(path, 1, true)
procB := newLockfileCommand(path, 1, true)
procA := newLockfileCommand(path, "1s", false)
procB := newLockfileCommand(path, "1s", false)

// to avoid sleeping until A obtains the lock (it takes some
// time for the process to boot and obtain the lock), we start
// both processes, then check exactly one of them failed
assert.NoError(t, procA.Start())
assert.NoError(t, procB.Start())

// one process will acquireLockfile and hold the lock, and the other will fail to acquireLockfile.
errA, errB := procA.Wait(), procB.Wait()

if errA != nil {
Expand All @@ -79,25 +79,74 @@ func TestCreateAndAcquire(t *testing.T) {

tempSubDir := path.Join(tempDir, "testDir")

lock, err := CreateAndAcquire(path.Join(tempSubDir, "testLockfile"), os.ModePerm)
lock, err := createAndAcquireLockfile(path.Join(tempSubDir, "testLockfile"), os.ModePerm)
assert.NoError(t, err)
err = lock.Release()
err = lock.releaseLockfile()
assert.NoError(t, err)

// check CreateAndAcquire() created the missing directory
// check createAndAcquireLockfile() created the missing directory
_, err = os.Stat(tempSubDir)
assert.False(t, os.IsNotExist(err))
}

// TestAcquireAndReleaseFile is invoked as a separate process by other tests in lockfile_test.go
// to exercise the file locking capabilities. The test is a no-op if run as part
// of the broader test suite. Given it's run as a separate process, we explicitly use error
// exit codes as opposed to failing assertions on errors
func TestAcquireAndReleaseFile(t *testing.T) {
// immediately return if this test wasn't invoked by another test in the
// nolint: goconst
if os.Getenv("LOCKFILE_SUPERVISED_PROCESS") != "true" {
t.Skip()
}

var (
lockPath = os.Getenv("WITH_LOCK_PATH")
removeLock = os.Getenv("WITH_REMOVE_LOCK")
sleepDuration = os.Getenv("WITH_SLEEP_DURATION")
)

lock, err := acquireLockfile(lockPath)
if err != nil {
os.Exit(1)
}

if sleepDuration != "" {
duration, err := time.ParseDuration(sleepDuration)
if err != nil {
os.Exit(1)
}

time.Sleep(duration)
}

if removeLock == "true" {
err := lock.releaseLockfile()
if err != nil {
os.Exit(1)
}
}
}

func tempPath() string {
return filepath.Join(os.TempDir(), "lockfile_test_"+strconv.Itoa(os.Getpid())+"_"+strconv.Itoa(rand.Intn(100000)))
}

func newLockfileCommand(lockPath string, sleep int, removeLock bool) *exec.Cmd {
removeLockStr := "0"
func newLockfileCommand(lockPath string, sleepDuration string, removeLock bool) *exec.Cmd {
removeLockStr := "false"
if removeLock {
removeLockStr = "1"
removeLockStr = "true"
}

return exec.Command("go", "run", "../../../scripts/lockfile/lockfile.go", lockPath, strconv.Itoa(sleep), removeLockStr)
cmd := exec.Command("go", "test", "-run", "TestAcquireAndReleaseFile")
cmd.Env = os.Environ()
cmd.Env = append(
cmd.Env,
"LOCKFILE_SUPERVISED_PROCESS=true",
fmt.Sprintf("WITH_LOCK_PATH=%s", lockPath),
fmt.Sprintf("WITH_SLEEP_DURATION=%s", sleepDuration),
fmt.Sprintf("WITH_REMOVE_LOCK=%s", removeLockStr),
)

return cmd
}
9 changes: 5 additions & 4 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package server contains the code to run the dbnode server.
package server

import (
Expand Down Expand Up @@ -83,7 +84,6 @@ import (
xdocs "github.com/m3db/m3/src/x/docs"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/lockfile"
"github.com/m3db/m3/src/x/mmap"
xos "github.com/m3db/m3/src/x/os"
"github.com/m3db/m3/src/x/pool"
Expand Down Expand Up @@ -222,11 +222,12 @@ func Run(runOpts RunOptions) {
// file will remain on the file system. When a dbnode starts after an ungracefully stop,
// it will be able to acquire the lock despite the fact the the lock file exists.
lockPath := path.Join(cfg.Filesystem.FilePathPrefixOrDefault(), filePathPrefixLockFile)
fslock, err := lockfile.CreateAndAcquire(lockPath, newDirectoryMode)
fslock, err := createAndAcquireLockfile(lockPath, newDirectoryMode)
if err != nil {
logger.Fatal("could not acquire lock", zap.String("path", lockPath), zap.Error(err))
logger.Fatal("could not acqurie lock", zap.String("path", lockPath), zap.Error(err))
}
defer fslock.Release()
// nolint: errcheck
defer fslock.releaseLockfile()

go bgValidateProcessLimits(logger)
debug.SetGCPercent(cfg.GCPercentageOrDefault())
Expand Down

0 comments on commit 5a40a30

Please sign in to comment.