Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add detailed logging to ExchangeSteps and Attempts services #1565

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import io.grpc.Status
import java.time.Clock
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.wfanet.measurement.common.grpc.failGrpc
import org.wfanet.measurement.common.identity.ExternalId
import org.wfanet.measurement.common.identity.IdGenerator
import org.wfanet.measurement.common.toProtoTime
Expand All @@ -33,6 +32,7 @@ import org.wfanet.measurement.internal.kingdom.ExchangeWorkflow
import org.wfanet.measurement.internal.kingdom.GetExchangeStepRequest
import org.wfanet.measurement.internal.kingdom.StreamExchangeStepsRequest
import org.wfanet.measurement.internal.kingdom.claimReadyExchangeStepResponse
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.ExchangeStepNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.queries.StreamExchangeSteps
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeStepAttemptReader
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers.ExchangeStepReader
Expand All @@ -48,14 +48,21 @@ class SpannerExchangeStepsService(
) : ExchangeStepsCoroutineImplBase() {

override suspend fun getExchangeStep(request: GetExchangeStepRequest): ExchangeStep {
val externalRecurringExchangeId = ExternalId(request.externalRecurringExchangeId)
val exchangeStepResult =
ExchangeStepReader()
.readByExternalIds(
client.singleUse(),
ExternalId(request.externalRecurringExchangeId),
externalRecurringExchangeId,
request.date,
request.stepIndex,
) ?: failGrpc(Status.NOT_FOUND) { "ExchangeStep not found" }
)
?: throw ExchangeStepNotFoundException(
externalRecurringExchangeId,
request.date,
request.stepIndex,
)
.asStatusRuntimeException(Status.Code.NOT_FOUND, "ExchangeStep not found")

return exchangeStepResult.exchangeStep
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ class ExchangeStepAttemptNotFoundException(
get() =
mapOf(
"external_recurring_exchange_id" to externalRecurringExchangeId.value.toString(),
"date" to date.toString(),
"date" to date.toLocalDate().toString(),
"step_index" to stepIndex.toString(),
"attempt_number" to attemptNumber.toString(),
)
Expand All @@ -630,7 +630,7 @@ class ExchangeStepNotFoundException(
get() =
mapOf(
"external_recurring_exchange_id" to externalRecurringExchangeId.value.toString(),
"date" to date.toString(),
"date" to date.toLocalDate().toString(),
"step_index" to stepIndex.toString(),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ kt_jvm_library(
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:principal_server_interceptor",
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key",
"//src/main/kotlin/org/wfanet/measurement/common/identity",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:internal_status_conversion",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_attempt_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_attempts_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
Expand All @@ -227,6 +228,7 @@ kt_jvm_library(
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:principal_server_interceptor",
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key",
"//src/main/kotlin/org/wfanet/measurement/common/identity",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:internal_status_conversion",
"//src/main/proto/wfa/measurement/api/v2alpha:data_provider_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_steps_service_kt_jvm_grpc_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,10 @@ class ExchangeStepAttemptsService(
internalExchangeStepAttempts.appendLogEntry(internalRequest)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
return internalResponse.toExchangeStepAttempt()
}
Expand Down Expand Up @@ -135,12 +133,10 @@ class ExchangeStepAttemptsService(
internalExchangeStepAttempts.finishExchangeStepAttempt(internalRequest)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.NOT_FOUND
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
return internalResponse.toExchangeStepAttempt()
}
Expand Down Expand Up @@ -192,11 +188,9 @@ class ExchangeStepAttemptsService(
)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> permissionDeniedStatus()
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.PERMISSION_DENIED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
when (authenticatedPrincipal) {
is DataProviderPrincipal -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,10 @@ class ExchangeStepsService(
)
} catch (e: StatusException) {
throw when (e.status.code) {
Status.Code.NOT_FOUND -> permissionDeniedStatus()
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}
.withCause(e)
.asRuntimeException()
Status.Code.NOT_FOUND -> Status.PERMISSION_DENIED
Status.Code.DEADLINE_EXCEEDED -> Status.DEADLINE_EXCEEDED
else -> Status.UNKNOWN
}.toExternalStatusRuntimeException(e)
}
when (authenticatedPrincipal) {
is DataProviderPrincipal -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.grpc.StatusRuntimeException
import io.grpc.protobuf.StatusProto
import org.wfanet.measurement.api.v2alpha.AccountKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepAttemptKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepKey
import org.wfanet.measurement.api.v2alpha.CanonicalRecurringExchangeKey
import org.wfanet.measurement.api.v2alpha.CanonicalRequisitionKey
import org.wfanet.measurement.api.v2alpha.DataProviderCertificateKey
Expand Down Expand Up @@ -334,14 +336,35 @@ fun Status.toExternalStatusRuntimeException(
)
)
.toName()
put("recurring_exchange", recurringExchangeName)
put("recurringExchange", recurringExchangeName)
errorMessage = "RecurringExchange $recurringExchangeName not found."
}
ErrorCode.EXCHANGE_STEP_NOT_FOUND -> {
errorMessage = "ExchangeStep not found."
val exchangeStepName =
CanonicalExchangeStepKey(
externalIdToApiId(
checkNotNull(errorInfo.metadataMap["external_recurring_exchange_id"]).toLong()
),
checkNotNull(errorInfo.metadataMap["date"]),
checkNotNull(errorInfo.metadataMap["step_index"]),
)
.toName()
put("exchangeStep", exchangeStepName)
errorMessage = "ExchangeStep $exchangeStepName not found."
}
ErrorCode.EXCHANGE_STEP_ATTEMPT_NOT_FOUND -> {
errorMessage = "ExchangeStepAttempt not found."
val exchangeStepAttemptName =
CanonicalExchangeStepAttemptKey(
externalIdToApiId(
checkNotNull(errorInfo.metadataMap["external_recurring_exchange_id"]).toLong()
),
checkNotNull(errorInfo.metadataMap["date"]),
checkNotNull(errorInfo.metadataMap["step_index"]),
checkNotNull(errorInfo.metadataMap["attempt_number"]),
)
.toName()
put("exchangeStepAttempt", exchangeStepAttemptName)
errorMessage = "ExchangeStepAttempt $exchangeStepAttemptName not found."
}
ErrorCode.EVENT_GROUP_STATE_ILLEGAL -> {
errorMessage = "EventGroup not found."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ kt_jvm_test(
srcs = ["ExchangeStepAttemptsServiceTest.kt"],
test_class = "org.wfanet.measurement.kingdom.service.api.v2alpha.ExchangeStepAttemptsServiceTest",
deps = [
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:exchange_step_attempts_service",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_attempts_service_kt_jvm_grpc_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
Expand All @@ -177,6 +178,7 @@ kt_jvm_test(
deps = [
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha:resource_key",
"//src/main/kotlin/org/wfanet/measurement/api/v2alpha/testing",
"//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/common",
"//src/main/kotlin/org/wfanet/measurement/kingdom/service/api/v2alpha:exchange_steps_service",
"//src/main/proto/wfa/measurement/api/v2alpha:data_provider_kt_jvm_proto",
"//src/main/proto/wfa/measurement/api/v2alpha:exchange_step_kt_jvm_proto",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ import org.junit.runners.JUnit4
import org.mockito.kotlin.any
import org.mockito.kotlin.stub
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepAttemptKey
import org.wfanet.measurement.api.v2alpha.CanonicalExchangeStepKey
import org.wfanet.measurement.api.v2alpha.CanonicalRecurringExchangeKey
import org.wfanet.measurement.api.v2alpha.DataProviderKey
import org.wfanet.measurement.api.v2alpha.DataProviderPrincipal
import org.wfanet.measurement.api.v2alpha.ExchangeStepAttempt
import org.wfanet.measurement.api.v2alpha.appendExchangeStepAttemptLogEntryRequest
import org.wfanet.measurement.api.v2alpha.finishExchangeStepAttemptRequest
import org.wfanet.measurement.api.v2alpha.withPrincipal
import org.wfanet.measurement.common.grpc.errorInfo
import org.wfanet.measurement.common.grpc.testing.GrpcTestServerRule
import org.wfanet.measurement.common.grpc.testing.mockService
import org.wfanet.measurement.common.identity.ExternalId
Expand All @@ -51,6 +54,9 @@ import org.wfanet.measurement.internal.kingdom.ExchangeStepsGrpcKt.ExchangeSteps
import org.wfanet.measurement.internal.kingdom.exchangeStep as internalExchangeStep
import org.wfanet.measurement.internal.kingdom.finishExchangeStepAttemptRequest as internalFinishExchangeStepAttemptRequest
import org.wfanet.measurement.internal.kingdom.getExchangeStepRequest
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.ExchangeStepAttemptNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.ExchangeStepNotFoundException
import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.common.RecurringExchangeNotFoundException

private const val RECURRING_EXCHANGE_ID = 1L
private const val STEP_INDEX = 1
Expand All @@ -64,6 +70,23 @@ private const val DEBUG_LOG_2_MESSAGE = "some other message"

private val DATE = Date.newBuilder().setYear(2021).setMonth(3).setDay(14).build()
private const val EXCHANGE_ID = "2021-03-14"
private val EXTERNAL_DATA_PROVIDER_ID = ExternalId(12345L)
private val DATA_PROVIDER_KEY = DataProviderKey(EXTERNAL_DATA_PROVIDER_ID.apiId.value)
private val RECURRING_EXCHANGE_KEY =
CanonicalRecurringExchangeKey(ExternalId(RECURRING_EXCHANGE_ID).apiId.value)
private val EXCHANGE_STEP_KEY =
CanonicalExchangeStepKey(
recurringExchangeId = RECURRING_EXCHANGE_KEY.recurringExchangeId,
exchangeId = EXCHANGE_ID,
exchangeStepId = STEP_INDEX.toString(),
)
private val EXCHANGE_STEP_ATTEMPT_KEY =
CanonicalExchangeStepAttemptKey(
recurringExchangeId = RECURRING_EXCHANGE_KEY.recurringExchangeId,
exchangeId = EXCHANGE_ID,
exchangeStepId = STEP_INDEX.toString(),
exchangeStepAttemptId = ATTEMPT_NUMBER.toString(),
)

private val INTERNAL_EXCHANGE_STEP_ATTEMPT: InternalExchangeStepAttempt =
InternalExchangeStepAttempt.newBuilder()
Expand Down Expand Up @@ -285,4 +308,105 @@ class ExchangeStepAttemptsServiceTest {
}
)
}

@Test
fun `finishExchangeStepAttempt throws NOT_FOUND with recurring exchange name when recurring exchange not found`() {
internalExchangeSteps.stub {
onBlocking { getExchangeStep(any()) }
.thenReturn(
internalExchangeStep { this.externalDataProviderId = EXTERNAL_DATA_PROVIDER_ID.value }
)
}
internalExchangeStepAttempts.stub {
onBlocking { finishExchangeStepAttempt(any()) }
.thenThrow(
RecurringExchangeNotFoundException(ExternalId(RECURRING_EXCHANGE_ID))
.asStatusRuntimeException(Status.Code.NOT_FOUND)
)
}
val exception =
assertFailsWith<StatusRuntimeException> {
withPrincipal(DataProviderPrincipal(DATA_PROVIDER_KEY)) {
runBlocking {
service.finishExchangeStepAttempt(
finishExchangeStepAttemptRequest {
name = EXCHANGE_STEP_ATTEMPT.name
finalState = ExchangeStepAttempt.State.FAILED
logEntries += EXCHANGE_STEP_ATTEMPT.debugLogEntriesList
}
)
}
}
}
assertThat(exception.status.code).isEqualTo(Status.Code.NOT_FOUND)
assertThat(exception.errorInfo?.metadataMap)
.containsEntry("recurringExchange", RECURRING_EXCHANGE_KEY.toName())
}

@Test
fun `finishExchangeStepAttempt throws PERMISSION_DENIED with exchange step name when exchange step not found`() {
internalExchangeSteps.stub {
onBlocking { getExchangeStep(any()) }
.thenThrow(
ExchangeStepNotFoundException(ExternalId(RECURRING_EXCHANGE_ID), DATE, STEP_INDEX)
.asStatusRuntimeException(Status.Code.NOT_FOUND)
)
}
val exception =
assertFailsWith<StatusRuntimeException> {
withPrincipal(DataProviderPrincipal(DATA_PROVIDER_KEY)) {
runBlocking {
service.finishExchangeStepAttempt(
finishExchangeStepAttemptRequest {
name = EXCHANGE_STEP_ATTEMPT.name
finalState = ExchangeStepAttempt.State.FAILED
logEntries += EXCHANGE_STEP_ATTEMPT.debugLogEntriesList
}
)
}
}
}
assertThat(exception.status.code).isEqualTo(Status.Code.PERMISSION_DENIED)
assertThat(exception.errorInfo?.metadataMap)
.containsEntry("exchangeStep", EXCHANGE_STEP_KEY.toName())
}

@Test
fun `finishExchangeStepAttempt throws NOT_FOUND with exchange step attempt name when exchange step attempt not found`() {
internalExchangeSteps.stub {
onBlocking { getExchangeStep(any()) }
.thenReturn(
internalExchangeStep { this.externalDataProviderId = EXTERNAL_DATA_PROVIDER_ID.value }
)
}
internalExchangeStepAttempts.stub {
onBlocking { finishExchangeStepAttempt(any()) }
.thenThrow(
ExchangeStepAttemptNotFoundException(
ExternalId(RECURRING_EXCHANGE_ID),
DATE,
STEP_INDEX,
ATTEMPT_NUMBER,
)
.asStatusRuntimeException(Status.Code.NOT_FOUND)
)
}
val exception =
assertFailsWith<StatusRuntimeException> {
withPrincipal(DataProviderPrincipal(DATA_PROVIDER_KEY)) {
runBlocking {
service.finishExchangeStepAttempt(
finishExchangeStepAttemptRequest {
name = EXCHANGE_STEP_ATTEMPT.name
finalState = ExchangeStepAttempt.State.FAILED
logEntries += EXCHANGE_STEP_ATTEMPT.debugLogEntriesList
}
)
}
}
}
assertThat(exception.status.code).isEqualTo(Status.Code.NOT_FOUND)
assertThat(exception.errorInfo?.metadataMap)
.containsEntry("exchangeStepAttempt", EXCHANGE_STEP_ATTEMPT_KEY.toName())
}
}
Loading
Loading