Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[User management] Terminate ongoing streams when user state has changed [DPP-830] #12437

Merged
merged 19 commits into from
Jan 28, 2022

Conversation

pbatko-da
Copy link
Contributor

No description provided.

@pbatko-da pbatko-da force-pushed the pbatko/persistent-user-management-stream-auth branch from 2e6e48f to 54a7efc Compare January 18, 2022 14:55
Base automatically changed from pbatko/persistent-user-management to main January 18, 2022 14:59
@pbatko-da pbatko-da force-pushed the pbatko/persistent-user-management-stream-auth branch from 54a7efc to 3bbc731 Compare January 18, 2022 15:20
* if any change in compared the original user claims has been detected
* or if user claims change check times out.
*/
private def validateClaimsResolvedFromUser(now: Instant): Unit = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Core logic here

@pbatko-da pbatko-da force-pushed the pbatko/persistent-user-management-stream-auth branch from 3bbc731 to fb23f07 Compare January 20, 2022 19:31
@pbatko-da pbatko-da force-pushed the pbatko/persistent-user-management-stream-auth branch from fb23f07 to 16c30e1 Compare January 20, 2022 19:33
@pbatko-da pbatko-da marked this pull request as ready for review January 20, 2022 19:33
@pbatko-da pbatko-da requested review from stefanobaghino-da and a team as code owners January 20, 2022 19:33
@pbatko-da pbatko-da changed the title [User management] Ongoing stream authorization [User management] Terminate ongoing streams when user state has changed Jan 20, 2022
@pbatko-da pbatko-da changed the title [User management] Terminate ongoing streams when user state has changed [User management] Terminate ongoing streams when user state has changed [DPP-830] Jan 21, 2022
Copy link
Contributor

@meiersi-da meiersi-da left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good stuff @pbatko-da ! This goes in exactly the right direction. I think we do though want to revisit the core a logic a bit wrt how it deals with quiescent streams. See my comments.

