Skip to content

Commit

Permalink
xattrs: use unix flocks when writing multiple xattrs (#2528)
Browse files Browse the repository at this point in the history
* xattrs: use unix flocks when writing multiple xattrs

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

* Implement locking of files when setting and reading xattrs

To lock, a file with the ending .flock is created beside the to
be locked node (which also can be a directory) which is locked
with a Lock. Other ocis processes respect that.

* Improve error handling of the xattr functions handling multiple data.

* Move file lock functions to their own module.

* Fix doc comments of exported funcs

* Return err instead of nil to get error from the defer func.

Co-authored-by: David Christofas <[email protected]>

* Wait if a file is locked and make up to ten attempts.

* start for loop with i=1 to have a proper factor for the wait period.

* return error

Co-authored-by: David Christofas <[email protected]>

Co-authored-by: Klaas Freitag <[email protected]>
Co-authored-by: David Christofas <[email protected]>
Co-authored-by: David Christofas <[email protected]>
  • Loading branch information
4 people authored Feb 18, 2022
1 parent 2c8ec9c commit 6901ae6
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 28 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/flock-xattr-set-multiple.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: use an exclcusive write lock when writing multiple attributes

The xattr package can use an exclusive write lock when writing multiple extended attributes

https://github.com/cs3org/reva/pull/2528
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/go-openapi/errors v0.20.1 // indirect
github.com/go-openapi/strfmt v0.20.2 // indirect
github.com/go-sql-driver/mysql v1.6.0
github.com/gofrs/flock v0.8.1
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang/protobuf v1.5.2
github.com/gomodule/redigo v1.8.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm
github.com/gobwas/ws v1.0.4/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
Expand Down
158 changes: 130 additions & 28 deletions pkg/storage/utils/decomposedfs/xattrs/xattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/storage/utils/filelocks"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"github.com/pkg/xattr"
)

