Skip to content

Commit

Permalink
[User management] Terminate ongoing streams when user state has chang…
Browse files Browse the repository at this point in the history
…ed [DPP-830] (#12437)

CHANGELOG_BEGIN
Ledger API Specification: When using user management based authorization streams will now get aborted on authenticated user's rights change.
CHANGELOG_END
  • Loading branch information
pbatko-da authored Jan 28, 2022
1 parent 35eae89 commit c72c27c
Show file tree
Hide file tree
Showing 33 changed files with 656 additions and 120 deletions.
1 change: 1 addition & 0 deletions language-support/java/bindings-rxjava/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ da_scala_library(
"@maven//:org_scalatest_scalatest_core",
"@maven//:org_scalatest_scalatest_matchers_core",
"@maven//:org_scalatest_scalatest_shouldmatchers",
"@maven//:com_typesafe_akka_akka_actor",
],
deps = [
":bindings-rxjava",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.net.{InetSocketAddress, SocketAddress}
import java.time.{Clock, Duration}
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import com.daml.ledger.rxjava.grpc._
import com.daml.ledger.rxjava.grpc.helpers.TransactionsServiceImpl.LedgerItem
import com.daml.ledger.rxjava.{CommandCompletionClient, LedgerConfigurationClient, PackageClient}
Expand Down Expand Up @@ -46,13 +47,18 @@ final class LedgerServices(val ledgerId: String) {

val executionContext: ExecutionContext = global
private val esf: ExecutionSequencerFactory = new SingleThreadExecutionSequencerPool(ledgerId)
private val akkaSystem = ActorSystem("LedgerServicesParticipant")
private val participantId = "LedgerServicesParticipant"
private val authorizer =
Authorizer(
() => Clock.systemUTC().instant(),
ledgerId,
participantId,
new ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes = true),
new InMemoryUserManagementStore(),
executionContext,
userRightsCheckIntervalInSeconds = 1,
akkaScheduler = akkaSystem.scheduler,
)

def newServerBuilder(): NettyServerBuilder = NettyServerBuilder.forAddress(nextAddress())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
package com.daml.ledger

import com.daml.error.ErrorCodesVersionSwitcher

import java.time.Clock
import java.util.UUID

import akka.actor.ActorSystem

import scala.concurrent.ExecutionContext
import com.daml.lf.data.Ref
import com.daml.ledger.api.auth.{
AuthServiceStatic,
Expand All @@ -18,18 +21,25 @@ import com.daml.ledger.api.auth.{
ClaimReadAsParty,
ClaimSet,
}
import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore

package object rxjava {

private[rxjava] def untestedEndpoint: Nothing =
throw new UnsupportedOperationException("Untested endpoint, implement if needed")
private val akkaSystem = ActorSystem("testActorSystem")
sys.addShutdownHook(akkaSystem.terminate(): Unit)

private[rxjava] val authorizer =
Authorizer(
() => Clock.systemUTC().instant(),
"testLedgerId",
"testParticipantId",
new ErrorCodesVersionSwitcher(enableSelfServiceErrorCodes = true),
new InMemoryUserManagementStore(),
ExecutionContext.parasitic,
userRightsCheckIntervalInSeconds = 1,
akkaScheduler = akkaSystem.scheduler,
)

private[rxjava] val emptyToken = "empty"
Expand Down
2 changes: 2 additions & 0 deletions ledger/error/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ da_scala_library(
deps = [
"//ledger/error",
"//ledger/test-common",
"//libs-scala/contextualized-logging",
"//libs-scala/scala-utils",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:ch_qos_logback_logback_core",
"@maven//:com_google_api_grpc_proto_google_common_protos",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import com.daml.lf.transaction.GlobalKey
import com.daml.lf.value.Value
import com.daml.lf.{VersionRange, language}
import org.slf4j.event.Level

import java.time.{Duration, Instant}

import scala.concurrent.duration._

@Explanation(
"Errors raised by or forwarded by the Ledger API."
)
Expand Down Expand Up @@ -287,8 +288,30 @@ object LedgerApiErrors extends LedgerApiErrorGroup {
}
}

@Explanation("Authentication errors.")
@Explanation("Authentication and authorization errors.")
object AuthorizationChecks extends ErrorGroup() {

@Explanation("""The stream was aborted because the authenticated user's rights changed,
|and the user might thus no longer be authorized to this stream.
|""")
@Resolution(
"The application should automatically retry fetching the stream. It will either succeed, or fail with an explicit denial of authentication or permission."
)
object StaleUserManagementBasedStreamClaims
extends ErrorCode(
id = "STALE_STREAM_AUTHORIZATION",
ErrorCategory.ContentionOnSharedResources,
) {
case class Reject()(implicit
loggingContext: ContextualizedErrorLogger
) extends LoggingTransactionErrorImpl("Stale stream authorization. Retry quickly.") {
override def retryable: Option[ErrorCategoryRetry] = Some(
ErrorCategoryRetry(who = "application", duration = 0.seconds)
)
}

}

@Explanation(
"""This rejection is given if the submitted command does not contain a JWT token on a participant enforcing JWT authentication."""
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import com.google.protobuf
import com.google.rpc.{ErrorInfo, RequestInfo, ResourceInfo, RetryInfo}

import scala.jdk.CollectionConverters._
import scala.concurrent.duration._

object ErrorDetails {
sealed trait ErrorDetail extends Product with Serializable

final case class ResourceInfoDetail(name: String, typ: String) extends ErrorDetail
final case class ErrorInfoDetail(reason: String, metadata: Map[String, String])
extends ErrorDetail
final case class RetryInfoDetail(retryDelayInSeconds: Long) extends ErrorDetail
final case class RetryInfoDetail(duration: Duration) extends ErrorDetail
final case class RequestInfoDetail(requestId: String) extends ErrorDetail

def from(anys: Seq[protobuf.Any]): Seq[ErrorDetail] = anys.toList.map {
Expand All @@ -28,7 +29,9 @@ object ErrorDetails {

case any if any.is(classOf[RetryInfo]) =>
val v = any.unpack(classOf[RetryInfo])
RetryInfoDetail(v.getRetryDelay.getSeconds)
val delay = v.getRetryDelay
val duration = (delay.getSeconds.seconds + delay.getNanos.nanos).toCoarsest
RetryInfoDetail(duration)

case any if any.is(classOf[RequestInfo]) =>
val v = any.unpack(classOf[RequestInfo])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,97 @@
package com.daml.error

import com.daml.error.utils.ErrorDetails
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.testing.{LogCollector, LogCollectorAssertions}
import com.daml.platform.testing.LogCollector.ExpectedLogEntry
import com.daml.scalautil.Statement
import io.grpc.Status.Code
import io.grpc.StatusRuntimeException
import io.grpc.protobuf.StatusProto
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
import org.scalatest.Checkpoints.Checkpoint

trait ErrorsAssertions {
self: Matchers with LogCollectorAssertions =>
self: Matchers =>

private val logger = ContextualizedLogger.get(getClass)
private val loggingContext = LoggingContext.ForTesting
private val errorLogger = new DamlContextualizedErrorLogger(logger, loggingContext, None)

def assertError(
actual: StatusRuntimeException,
expectedCode: Code,
expectedMessage: String,
expectedDetails: Seq[ErrorDetails.ErrorDetail],
expectedF: ContextualizedErrorLogger => StatusRuntimeException,
): Unit = {
doAssertError(actual, expectedCode, expectedMessage, expectedDetails, None)
assertError(
actual = actual,
expected = expectedF(errorLogger),
)
}

def assertError[Test, Logger](
/** Asserts that the two errors have the same code, message and details.
*/
def assertError(
actual: StatusRuntimeException,
expected: StatusRuntimeException,
): Unit = {
val expectedStatus = StatusProto.fromThrowable(expected)
val expectedDetails = expectedStatus.getDetailsList.asScala.toSeq
assertError(
actual = actual,
expectedCode = expected.getStatus.getCode,
expectedMessage = expectedStatus.getMessage,
expectedDetails = ErrorDetails.from(expectedDetails),
)
}

def assertError(
actual: StatusRuntimeException,
expectedCode: Code,
expectedMessage: String,
expectedDetails: Seq[ErrorDetails.ErrorDetail],
expectedLogEntry: ExpectedLogEntry,
)(implicit
test: ClassTag[Test],
logger: ClassTag[Logger],
): Unit = {
doAssertError(actual, expectedCode, expectedMessage, expectedDetails, Some(expectedLogEntry))(
test,
logger,
)
val actualStatus = StatusProto.fromThrowable(actual)
val actualDetails = actualStatus.getDetailsList.asScala.toSeq
val cp = new Checkpoint
cp { Statement.discard { actual.getStatus.getCode shouldBe expectedCode } }
cp { Statement.discard { actualStatus.getMessage shouldBe expectedMessage } }
cp {
Statement.discard {
ErrorDetails.from(actualDetails) should contain theSameElementsAs expectedDetails
}
}
cp.reportAll()
}

private def doAssertError[Test, Logger](
}

trait ErrorAssertionsWithLogCollectorAssertions
extends ErrorsAssertions
with LogCollectorAssertions {
self: Matchers =>

def assertError[Test, Logger](
actual: StatusRuntimeException,
expectedCode: Code,
expectedMessage: String,
expectedDetails: Seq[ErrorDetails.ErrorDetail],
expectedLogEntry: Option[ExpectedLogEntry],
expectedLogEntry: ExpectedLogEntry,
)(implicit
test: ClassTag[Test],
logger: ClassTag[Logger],
): Unit = {
val status = StatusProto.fromThrowable(actual)
status.getCode shouldBe expectedCode.value()
status.getMessage shouldBe expectedMessage
val details = status.getDetailsList.asScala.toSeq
val _ = ErrorDetails.from(details) should contain theSameElementsAs expectedDetails
if (expectedLogEntry.isDefined) {
val actualLogs: Seq[LogCollector.Entry] = LogCollector.readAsEntries(test, logger)
actualLogs should have size 1
assertLogEntry(actualLogs.head, expectedLogEntry.get)
}
assertError(
actual = actual,
expectedCode = expectedCode,
expectedMessage = expectedMessage,
expectedDetails = expectedDetails,
)
val actualLogs: Seq[LogCollector.Entry] = LogCollector.readAsEntries(test, logger)
actualLogs should have size 1
assertLogEntry(actualLogs.head, expectedLogEntry)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ErrorCodeSpec
with Matchers
with BeforeAndAfter
with LogCollectorAssertions
with ErrorsAssertions {
with ErrorAssertionsWithLogCollectorAssertions {

implicit private val testLoggingContext: LoggingContext = LoggingContext.ForTesting
private val logger = ContextualizedLogger.get(getClass)
Expand Down Expand Up @@ -85,7 +85,7 @@ class ErrorCodeSpec
NotSoSeriousError.id,
Map("category" -> "1") ++ contextMetadata ++ Map("definite_answer" -> "true"),
),
ErrorDetails.RetryInfoDetail(TransientServerFailure.retryable.get.duration.toSeconds),
ErrorDetails.RetryInfoDetail(TransientServerFailure.retryable.get.duration),
ErrorDetails.RequestInfoDetail(correlationId),
ErrorDetails.ResourceInfoDetail(error.resources.head._1.asString, error.resources.head._2),
),
Expand Down
12 changes: 12 additions & 0 deletions ledger/ledger-api-auth/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ da_scala_library(
scala_deps = [
"@maven//:io_spray_spray_json",
"@maven//:org_scalaz_scalaz_core",
"@maven//:com_typesafe_akka_akka_actor",
],
tags = ["maven_coordinates=com.daml:ledger-api-auth:__VERSION__"],
visibility = [
Expand Down Expand Up @@ -63,17 +64,28 @@ da_scala_test_suite(
"@maven//:org_scalatest_scalatest_shouldmatchers",
"@maven//:org_scalatest_scalatest_wordspec",
"@maven//:org_scalatestplus_scalacheck_1_15",
"@maven//:com_typesafe_akka_akka_actor",
"@maven//:com_typesafe_akka_akka_stream",
],
deps = [
":ledger-api-auth",
"//daml-lf/data",
"//ledger-api/rs-grpc-bridge",
"//ledger-api/testing-utils",
"//ledger/error",
"//ledger/error:error-test-lib",
"//ledger/ledger-api-common",
"//ledger/ledger-api-domain",
"//ledger/participant-state-index",
"//ledger/test-common",
"//libs-scala/adjustable-clock",
"//libs-scala/contextualized-logging",
"@maven//:com_google_api_grpc_proto_google_common_protos",
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_context",
"@maven//:io_grpc_grpc_protobuf",
"@maven//:io_grpc_grpc_stub",
"@maven//:org_mockito_mockito_core",
"@maven//:org_scalatest_scalatest_compatible",
],
Expand Down
Loading

0 comments on commit c72c27c

Please sign in to comment.