Skip to content

Commit

Permalink
Add property-based journal tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Mar 19, 2024
1 parent c7a59ce commit 9b6803d
Show file tree
Hide file tree
Showing 12 changed files with 484 additions and 155 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/artifacts/
/.makefiles/
*.fail
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ The format is based on [Keep a Changelog], and this project adheres to
[Keep a Changelog]: https://keepachangelog.com/en/1.0.0/
[Semantic Versioning]: https://semver.org/spec/v2.0.0.html

## [Unreleased]

### Added

- Added property-based tests for journal implementations using
[`rapid`](https://github.com/flyingmutant/rapid), which uncovered the bugs
that are described below.

### Fixed

- Fixed an issue with the `memoryjournal` `Range()` implementation that would pass
the wrong position to the `RangeFunc` after the journal was truncated.
- Fixed an issue with the `dynamojournal` `Bounds()` implementation that could
report the wrong lower bound after truncating (as of 0.9.0).

### Changed

- `dynamojournal` now cleans up (hard-deletes) records that have been marked as
truncated when scanning for lower bounds.

## [0.9.0] - 2024-03-18

### Added
Expand Down
275 changes: 155 additions & 120 deletions driver/aws/dynamojournal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"strconv"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/dogmatiq/persistencekit/driver/aws/internal/awsx"
Expand All @@ -33,85 +32,100 @@ type journ struct {
}

func (j *journ) Bounds(ctx context.Context) (begin, end journal.Position, err error) {
for {
// First we look for the most recent record by scanning the table
// backwards.
*j.boundsQueryRequest.ScanIndexForward = false

// We want to include all records, including those that have been
// truncated so that we can detect if the journal is empty.
j.boundsQueryRequest.FilterExpression = nil

out, err := awsx.Do(
ctx,
j.Client.Query,
j.OnRequest,
&j.boundsQueryRequest,
)
if err != nil {
return 0, 0, fmt.Errorf("unable to query last journal record: %w", err)
}
end, empty, err := j.upperBound(ctx)
if err != nil || empty {
return end, end, err
}

// There are no records at all, so the journal is truly empty.
if len(out.Items) == 0 {
return 0, 0, nil
}
begin, err = j.lowerBound(ctx, end)
return begin, end, err
}

end, err = parsePosition(out.Items[0])
if err != nil {
return 0, 0, err
}
// upperBound returns the (exclusive) upper bound of the records in the journal.
//
// If empty is true, the journal is either truly empty or all records have been
// truncated, and therefore the bounds are [end, end).
func (j *journ) upperBound(ctx context.Context) (end journal.Position, empty bool, err error) {
*j.boundsQueryRequest.ScanIndexForward = false

// The [begin, end) range is half-open, so the end position is the one
// AFTER the most recent record.
end++
empty = true

truncated, err := isTruncated(out.Items[0])
if err != nil {
return 0, 0, err
}
if err := dynamox.Range(
ctx,
j.Client,
j.OnRequest,
&j.boundsQueryRequest,
func(ctx context.Context, item map[string]types.AttributeValue) (bool, error) {
pos, err := unmarshalPosition(item)
if err != nil {
return false, err
}

// If the most recent record has been truncated, the journal is
// effectively empty with bounds of [end, end).
if truncated {
return end, end, nil
}
// The [begin, end) range is half-open, so the end position is the
// one AFTER the most recent record.
end = pos + 1

// We know there is at least one non-truncated record, so now we search
// for the oldest non-truncated record to find the lower bound.
*j.boundsQueryRequest.ScanIndexForward = true
j.boundsQueryRequest.FilterExpression = aws.String(`attribute_not_exists(#T)`)

out, err = awsx.Do(
ctx,
j.Client.Query,
j.OnRequest,
&j.boundsQueryRequest,
)
if err != nil {
return 0, 0, fmt.Errorf("unable to query first journal record: %w", err)
}
// If the most recent record has been truncated, the journal is
// effectively empty with bounds of [end, end).
empty, err = isTruncated(item)
return false, err
},
); err != nil {
return 0, false, fmt.Errorf("unable to query last journal record: %w", err)
}

// We were expecting to find a record, because we know that the most
// recent record exists and is not truncated. If we find nothing it
// likely means that the journal has been truncated at some point in
// between the two queries we've made. We'll retry the whole process to
// ensure we get a consistent result.
if len(out.Items) == 0 {
continue
}
return end, empty, nil
}

begin, err = parsePosition(out.Items[0])
if err != nil {
return 0, 0, err
}
// lowerBound returns the (inclusive) lower bound of the records in the journal.
func (j *journ) lowerBound(ctx context.Context, end journal.Position) (begin journal.Position, err error) {
*j.boundsQueryRequest.ScanIndexForward = true

if err := dynamox.Range(
ctx,
j.Client,
j.OnRequest,
&j.boundsQueryRequest,
func(ctx context.Context, item map[string]types.AttributeValue) (bool, error) {
pos, err := unmarshalPosition(item)
if err != nil {
return false, err
}

truncated, err := isTruncated(item)
if err != nil {
return false, err
}

// If we found a non-truncated record, it must be the lower bound.
if !truncated {
begin = pos
return false, nil
}

return begin, end, nil
// Otherwise, we found a truncated record.
//
// If it's the same record we used to identify the upper bound then
// all records are truncated.
if pos+1 == end {
begin = end
return false, nil
}

// Otherwise we've found a record that has been marked as truncated,
// but we know there are records after it, so we clean it up as we
// go.
return true, j.deleteRecord(ctx, pos)
},
); err != nil {
return 0, fmt.Errorf("unable to query first journal record: %w", err)
}

return begin, nil
}

func (j *journ) Get(ctx context.Context, pos journal.Position) ([]byte, error) {
j.position.Value = formatPosition(pos)
j.position.Value = marshalPosition(pos)

out, err := awsx.Do(
ctx,
Expand Down Expand Up @@ -147,58 +161,48 @@ func (j *journ) Range(
begin journal.Position,
fn journal.BinaryRangeFunc,
) error {
j.rangeQueryRequest.ExclusiveStartKey = nil
j.position.Value = formatPosition(begin)

j.position.Value = marshalPosition(begin)
expectPos := begin

for {
out, err := awsx.Do(
ctx,
j.Client.Query,
j.OnRequest,
&j.rangeQueryRequest,
)
if err != nil {
return fmt.Errorf("unable to query journal records: %w", err)
}

for _, item := range out.Items {
pos, err := parsePosition(item)
err := dynamox.Range(
ctx,
j.Client,
j.OnRequest,
&j.rangeQueryRequest,
func(ctx context.Context, item map[string]types.AttributeValue) (bool, error) {
pos, err := unmarshalPosition(item)
if err != nil {
return err
return false, err
}

if pos != expectPos {
return journal.ErrNotFound
return false, journal.ErrNotFound
}

expectPos++

rec, err := dynamox.AttrAs[*types.AttributeValueMemberB](item, recordAttr)
if err != nil {
return err
return false, err
}

ok, err := fn(ctx, pos, rec.Value)
if !ok || err != nil {
return err
}
}

if out.LastEvaluatedKey == nil {
if expectPos == begin {
return journal.ErrNotFound
}
return nil
}
return fn(ctx, pos, rec.Value)
},
)

j.rangeQueryRequest.ExclusiveStartKey = out.LastEvaluatedKey
if err == journal.ErrNotFound {
return err
} else if err != nil {
return fmt.Errorf("unable to range over journal records: %w", err)
} else if expectPos == begin {
return journal.ErrNotFound
}

return nil
}

func (j *journ) Append(ctx context.Context, end journal.Position, rec []byte) error {
j.position.Value = formatPosition(end)
j.position.Value = marshalPosition(end)
j.record.Value = rec

if _, err := awsx.Do(
Expand Down Expand Up @@ -227,23 +231,13 @@ func (j *journ) Truncate(ctx context.Context, end journal.Position) error {
}

for pos := begin; pos < end; pos++ {
j.position.Value = formatPosition(pos)
j.position.Value = marshalPosition(pos)

var err error
if pos+1 == actualEnd {
_, err = awsx.Do(
ctx,
j.Client.UpdateItem,
j.OnRequest,
&j.truncateRequest,
)
err = j.markRecordAsTruncated(ctx, pos)
} else {
_, err = awsx.Do(
ctx,
j.Client.DeleteItem,
j.OnRequest,
&j.deleteRequest,
)
err = j.deleteRecord(ctx, pos)
}

if err != nil {
Expand All @@ -254,12 +248,45 @@ func (j *journ) Truncate(ctx context.Context, end journal.Position) error {
return nil
}

// markRecordAsTruncated sends the journal's pre-built truncate request as a
// DynamoDB UpdateItem call for the record at pos.
//
// It overwrites the shared j.position value with the marshaled form of pos
// before issuing the request.
// NOTE(review): presumably j.truncateRequest references j.position as its key
// — confirm against the constructor.
func (j *journ) markRecordAsTruncated(ctx context.Context, pos journal.Position) error {
	j.position.Value = marshalPosition(pos)

	_, err := awsx.Do(ctx, j.Client.UpdateItem, j.OnRequest, &j.truncateRequest)
	if err != nil {
		return fmt.Errorf("unable to mark journal record as truncated: %w", err)
	}

	return nil
}

// deleteRecord sends the journal's pre-built delete request as a DynamoDB
// DeleteItem call for the record at pos.
//
// It overwrites the shared j.position value with the marshaled form of pos
// before issuing the request.
// NOTE(review): presumably j.deleteRequest references j.position as its key —
// confirm against the constructor.
func (j *journ) deleteRecord(ctx context.Context, pos journal.Position) error {
	j.position.Value = marshalPosition(pos)

	_, err := awsx.Do(ctx, j.Client.DeleteItem, j.OnRequest, &j.deleteRequest)
	if err != nil {
		return fmt.Errorf("unable to delete journal record: %w", err)
	}

	return nil
}

// Close performs no work and always returns nil.
func (j *journ) Close() error { return nil }

// parsePosition parses the position attribute in the given item.
func parsePosition(item map[string]types.AttributeValue) (journal.Position, error) {
// marshalPosition returns pos formatted as an unsigned base-10 integer string.
func marshalPosition(pos journal.Position) string {
	const base = 10

	n := uint64(pos)
	return strconv.FormatUint(n, base)
}

func unmarshalPosition(item map[string]types.AttributeValue) (journal.Position, error) {
attr, err := dynamox.AttrAs[*types.AttributeValueMemberN](item, positionAttr)
if err != nil {
return 0, err
Expand All @@ -273,11 +300,19 @@ func parsePosition(item map[string]types.AttributeValue) (journal.Position, erro
return journal.Position(pos), nil
}

func formatPosition(pos journal.Position) string {
return strconv.FormatUint(uint64(pos), 10)
}

func isTruncated(item map[string]types.AttributeValue) (bool, error) {
t, ok, err := dynamox.TryAttrAs[*types.AttributeValueMemberBOOL](item, truncatedAttr)
return ok && t.Value, err
if err != nil {
return false, err
}

if !ok {
return false, nil
}

if t.Value {
return true, nil
}

return false, errors.New("item is corrupt: truncated attribute is set to false, should be removed")
}
Loading

0 comments on commit 9b6803d

Please sign in to comment.