Expand Down Expand Up @@ -112,27 +115,73 @@ func refFromCS3(b []byte) (*provider.Reference, error) {

// CopyMetadata copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
func CopyMetadata(s, t string, filter func(attributeName string) bool) error {
var attrs []string
var err error
if attrs, err = xattr.List(s); err != nil {
return err
// For the source file, a shared lock is acquired. For the target, an exclusive
// write lock is acquired.
func CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) {
var writeLock, readLock *flock.Flock

// Acquire the write log on the target node first.
writeLock, err = filelocks.AcquireWriteLock(target)

if err != nil {
return errors.Wrap(err, "xattrs: Unable to lock target to write")
}
defer func() {
rerr := filelocks.ReleaseLock(writeLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// now try to get a shared lock on the source
readLock, err = filelocks.AcquireReadLock(src)

if err != nil {
return errors.Wrap(err, "xattrs: Unable to lock file for read")
}
defer func() {
rerr := filelocks.ReleaseLock(readLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// both locks are established. Copy.
var attrNameList []string
if attrNameList, err = xattr.List(src); err != nil {
return errors.Wrap(err, "Can not get xattr listing on src")
}
for i := range attrs {
if filter == nil || filter(attrs[i]) {
b, err := xattr.Get(s, attrs[i])
if err != nil {
return err

// error handling: We count errors of reads or writes of xattrs.
// if there were any read or write errors an error is returned.
var xerrs int = 0
var xerr error
for idx := range attrNameList {
attrName := attrNameList[idx]
if filter == nil || filter(attrName) {
var attrVal []byte
if attrVal, xerr = xattr.Get(src, attrName); xerr != nil {
xerrs++
}
if err := xattr.Set(t, attrs[i], b); err != nil {
return err
if xerr = xattr.Set(target, attrName, attrVal); xerr != nil {
xerrs++
}
}
}
return nil
if xerrs > 0 {
err = errors.Wrap(xerr, "Failed to copy all xattrs, last error returned.")
}

return err
}

// Set an extended attribute key to the given value
// No file locking is involved here as writing a single xattr is
// considered to be atomic.
func Set(filePath string, key string, val string) error {

if err := xattr.Set(filePath, key, []byte(val)); err != nil {
Expand All @@ -142,21 +191,48 @@ func Set(filePath string, key string, val string) error {
}

// SetMultiple allows setting multiple key value pairs at once
// TODO the changes are protected with an flock
func SetMultiple(filePath string, attribs map[string]string) error {
// the changes are protected with an file lock
// If the file lock can not be acquired the function returns a
// lock error.
func SetMultiple(filePath string, attribs map[string]string) (err error) {

// FIXME: Lock here
// h, err := lockedfile.OpenFile(filePath, os.O_WRONLY, 0) // 0? Open File only workn for files ... but we want to lock dirs ... or symlinks
// or we append .lock to the file and use https://github.com/gofrs/flock
var fileLock *flock.Flock
fileLock, err = filelocks.AcquireWriteLock(filePath)

if err != nil {
return errors.Wrap(err, "xattrs: Can not acquire write log")
}
defer func() {
rerr := filelocks.ReleaseLock(fileLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// error handling: Count if there are errors while setting the attribs.
// if there were any, return an error.
var xerrs int = 0
var xerr error
for key, val := range attribs {
if err := Set(filePath, key, val); err != nil {
return err
if xerr = xattr.Set(filePath, key, []byte(val)); xerr != nil {
// log
xerrs++
}
}
return nil
if xerrs > 0 {
err = errors.Wrap(xerr, "Failed to set all xattrs")
}
return err
}

// Get an extended attribute value for the given key
// No file locking is involved here as reading a single xattr is
// considered to be atomic.
func Get(filePath, key string) (string, error) {

v, err := xattr.Get(filePath, key)
if err != nil {
return "", err
Expand All @@ -178,21 +254,47 @@ func GetInt64(filePath, key string) (int64, error) {
return v, nil
}

// All reads all extended attributes for a node
func All(filePath string) (map[string]string, error) {
// All reads all extended attributes for a node, protected by a
// shared file lock
func All(filePath string) (attribs map[string]string, err error) {
var fileLock *flock.Flock

fileLock, err = filelocks.AcquireReadLock(filePath)

if err != nil {
return nil, errors.Wrap(err, "xattrs: Unable to lock file for read")
}
defer func() {
rerr := filelocks.ReleaseLock(fileLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

attrNames, err := xattr.List(filePath)
if err != nil {
return nil, err
}

attribs := make(map[string]string, len(attrNames))
var xerrs int = 0
var xerr error
// error handling: Count if there are errors while reading all attribs.
// if there were any, return an error.
attribs = make(map[string]string, len(attrNames))
for _, name := range attrNames {
val, err := xattr.Get(filePath, name)
if err != nil {
return nil, err
var val []byte
if val, xerr = xattr.Get(filePath, name); xerr != nil {
xerrs++
} else {
attribs[name] = string(val)
}
attribs[name] = string(val)
}

return attribs, nil
if xerrs > 0 {
err = errors.Wrap(xerr, "Failed to read all xattrs")
}

return attribs, err
}
101 changes: 101 additions & 0 deletions pkg/storage/utils/filelocks/filelocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package filelocks

import (
"errors"
"os"
"time"

"github.com/gofrs/flock"
)

// FlockFile returns the flock filename for a given file name
// it returns an empty string if the input is empty
func FlockFile(file string) string {
var n string
if len(file) > 0 {
n = file + ".flock"
}
return n
}

// acquireWriteLog acquires a lock on a file or directory.
// if the parameter write is true, it gets an exclusive write lock, otherwise a shared read lock.
// The function returns a Flock object, unlocking has to be done in the calling function.
func acquireLock(file string, write bool) (*flock.Flock, error) {
var err error

// Create a file to carry the log
n := FlockFile(file)
if len(n) == 0 {
return nil, errors.New("lock path is empty")
}
// Acquire the write log on the target node first.
lock := flock.New(n)

var ok bool
for i := 1; i <= 10; i++ {
if write {
ok, err = lock.TryLock()
} else {
ok, err = lock.TryRLock()
}

if ok {
break
}

time.Sleep(time.Duration(i*3) * time.Millisecond)
}

if !ok {
err = errors.New("could not acquire lock after wait")
}

if err != nil {
return nil, err
}
return lock, nil
}

// AcquireReadLock tries to acquire a shared lock to read from the
// file and returns a lock object or an error accordingly.
func AcquireReadLock(file string) (*flock.Flock, error) {
return acquireLock(file, false)
}

// AcquireWriteLock tries to acquire a shared lock to write from the
// file and returns a lock object or an error accordingly.
func AcquireWriteLock(file string) (*flock.Flock, error) {
return acquireLock(file, true)
}

// ReleaseLock releases a lock from a file that was previously created
// by AcquireReadLock or AcquireWriteLock.
func ReleaseLock(lock *flock.Flock) error {
// there is a probability that if the file can not be unlocked,
// we also can not remove the file. We will only try to remove if it
// was successfully unlocked.
err := lock.Unlock()
if err == nil {
err = os.Remove(lock.Path())
}
return err
}

0 comments on commit 6901ae6

Please sign in to comment.