Skip to content

Commit

Permalink
Add detailed logging to ExchangeSteps and Attempts services (#1565)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcorilla authored Apr 15, 2024
1 parent 0c18429 commit ba0f72f
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import io.grpc.Status
import java.time.Clock
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.identity.ExternalId
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.common.toProtoTime
Expand All @@ -33,6 +32,7 @@ import org.wfanet.measurement.internal.kingdom.ExchangeWorkflow
import org.wfanet.measurement.internal.kingdom.GetExchangeStepRequest
import org.wfanet.measurement.internal.kingdom.StreamExchangeStepsRequest
import org.wfanet.measurement.internal.kingdom.claimReadyExchangeStepResponse
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.ExchangeStepNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries.StreamExchangeSteps
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeStepAttemptReader
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeStepReader
Expand All @@ -48,14 +48,21 @@ class SpannerExchangeStepsService(
) : ExchangeStepsCoroutineImplBase() {

override suspend fun getExchangeStep(request: GetExchangeStepRequest): ExchangeStep {
val externalRecurringExchangeId = ExternalId(request.externalRecurringExchangeId)
val exchangeStepResult =
ExchangeStepReader()
.readByExternalIds(
client.singleUse(),
ExternalId(request.externalRecurringExchangeId),
externalRecurringExchangeId,
request.date,
request.stepIndex,
) ?: failGrpc(Status.NOT_FOUND) { "ExchangeStep not found" }
)
?: throw ExchangeStepNotFoundException(
externalRecurringExchangeId,
request.date,
request.stepIndex,
)
.asStatusRuntimeException(Status.Code.NOT_FOUND, "ExchangeStep not found")

return exchangeStepResult.exchangeStep
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ class ExchangeStepAttemptNotFoundException(
get() =
mapOf(
"external_recurring_exchange_id" to externalRecurringExchangeId.value.toString(),
"date" to date.toString(),
"date" to date.toLocalDate().toString(),
"step_index" to stepIndex.toString(),
"attempt_number" to attemptNumber.toString(),
)
Expand All @@ -630,7 +630,7 @@ class ExchangeStepNotFoundException(
get() =
mapOf(
"external_recurring_exchange_id" to externalRecurringExchangeId.value.toString(),
"date" to date.toString(),
"date" to date.toLocalDate().toString(),
"step_index" to stepIndex.toString(),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ kt_jvm_library(
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:principal_server_interceptor",
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key",
"//src/main/kotlin/org/wfanet/measurement/common/identity",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:internal_status_conversion",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_attempt_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_attempts_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
Expand All @@ -227,6 +228,7 @@ kt_jvm_library(
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:principal_server_interceptor",
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key",
"//src/main/kotlin/org/wfanet/measurement/common/identity",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:internal_status_conversion",
"//src/main/proto/wfa/measurement/api/v2alpha:data_provider_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_steps_service_kt_jvm_grpc_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,10 @@ class ExchangeStepAttemptsService(
internalExchangeStepAttempts.appendLogEntry(internalRequest)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
return internalResponse.toExchangeStepAttempt()
}
Expand Down Expand Up @@ -135,12 +133,10 @@ class ExchangeStepAttemptsService(
internalExchangeStepAttempts.finishExchangeStepAttempt(internalRequest)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
return internalResponse.toExchangeStepAttempt()
}
Expand Down Expand Up @@ -192,11 +188,9 @@ class ExchangeStepAttemptsService(
)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> permissionDeniedStatus()
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.PERMISSION_DENIED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
when (authenticatedPrincipal) {
is DataProviderPrincipal -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,10 @@ class ExchangeStepsService(
)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> permissionDeniedStatus()
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.PERMISSION_DENIED
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
when (authenticatedPrincipal) {
is DataProviderPrincipal -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.grpc.StatusRuntimeException
import io.grpc.protobuf.StatusProto
import org.wfanet.measurement.api.v2alpha.AccountKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepAttemptKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepKey
import org.wfanet.measurement.api.v2alpha.CanonicalRecurringExchangeKey
import org.wfanet.measurement.api.v2alpha.CanonicalRequisitionKey
import org.wfanet.measurement.api.v2alpha.DataProviderCertificateKey
Expand Down Expand Up @@ -334,14 +336,35 @@ fun Status.toExternalStatusRuntimeException(
)
)
.toName()
put("recurring_exchange", recurringExchangeName)
put("recurringExchange", recurringExchangeName)
errorMessage = "RecurringExchange $recurringExchangeName not found."
}
ErrorCode.EXCHANGE_STEP_NOT_FOUND -> {
errorMessage = "ExchangeStep not found."
val exchangeStepName =
CanonicalExchangeStepKey(
externalIdToApiId(
checkNotNull(errorInfo.metadataMap["external_recurring_exchange_id"]).toLong()
),
checkNotNull(errorInfo.metadataMap["date"]),
checkNotNull(errorInfo.metadataMap["step_index"]),
)
.toName()
put("exchangeStep", exchangeStepName)
errorMessage = "ExchangeStep $exchangeStepName not found."
}
ErrorCode.EXCHANGE_STEP_ATTEMPT_NOT_FOUND -> {
errorMessage = "ExchangeStepAttempt not found."
val exchangeStepAttemptName =
CanonicalExchangeStepAttemptKey(
externalIdToApiId(
checkNotNull(errorInfo.metadataMap["external_recurring_exchange_id"]).toLong()
),
checkNotNull(errorInfo.metadataMap["date"]),
checkNotNull(errorInfo.metadataMap["step_index"]),
checkNotNull(errorInfo.metadataMap["attempt_number"]),
)
.toName()
put("exchangeStepAttempt", exchangeStepAttemptName)
errorMessage = "ExchangeStepAttempt $exchangeStepAttemptName not found."
}
ErrorCode.EVENT_GROUP_STATE_ILLEGAL -> {
errorMessage = "EventGroup not found."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ kt_jvm_test(
srcs = ["ExchangeStepAttemptsServiceTest.kt"],
test_class = "org.wfanet.measurement.kingdom.service.api.v2alpha.ExchangeStepAttemptsServiceTest",
deps = [
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:exchange_step_attempts_service",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_attempts_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
Expand All @@ -177,6 +178,7 @@ kt_jvm_test(
deps = [
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key",
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha/testing",
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:exchange_steps_service",
"//src/main/proto/wfa/measurement/api/v2alpha:data_provider_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ import org.junit.runners.JUnit4
import org.mockito.kotlin.any
import org.mockito.kotlin.stub
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepAttemptKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepKey
import org.wfanet.measurement.api.v2alpha.CanonicalRecurringExchangeKey
import org.wfanet.measurement.api.v2alpha.DataProviderKey
import org.wfanet.measurement.api.v2alpha.DataProviderPrincipal
import org.wfanet.measurement.api.v2alpha.ExchangeStepAttempt
import org.wfanet.measurement.api.v2alpha.appendExchangeStepAttemptLogEntryRequest
import org.wfanet.measurement.api.v2alpha.finishExchangeStepAttemptRequest
import org.wfanet.measurement.api.v2alpha.withPrincipal
import org.wfanet.measurement.common.grpc.errorInfo
import org.wfanet.measurement.common.grpc.testing.GrpcTestServerRule
import org.wfanet.measurement.common.grpc.testing.mockService
import org.wfanet.measurement.common.identity.ExternalId
Expand All @@ -51,6 +54,9 @@ import org.wfanet.measurement.internal.kingdom.ExchangeStepsGrpcKt.ExchangeSteps
import org.wfanet.measurement.internal.kingdom.exchangeStep as internalExchangeStep
import org.wfanet.measurement.internal.kingdom.finishExchangeStepAttemptRequest as internalFinishExchangeStepAttemptRequest
import org.wfanet.measurement.internal.kingdom.getExchangeStepRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.ExchangeStepAttemptNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.ExchangeStepNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.RecurringExchangeNotFoundException

private const val RECURRING_EXCHANGE_ID = 1L
private const val STEP_INDEX = 1
Expand All @@ -64,6 +70,23 @@ private const val DEBUG_LOG_2_MESSAGE = "some other message"

private val DATE = Date.newBuilder().setYear(2021).setMonth(3).setDay(14).build()
private const val EXCHANGE_ID = "2021-03-14"
private val EXTERNAL_DATA_PROVIDER_ID = ExternalId(12345L)
private val DATA_PROVIDER_KEY = DataProviderKey(EXTERNAL_DATA_PROVIDER_ID.apiId.value)
private val RECURRING_EXCHANGE_KEY =
CanonicalRecurringExchangeKey(ExternalId(RECURRING_EXCHANGE_ID).apiId.value)
private val EXCHANGE_STEP_KEY =
CanonicalExchangeStepKey(
recurringExchangeId = RECURRING_EXCHANGE_KEY.recurringExchangeId,
exchangeId = EXCHANGE_ID,
exchangeStepId = STEP_INDEX.toString(),
)
private val EXCHANGE_STEP_ATTEMPT_KEY =
CanonicalExchangeStepAttemptKey(
recurringExchangeId = RECURRING_EXCHANGE_KEY.recurringExchangeId,
exchangeId = EXCHANGE_ID,
exchangeStepId = STEP_INDEX.toString(),
exchangeStepAttemptId = ATTEMPT_NUMBER.toString(),
)

private val INTERNAL_EXCHANGE_STEP_ATTEMPT: InternalExchangeStepAttempt =
InternalExchangeStepAttempt.newBuilder()
Expand Down Expand Up @@ -285,4 +308,105 @@ class ExchangeStepAttemptsServiceTest {
}
)
}

@Test
fun `finishExchangeStepAttempt throws NOT_FOUND with recurring exchange name when recurring exchange not found`() {
internalExchangeSteps.stub {
onBlocking { getExchangeStep(any()) }
.thenReturn(
internalExchangeStep { this.externalDataProviderId = EXTERNAL_DATA_PROVIDER_ID.value }
)
}
internalExchangeStepAttempts.stub {
onBlocking { finishExchangeStepAttempt(any()) }
.thenThrow(
RecurringExchangeNotFoundException(ExternalId(RECURRING_EXCHANGE_ID))
.asStatusRuntimeException(Status.Code.NOT_FOUND)
)
}
val exception =
assertFailsWith<StatusRuntimeException> {
withPrincipal(DataProviderPrincipal(DATA_PROVIDER_KEY)) {
runBlocking {
service.finishExchangeStepAttempt(
finishExchangeStepAttemptRequest {
name = EXCHANGE_STEP_ATTEMPT.name
finalState = ExchangeStepAttempt.State.FAILED
logEntries += EXCHANGE_STEP_ATTEMPT.debugLogEntriesList
}
)
}
}
}
assertThat(exception.status.code).isEqualTo(Status.Code.NOT_FOUND)
assertThat(exception.errorInfo?.metadataMap)
.containsEntry("recurringExchange", RECURRING_EXCHANGE_KEY.toName())
}

@Test
fun `finishExchangeStepAttempt throws PERMISSION_DENIED with exchange step name when exchange step not found`() {
internalExchangeSteps.stub {
onBlocking { getExchangeStep(any()) }
.thenThrow(
ExchangeStepNotFoundException(ExternalId(RECURRING_EXCHANGE_ID), DATE, STEP_INDEX)
.asStatusRuntimeException(Status.Code.NOT_FOUND)
)
}
val exception =
assertFailsWith<StatusRuntimeException> {
withPrincipal(DataProviderPrincipal(DATA_PROVIDER_KEY)) {
runBlocking {
service.finishExchangeStepAttempt(
finishExchangeStepAttemptRequest {
name = EXCHANGE_STEP_ATTEMPT.name
finalState = ExchangeStepAttempt.State.FAILED
logEntries += EXCHANGE_STEP_ATTEMPT.debugLogEntriesList
}
)
}
}
}
assertThat(exception.status.code).isEqualTo(Status.Code.PERMISSION_DENIED)
assertThat(exception.errorInfo?.metadataMap)
.containsEntry("exchangeStep", EXCHANGE_STEP_KEY.toName())
}

@Test
fun `finishExchangeStepAttempt throws NOT_FOUND with exchange step attempt name when exchange step attempt not found`() {
internalExchangeSteps.stub {
onBlocking { getExchangeStep(any()) }
.thenReturn(
internalExchangeStep { this.externalDataProviderId = EXTERNAL_DATA_PROVIDER_ID.value }
)
}
internalExchangeStepAttempts.stub {
onBlocking { finishExchangeStepAttempt(any()) }
.thenThrow(
ExchangeStepAttemptNotFoundException(
ExternalId(RECURRING_EXCHANGE_ID),
DATE,
STEP_INDEX,
ATTEMPT_NUMBER,
)
.asStatusRuntimeException(Status.Code.NOT_FOUND)
)
}
val exception =
assertFailsWith<StatusRuntimeException> {
withPrincipal(DataProviderPrincipal(DATA_PROVIDER_KEY)) {
runBlocking {
service.finishExchangeStepAttempt(
finishExchangeStepAttemptRequest {
name = EXCHANGE_STEP_ATTEMPT.name
finalState = ExchangeStepAttempt.State.FAILED
logEntries += EXCHANGE_STEP_ATTEMPT.debugLogEntriesList
}
)
}
}
}
assertThat(exception.status.code).isEqualTo(Status.Code.NOT_FOUND)
assertThat(exception.errorInfo?.metadataMap)
.containsEntry("exchangeStepAttempt", EXCHANGE_STEP_ATTEMPT_KEY.toName())
}
}
Loading

0 comments on commit ba0f72f

Please sign in to comment.