Skip to content

Commit

Permalink
Parallelize the chunked dispatches in reachable resources
Browse files Browse the repository at this point in the history
This changes the datastore query to no longer have a limit, and instead produce a series of chunks for parallelized processing using the same parallelization code as the entrypoint cursor parallelization system.

This is done in service of performance: a single database query is faster than making multiple roundtrips.

This change also removes the limits on dispatching reachable resources and lookup resources in favor of grabbing as many results as possible and canceling dispatch once the overall limit has been reached. While this results in slightly more work being done than is necessary, it is far more performant from the caller's perspective (especially for no-limit calls), because there is no need to re-enter the reachable resources dispatch tree.

As per the above change around limits, this change also further adjusts how context cancelation functions and adds a slew of tests to cover these new cases
  • Loading branch information
josephschorr committed Jun 15, 2023
1 parent b086825 commit 7ffc816
Show file tree
Hide file tree
Showing 11 changed files with 985 additions and 316 deletions.
9 changes: 0 additions & 9 deletions internal/dispatch/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/authzed/spicedb/internal/dispatch/keys"
"github.com/authzed/spicedb/pkg/cache"
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
)

const (
Expand Down Expand Up @@ -224,10 +223,6 @@ func (cd *Dispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpand
func (cd *Dispatcher) DispatchReachableResources(req *v1.DispatchReachableResourcesRequest, stream dispatch.ReachableResourcesStream) error {
cd.reachableResourcesTotalCounter.Inc()

if req.OptionalLimit == 0 {
return spiceerrors.MustBugf("a limit must be specified on reachable resources to use with the caching dispatcher")
}

requestKey, err := cd.keyHandler.ReachableResourcesCacheKey(stream.Context(), req)
if err != nil {
return err
Expand Down Expand Up @@ -296,10 +291,6 @@ func sliceSize(xs []byte) int64 {
func (cd *Dispatcher) DispatchLookupResources(req *v1.DispatchLookupResourcesRequest, stream dispatch.LookupResourcesStream) error {
cd.lookupResourcesTotalCounter.Inc()

if req.OptionalLimit == 0 {
return spiceerrors.MustBugf("a limit must be specified on lookup resources to use with the caching dispatcher")
}

requestKey, err := cd.keyHandler.LookupResourcesCacheKey(stream.Context(), req)
if err != nil {
return err
Expand Down
100 changes: 68 additions & 32 deletions internal/dispatch/graph/lookupresources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,44 +545,80 @@ func TestLookupResourcesOverSchema(t *testing.T) {
ONR("user", "tom", "..."),
genResourceIds("document", 150),
},
{
"big",
`definition user {}
definition document {
relation editor: user
relation viewer: user
permission view = viewer + editor
}`,
joinTuples(
genTuples("document", "viewer", "user", "tom", 15100),
genTuples("document", "editor", "user", "tom", 15100),
),
RR("document", "view"),
ONR("user", "tom", "..."),
genResourceIds("document", 15100),
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
require := require.New(t)

dispatcher := NewLocalOnlyDispatcher(10)

ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
require.NoError(err)

ds, revision := testfixtures.DatastoreFromSchemaAndTestRelationships(ds, tc.schema, tc.relationships, require)

ctx := datastoremw.ContextWithHandle(context.Background())
require.NoError(datastoremw.SetInContext(ctx, ds))

stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResourcesResponse](ctx)
err = dispatcher.DispatchLookupResources(&v1.DispatchLookupResourcesRequest{
ObjectRelation: tc.permission,
Subject: tc.subject,
Metadata: &v1.ResolverMeta{
AtRevision: revision.String(),
DepthRemaining: 50,
},
}, stream)
require.NoError(err)

foundResourceIDs := util.NewSet[string]()
for _, result := range stream.Results() {
foundResourceIDs.Add(result.ResolvedResource.ResourceId)
for _, pageSize := range []int{0, 100, 1000} {
pageSize := pageSize
t.Run(fmt.Sprintf("ps-%d_", pageSize), func(t *testing.T) {
require := require.New(t)

dispatcher := NewLocalOnlyDispatcher(10)

ds, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
require.NoError(err)

ds, revision := testfixtures.DatastoreFromSchemaAndTestRelationships(ds, tc.schema, tc.relationships, require)

ctx := datastoremw.ContextWithHandle(context.Background())
require.NoError(datastoremw.SetInContext(ctx, ds))

var currentCursor *v1.Cursor
foundResourceIDs := util.NewSet[string]()
for {
stream := dispatch.NewCollectingDispatchStream[*v1.DispatchLookupResourcesResponse](ctx)
err = dispatcher.DispatchLookupResources(&v1.DispatchLookupResourcesRequest{
ObjectRelation: tc.permission,
Subject: tc.subject,
Metadata: &v1.ResolverMeta{
AtRevision: revision.String(),
DepthRemaining: 50,
},
OptionalLimit: uint32(pageSize),
OptionalCursor: currentCursor,
}, stream)
require.NoError(err)

if pageSize > 0 {
require.LessOrEqual(len(stream.Results()), pageSize)
}

for _, result := range stream.Results() {
foundResourceIDs.Add(result.ResolvedResource.ResourceId)
currentCursor = result.AfterResponseCursor
}

if pageSize == 0 || len(stream.Results()) < pageSize {
break
}
}

foundResourceIDsSlice := foundResourceIDs.AsSlice()
sort.Strings(foundResourceIDsSlice)
sort.Strings(tc.expectedResourceIDs)

require.Equal(tc.expectedResourceIDs, foundResourceIDsSlice)
})
}

foundResourceIDsSlice := foundResourceIDs.AsSlice()
sort.Strings(foundResourceIDsSlice)
sort.Strings(tc.expectedResourceIDs)

require.Equal(tc.expectedResourceIDs, foundResourceIDsSlice)
})
}
}
Expand Down
Loading

0 comments on commit 7ffc816

Please sign in to comment.