Skip to content

Commit

Permalink
feat: configurable request context propagation to datastore (#1838)
Browse files Browse the repository at this point in the history
Co-authored-by: Adrian Tam <[email protected]>
Co-authored-by: Maria Ines Parnisari <[email protected]>
  • Loading branch information
3 people authored Nov 5, 2024
1 parent ba9e25d commit f665bc2
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
"default": [50, 200],
"x-env-variable": "OPENFGA_REQUEST_DURATION_DISPATCH_COUNT_BUCKETS"
},
"contextPropagationToDatastore": {
"description": "Propagate a requests context to the datastore implementation. Settings this parameter can result in connection pool draining on request aborts and timeouts.",
"type": "boolean",
"default": false,
"x-env-variable": "OPENFGA_CONTEXT_PROPAGATION_TO_DATASTORE"
},
"experimentals": {
"description": "a list of experimental features to enable",
"type": "array",
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Try to keep listed changes to a concise bulleted list of simple explanations of
### Added
* Added `start_time` parameter to `ReadChanges` API to allow filtering by specific time [#2020](https://github.com/openfga/openfga/pull/2020)
* Added support for Contextual Tuples in the `Expand` API. [#2045](https://github.com/openfga/openfga/pull/2045)
* Added a flag `OPENFGA_CONTEXT_PROPAGATION_TO_DATASTORE` to control propagation of a request's context to the datastore. [#1838](https://github.com/openfga/openfga/pull/1838)
* Added OTEL measurement for access control store check latency and write latency due to authorization [#2069](https://github.com/openfga/openfga/pull/2069)

### Performance
Expand Down
3 changes: 3 additions & 0 deletions cmd/run/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ func bindRunFlagsFunc(flags *pflag.FlagSet) func(*cobra.Command, []string) {
util.MustBindPFlag("requestDurationDispatchCountBuckets", flags.Lookup("request-duration-dispatch-count-buckets"))
util.MustBindEnv("requestDurationDispatchCountBuckets", "OPENFGA_REQUEST_DURATION_DISPATCH_COUNT_BUCKETS")

util.MustBindPFlag("contextPropagationToDatastore", flags.Lookup("context-propagation-to-datastore"))
util.MustBindEnv("contextPropagationToDatastore", "OPENFGA_CONTEXT_PROPAGATION_TO_DATASTORE")

util.MustBindPFlag("checkDispatchThrottling.enabled", flags.Lookup("check-dispatch-throttling-enabled"))
util.MustBindEnv("checkDispatchThrottling.enabled", "OPENFGA_CHECK_DISPATCH_THROTTLING_ENABLED")

Expand Down
3 changes: 3 additions & 0 deletions cmd/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ func NewRunCommand() *cobra.Command {

flags.StringSlice("request-duration-dispatch-count-buckets", defaultConfig.RequestDurationDispatchCountBuckets, "dispatch count (i.e number of concurrent traversals to resolve a query) buckets used in labelling request_duration_ms.")

flags.Bool("context-propagation-to-datastore", defaultConfig.ContextPropagationToDatastore, "enable propagation of a request's context to the datastore")

flags.Bool("check-dispatch-throttling-enabled", defaultConfig.CheckDispatchThrottling.Enabled, "enable throttling for Check requests when the request's number of dispatches is high. Enabling this feature will prioritize dispatched requests requiring less than the configured dispatch threshold over requests whose dispatch count exceeds the configured threshold.")

flags.Duration("check-dispatch-throttling-frequency", defaultConfig.CheckDispatchThrottling.Frequency, "defines how frequent Check dispatch throttling will be evaluated. This controls how frequently throttled dispatch Check requests are dispatched.")
Expand Down Expand Up @@ -639,6 +641,7 @@ func (s *ServerContext) Run(ctx context.Context, config *serverconfig.Config) er
server.WithRequestDurationByQueryHistogramBuckets(convertStringArrayToUintArray(config.RequestDurationDatastoreQueryCountBuckets)),
server.WithRequestDurationByDispatchCountHistogramBuckets(convertStringArrayToUintArray(config.RequestDurationDispatchCountBuckets)),
server.WithMaxAuthorizationModelSizeInBytes(config.MaxAuthorizationModelSizeInBytes),
server.WithContextPropagationToDatastore(config.ContextPropagationToDatastore),
server.WithDispatchThrottlingCheckResolverEnabled(checkDispatchThrottlingConfig.Enabled),
server.WithDispatchThrottlingCheckResolverFrequency(checkDispatchThrottlingConfig.Frequency),
server.WithDispatchThrottlingCheckResolverThreshold(checkDispatchThrottlingConfig.Threshold),
Expand Down
7 changes: 7 additions & 0 deletions cmd/run/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,10 @@ func TestDefaultConfig(t *testing.T) {
require.Equal(t, arrayVal.String(), cfg.RequestDurationDispatchCountBuckets[index])
}

val = res.Get("properties.contextPropagationToDatastore.default")
require.True(t, val.Exists())
require.False(t, val.Bool())

val = res.Get("properties.dispatchThrottling.properties.enabled.default")
require.True(t, val.Exists())
require.Equal(t, val.Bool(), cfg.CheckDispatchThrottling.Enabled)
Expand Down Expand Up @@ -1261,6 +1265,7 @@ func TestRunCommandNoConfigDefaultValues(t *testing.T) {
require.Equal(t, "", viper.GetString(datastoreEngineFlag))
require.Equal(t, "", viper.GetString(datastoreURIFlag))
require.False(t, viper.GetBool("check-query-cache-enabled"))
require.False(t, viper.GetBool("context-propagation-to-datastore"))
require.Equal(t, uint32(0), viper.GetUint32("check-query-cache-limit"))
require.Equal(t, 0*time.Second, viper.GetDuration("check-query-cache-ttl"))
require.Equal(t, []int{}, viper.GetIntSlice("request-duration-datastore-query-count-buckets"))
Expand Down Expand Up @@ -1339,6 +1344,7 @@ func TestRunCommandConfigIsMerged(t *testing.T) {
t.Setenv("OPENFGA_ACCESS_CONTROL_ENABLED", "true")
t.Setenv("OPENFGA_ACCESS_CONTROL_STORE_ID", "12345")
t.Setenv("OPENFGA_ACCESS_CONTROL_MODEL_ID", "67891")
t.Setenv("OPENFGA_CONTEXT_PROPAGATION_TO_DATASTORE", "true")

runCmd := NewRunCommand()
runCmd.RunE = func(cmd *cobra.Command, _ []string) error {
Expand All @@ -1359,6 +1365,7 @@ func TestRunCommandConfigIsMerged(t *testing.T) {
require.True(t, viper.GetBool("access-control-enabled"))
require.Equal(t, "12345", viper.GetString("access-control-store-id"))
require.Equal(t, "67891", viper.GetString("access-control-model-id"))
require.True(t, viper.GetBool("context-propagation-to-datastore"))

return nil
}
Expand Down
7 changes: 6 additions & 1 deletion internal/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ type Config struct {
// request timeout will be prioritized
RequestTimeout time.Duration

// ContextPropagationToDatastore enables propagation of a request's context to the datastore,
// so that the datastore receives API cancellation signals.
ContextPropagationToDatastore bool

Datastore DatastoreConfig
GRPC GRPCConfig
HTTP HTTPConfig
Expand Down Expand Up @@ -665,7 +669,8 @@ func DefaultConfig() *Config {
Threshold: DefaultListUsersDispatchThrottlingDefaultThreshold,
MaxThreshold: DefaultListUsersDispatchThrottlingMaxThreshold,
},
RequestTimeout: DefaultRequestTimeout,
RequestTimeout: DefaultRequestTimeout,
ContextPropagationToDatastore: false,
}
}

Expand Down
21 changes: 19 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ type Server struct {

authorizer authz.AuthorizerInterface

ctx context.Context
ctx context.Context
contextPropagationToDatastore bool
}

type OpenFGAServiceV1Option func(s *Server)
Expand Down Expand Up @@ -521,6 +522,17 @@ func WithDispatchThrottlingCheckResolverMaxThreshold(maxThreshold uint32) OpenFG
}
}

// WithContextPropagationToDatastore returns an option that records whether the
// request context should be propagated to the datastore.
//
// With enable set to true, the datastore receives cancellation signals when an
// API request is cancelled. With enable set to false, datastore operations
// continue even if the original request context is cancelled, which is normally
// desirable to avoid unnecessary database connection churn.
//
// When this option is not supplied the setting stays false (separate storage
// and request contexts).
func WithContextPropagationToDatastore(enable bool) OpenFGAServiceV1Option {
	applyPropagation := func(srv *Server) {
		srv.contextPropagationToDatastore = enable
	}
	return applyPropagation
}

// MustNewServerWithOpts see NewServerWithOpts.
func MustNewServerWithOpts(opts ...OpenFGAServiceV1Option) *Server {
s, err := NewServerWithOpts(opts...)
Expand Down Expand Up @@ -720,7 +732,12 @@ func NewServerWithOpts(opts ...OpenFGAServiceV1Option) (*Server, error) {
}
}

s.datastore = storagewrappers.NewCachedOpenFGADatastore(storagewrappers.NewContextWrapper(s.datastore), s.maxAuthorizationModelCacheSize)
if !s.contextPropagationToDatastore {
// Creates a new [storagewrappers.ContextTracerWrapper] that will execute datastore queries using
// a new background context with the current trace context.
s.datastore = storagewrappers.NewContextWrapper(s.datastore)
}
s.datastore = storagewrappers.NewCachedOpenFGADatastore(s.datastore, s.maxAuthorizationModelCacheSize)

if s.cacheLimit > 0 && (s.checkQueryCacheEnabled || s.checkIteratorCacheEnabled) {
s.cache = storage.NewInMemoryLRUCache([]storage.InMemoryLRUCacheOpt[any]{
Expand Down
73 changes: 73 additions & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,79 @@ func TestAvoidDeadlockWithinSingleCheckRequest(t *testing.T) {
require.False(t, resp.GetAllowed())
}

// TestRequestContextPropagation checks that WithContextPropagationToDatastore
// controls whether the context handed to the datastore is tied to the request
// context passed to Check.
//
// Each subtest cancels the request context from inside the mocked
// ReadUserTuple call and then inspects the context the datastore received:
//   - propagation enabled:  the datastore context must report context.Canceled;
//   - propagation disabled: the datastore context must still be live.
func TestRequestContextPropagation(t *testing.T) {
	t.Cleanup(func() {
		goleak.VerifyNone(t)
	})

	for _, tc := range []struct {
		name                   string
		shouldPropagateContext bool
	}{
		{name: "disabled", shouldPropagateContext: false},
		{name: "enabled", shouldPropagateContext: true},
	} {
		t.Run(tc.name, func(t *testing.T) {
			storeID := ulid.Make().String()
			modelID := ulid.Make().String()

			// One controller per subtest so that unmet expectations are
			// reported against the subtest that registered them.
			mockController := gomock.NewController(t)
			t.Cleanup(mockController.Finish)

			parentCtx, cancelParentCtx := context.WithCancel(context.Background())
			t.Cleanup(cancelParentCtx)

			mockDatastore := mockstorage.NewMockOpenFGADatastore(mockController)

			model := testutils.MustTransformDSLToProtoWithID(`
model
schema 1.1
type user
type repo
relations
define reader: [user]`)

			mockDatastore.EXPECT().
				ReadAuthorizationModel(gomock.Any(), gomock.Eq(storeID), gomock.Eq(modelID)).
				Return(model, nil)

			mockDatastore.EXPECT().
				ReadUserTuple(gomock.Any(), gomock.Eq(storeID), gomock.Any(), gomock.Any()).
				DoAndReturn(func(ctx context.Context, _, _, _ any) (*openfgav1.Tuple, error) {
					// Cancel the request context while the datastore call is in
					// flight, then observe whether the cancellation reached ctx.
					cancelParentCtx()
					// NOTE(review): require calls t.FailNow, which is only safe on the
					// goroutine running the test. If Check invokes the datastore from
					// another goroutine, consider a non-fatal assertion here - confirm.
					if tc.shouldPropagateContext {
						require.ErrorIs(t, ctx.Err(), context.Canceled, "storage context must get cancelled if the request context is propagated")
					} else {
						require.NoError(t, ctx.Err(), "storage context must not get canceled if request context propagation is disabled")
					}
					// Return dummy error, we don't care about the check result for this testcase
					return nil, storage.ErrNotFound
				})

			s := MustNewServerWithOpts(
				WithDatastore(mockDatastore),
				WithContextPropagationToDatastore(tc.shouldPropagateContext),
			)
			t.Cleanup(func() {
				// The Close expectation is registered only at teardown, right
				// before the server is closed.
				mockDatastore.EXPECT().Close().Times(1)
				s.Close()
			})

			// We do not care about the check result as we assert via mockDatastore.
			_, _ = s.Check(parentCtx, &openfgav1.CheckRequest{
				StoreId:              storeID,
				TupleKey:             tuple.NewCheckRequestTupleKey("repo:openfga", "reader", "user:mike"),
				AuthorizationModelId: modelID,
			})
		})
	}
}

func TestThreeProngThroughVariousLayers(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
Expand Down

0 comments on commit f665bc2

Please sign in to comment.