Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

return error if the vlog writes exceeds more that 4GB. #1400

Merged
merged 5 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ import (
"golang.org/x/net/trace"
)

// maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
// uint32, so limiting at max uint32.
var maxVlogFileSize = math.MaxUint32

// Values have their first byte being byteData or byteDelete. This helps us distinguish between
// a key that has never been seen and a key that has been explicitly deleted.
const (
Expand Down Expand Up @@ -1364,11 +1368,52 @@ func (vlog *valueLog) woffset() uint32 {
return atomic.LoadUint32(&vlog.writableLogOffset)
}

// validateWrites will check whether the given requests can fit into 4GB vlog file.
// NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
// uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
func (vlog *valueLog) validateWrites(reqs []*request) error {
vlogOffset := uint64(vlog.woffset())
for _, req := range reqs {
// calculate size of the request.
size := estimateRequestSize(req)
estimatedVlogOffset := vlogOffset + size
if estimatedVlogOffset > uint64(maxVlogFileSize) {
return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
estimatedVlogOffset, maxVlogFileSize)
}

if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
// We'll create a new vlog file if the estimated offset is greater or equal to
// max vlog size. So, resetting the vlogOffset.
vlogOffset = 0
continue
}
// Estimated vlog offset will become current vlog offset if the vlog is not rotated.
vlogOffset = estimatedVlogOffset
}
return nil
}

// estimateRequestSize returns the size that needed to be written for the given request.
func estimateRequestSize(req *request) uint64 {
size := uint64(0)
for _, e := range req.Entries {
size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
}
return size
}

// write is thread-unsafe by design and should not be called concurrently.
func (vlog *valueLog) write(reqs []*request) error {
if vlog.db.opt.InMemory {
return nil
}
// Validate writes before writing to vlog. Because, we don't want to partially write and return
// an error.
if err := vlog.validateWrites(reqs); err != nil {
return err
}

vlog.filesLock.RLock()
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
Expand Down
62 changes: 62 additions & 0 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"reflect"
Expand Down Expand Up @@ -1237,3 +1238,64 @@ func TestValueEntryChecksum(t *testing.T) {
require.NoError(t, db.Close())
})
}

func TestValidateWrite(t *testing.T) {
// Mocking the file size, so that we don't allocate big memory while running test.
maxVlogFileSize = 400
defer func() {
maxVlogFileSize = math.MaxUint32
}()

bigBuf := make([]byte, maxVlogFileSize+1)
log := &valueLog{
opt: DefaultOptions("."),
}

// Sending a request with big values which will overflow uint32.
key := []byte("HelloKey")
req := &request{
Entries: []*Entry{
{
Key: key,
Value: bigBuf,
},
{
Key: key,
Value: bigBuf,
},
{
Key: key,
Value: bigBuf,
},
},
}

err := log.validateWrites([]*request{req})
require.Error(t, err)

// Testing with small values.
smallBuf := make([]byte, 4)
req1 := &request{
Entries: []*Entry{
{
Key: key,
Value: smallBuf,
},
{
Key: key,
Value: smallBuf,
},
{
Key: key,
Value: smallBuf,
},
},
}

err = log.validateWrites([]*request{req1})
require.NoError(t, err)

// Batching small and big request.
err = log.validateWrites([]*request{req1, req})
require.Error(t, err)
}