Comment on lines 79 to 87
private def authorize: Either[AuthorizationError, Unit] = {
val now = nowF()
for {
_ <- originalClaims.notExpired(now)
_ = validateClaimsResolvedFromUser(now)
} yield {
()
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inline the two calls into the onNext case distinction above for clarity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

Comment on lines 89 to 92
/** Aborts the stream by throwing an exception
* if any change in compared the original user claims has been detected
* or if user claims change check times out.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code in here works with the limitation that a user rights check is only initiatiated on seeing a new stream element, which means that quiet streams won't be terminated in a timely fashion on user right changes or expiry.

If it's not too much effort, then I'd prefer to not have this problem and terminate in a timely fashion. This will require running the user rights check independently of the stream in a periodic fashion. That would also allow to adapt the tests to not require sending an extra element.

@nmarton-da : what is the recommended way to do a check periodically in Scala? If found this https://stackoverflow.com/a/39938355, but I'm sure we already have place where we do that.

Copy link
Contributor Author

@pbatko-da pbatko-da Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial question would be which and/or how many threads do want to spare with these checks.

If one new thread per all streams then java.util.Timer looks like a good fit.
If we predict needing more than one dedicated thread then should it be a dedicated thread pool or one that we already use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will probably start by looking at scheduling with Akka

Copy link
Contributor Author

@pbatko-da pbatko-da Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not too much effort, then I'd prefer to not have this problem and terminate in a timely fashion.

Ok

"definite_answer" -> "false",
),
),
ErrorDetails.RetryInfoDetail(retryDelayInSeconds = 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have a 1 second delay here? I'd expect we want to recommend an immediate retry wouldn't we?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a default from ContentionOnSharedResources error category.

What do you think is better: 0.seconds or 1.millisecond

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to 0.seconds

@pbatko-da pbatko-da requested a review from meiersi-da January 24, 2022 15:24
@pbatko-da pbatko-da requested a review from tudor-da January 25, 2022 16:33
if (
originalClaims.resolvedFromUser &&
lastUserRightsCheckTime.isAfter(
now.plusSeconds(2 * userRightsCheckIntervalInSeconds.toLong)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the 2 * bit is a misunderstanding. We should check every userRightsCheckIntervalInSeconds. While doing so we guarantee that in the worst case we will be 2 * cacheExpiryAfterWriteInSeconds behind the update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline as staying with 2 *

throwOnFailure: AuthorizationError => Throwable,
) extends ServerCallStreamObserver[A] {
originalClaims: ClaimSet.Claims,
nowF: () => Instant,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see how this could become useful, but AFAICT it is not set to a "fake" clock in any of the tests, is it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful in a unit test for OngoingAuthorizationObserver but such test is missing from this PR

val delay = userRightsCheckIntervalInSeconds.seconds
// Note: https://doc.akka.io/docs/akka/2.6.13/scheduler.html states that:
// "All scheduled task will be executed when the ActorSystem is terminated, i.e. the task may execute before its timeout."
val c = akkaScheduler.scheduleWithFixedDelay(initialDelay = delay, delay = delay)(runnable =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Akka scheduler is a good approach, if you need it scalable, and don't care much about precision (I think this is the case here)

case Success(Right(userRights)) =>
val updatedClaims = AuthorizationInterceptor.convertUserRightsToClaims(userRights)
if (updatedClaims.toSet != originalClaims.claims.toSet) {
self.synchronized(observer.onError(staleStreamAuthError))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to sync for this? looks suspicious 🤔
(btw self.synchronized is just synchronized, no? or if you want to make it explicit porbably it would be better to pick the observer for synchronization)
also: if you need that synchronized, probably would be better to wrap the observer into a synchronized observer or something, and not worry about synchronization code in here (hard to prove every call is synchronized)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to sync for this?

I synchronized all calls to onNext, onError, onComplete and setMessageCompression. Other methods either are documented to be safe to be called from multiplethreads or have to be called early in the lifecycle.
From io.grpc.stub.ServerCallStreamObserver:

Like StreamObserver, implementations are not required to be thread-safe; if multiple threads will be writing to an instance concurrently, the application must synchronize its calls.

Synchronized observer sounds great

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed approach to synchronize methods of OngoingAuthorizationObserver rather than just mehtods on the delegate instance

@@ -324,8 +331,21 @@ object Authorizer {
ledgerId: String,
participantId: String,
errorCodesVersionSwitcher: ErrorCodesVersionSwitcher,
userManagementStore: UserManagementStore,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see that apply is only ever used in the tests. Would it not make sense to hide it behind a method called testAuthorizer and then to encapsulate the repetitious boiler plate in this method? WDYT?

Copy link
Contributor

@mziolekda mziolekda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to address my remaining comment in a follow up PR or challenge it

@mziolekda mziolekda dismissed meiersi-da’s stale review January 28, 2022 10:41

All comments from @meiersi-da have been addressed


private val loggingContext = LoggingContext.ForTesting

it should "signal onError aborting the stream when user rights state hasn't been refreshed in a timely manner" in {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we have a test case for claims changing over time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's tested in OngoingStreamAuthIT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct but the comments of the OngoingAuthorizationObserver class state that the class will ensure stream termination on changing user rights state. Even if that would be a test duplication, I'd still add such test case to OngoingAuthorizationObserverSpec.

@pbatko-da pbatko-da merged commit c72c27c into main Jan 28, 2022
@pbatko-da pbatko-da deleted the pbatko/persistent-user-management-stream-auth branch January 28, 2022 15:44
azure-pipelines bot pushed a commit that referenced this pull request Feb 2, 2022
This PR has been created by a script, which is not very smart
and does not have all the context. Please do double-check that
the version prefix is correct before merging.

@adriaanm-da is in charge of this release.

Commit log:
```
5390505 Remove participant-side command deduplication [DPP-848] (#12677)
85f2690 split release (#12700)
ed1bf24 LF: Check activeness of cached contracts inside FetchInterface (#12698)
aa2494f Speedy: check contract type after checking if they are consumed (#12691)
183f936 LF: Imporve safety of the Serialization of proto message. (#12686)
b4ed15b Changed product names in the documentation (#12668)
5caaf1f Switch github urls from zip to tar.gz (#12692)
6cdda6f Metering report [DPP-817] (#12604)
716cc22 Deduplication offset period errors handled in Sandbox-on-X [DPP-872] (#12652)
e4764cc Upgrade to GHC 9.0.2 (#12300)
1238e1b Update profiling documentation to Canton sandbox (#12680)
f1c7e9e Drop `daml ledger navigator` in favor of `daml navigator` (#12669)
7731324 Drop profile-dir validation for sandbox-kv (#12671)
1fa095a match Foldable.foldl1's argument order to DA.List.foldl1's order (#12685)
06b63e9 update NOTICES file (#12688)
2ac76fd LF: Gracefully handle serialization error of Values beyond 2GB (#12638)
0494731 Adjust response type for script JSON API user mgmt requests (#12683)
7aabc49 Update OpenTelemetry from 0.16.0 to 1.1.0 [KVL-1256] (#12568)
117c920 Bump sandbox-on-x-it-tests size (#12681)
100c59f [JSON-API] Docs for user management endpoints (#12432)
92938d5 Moved com.daml.platform.sandbox to com.daml.ledger.sandbox in SoX (#12675)
8d5d3bd [JSON-API] User management endpoint adjustments (#12578)
54c427f Expose documentation for user-management functions in daml-script. (#12674)
c1afabe [participant-state] Add earliest_offset metadata to pruned data error [kvl-1270] (#12546)
ef18bf4 DEV all caps needed to exclude dev dependencies (#12679)
3d3d84f User management in daml-script over json API (#12646)
182edde [compatiblity tests] Limit the exclusion to versions that have the CommandDeduplication:Participant* tests (#12660)
f08dfa3 Bump terraform (#12670)
366cd89 Add new interface serializability tests (#12666)
84cec38 Upgrade ghc-lib (#12664)
d215a01 Enable user management for the Canton sandbox (#12667)
bb5722c Move sandbox-classic test lib and IT tests to Sandbox-on-x (#12641)
c9a65d1 update NOTICES file (#12659)
1fa7f61 ci: pin workdirs on Windows (#12645)
dcbb398 Typecheck experimental primitives in damlc (#12650)
0d5443f Drop direct dependencies on system-filepath (#12658)
49f37b8 Allow defining ledger-begin and ledger-end offset conditions in ledger-api-bench-tool [DPP-836] (#12521)
f1560ce Support `implements` qualified interfaces (#12644)
20836b1 Address CVE-2022-0355 alert, resolve `simple-get` to 4.0.1 (#12655)
49e6646 Clean up unstable-types test (#12648)
214951e Fix log string interpolation in AkkaExecutionSequencer (#12647)
c72c27c [User management] Terminate ongoing streams when user state has changed [DPP-830] (#12437)
35eae89 Compiler: expose LF builtin ExerciseByKey (#12615)
dfdb7ce Remove DA.Generics (#12634)
14f43e4 update NOTICES file (#12642)
cfa8d30 [Sandbox-on-X] Ledger-side in-memory command deduplication [DPP-872] (#12596)
7567cf5 Add scala serializability checks for interfaces (#12631)
39a421e Drop unused silencer variable (#12640)
4ec336d [User management] Enforce 1k user rights limit [DPP-833] (#12558)
16a13e2 Remove redundant conformance tests from ledger-on-sql (#12639)
0610a44 daml-react: add an useUser hook (#12622)
579fb20 Reenable Canton contract id tests (#12637)
60a0f03 Hide primitiveInterface from docs (#12635)
536e493 Bump canton to latest snapshot (#12633)
613e070 Wrap client instead of runner in script test (#12630)
d5ede55 Turn name collision warnings for virtual modules into errors (#12627)
d5ae82d Bump cached-path-relative from 1.0.2 to 1.1.0 (#12632)
f4c2862 Split release 2022-01-27 (#12624)
9e5425e update NOTICES file (#12613)
fcb6124 Simplify Nix GHC derivation (#12620)
5a47223 ledger-on-memory: Do not create iterators on a mutable log. (#12626)
d2216b3 Clean up unused test and unnecessary flags in ledger-on-sql tests (#12623)
345e267 create-daml-app: Use alias templates for display names (#12390)
2e3ae0d Disable ghcide snapshot on Windows (#12621)
e55622e Remove a bunch of ledger-on-memory tests (#12618)
f5c9a67 Remove DA.Experimental.Interfaces (#12619)
5810c25 release 2.0.0-snapshot.20220127.9042.0.4038d0a7 (#12614)
f396da5 Run ledger-api-test-tool tests on ledger-on-sql with PostgreSQL (#12616)
89ce7d0 Remove conflict-checking logic from JdbcLedgerDao [DPP-808] (#12555)
```
Changelog:
```
with experimental support for non-aggregated metering reporting
- [Daml Standard Library] An argument order in the default
  implementation of ``Foldable.foldl1`` was reversed from that of
  ``DA.List.foldl1``; this incompatibly changes the former to match the
  latter.

- [HTTP-JSON] Added documentation for the new user management endpoints

- [HTTP-JSON] Adjusted the response for the createUser & deleteUser endpoint to an empty object instead of "true". This was done to allow possible future extensions to the return type without applying breaking changes.
- [HTTP-JSON] Made the field rights optional for the request body of the createUser endpoint. Now it is possible to create a user with just the userId (i.e. { "userId": "nice.user" }).

- [Integration Kit] - ledger-api-bench-tool allow to define ledger-begin and ledger-end offset boundaries for streams.
Ledger API Specification: When using user management based authorization streams will now get aborted on authenticated user's rights change.
- [Daml Standard Library] The DA.Generics module has been removed.
Ledger API Specification: Maximum number of user rights per user is now limited to 1000 and is added to UserManagementFeature in VersionService. getLedgerApiVersion endpoint.
[daml-react] A `useUser` react hook is added to the daml-react
TypeScript library. It allows for easy access to the currently logged in
user of a ledger participant node for ledgers supporting user
management.
```

CHANGELOG_BEGIN
CHANGELOG_END
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants