Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add postgres computations service #1136

Merged
merged 14 commits into from
Aug 3, 2023

Conversation

YuhongWang-Amazon
Copy link
Contributor

This adds the postgres computations service and unit tests for it.

@wfa-reviewable
Copy link

This change is Reviewable

Copy link
Contributor

@renjiezh renjiezh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 20 of 21 files at r1, 1 of 1 files at r2, all commit messages.
Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/db/computation/ComputationTypes.kt line 33 at r2 (raw file):

}

fun ComputationStage.toComputationType() =

This PR updates duchy to support REACH_ONLY_LIQUID_LEGIONS_V2 protocol. Some change might need to add for postgres's implementation.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 305 at r2 (raw file):

        endingStage = request.endingComputationStage,
        endComputationReason =
          when (val it = request.reason) {

nit: what about

when (request.reason) {
  ...
  else -> error("Unknown CompletedReason ${request.reason}")
}

src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 327 at r2 (raw file):

    val token =
      computationReader.readComputationToken(client, request.token.globalComputationId)
        ?: failGrpc(Status.INTERNAL) {

might be Status.NOT_FOUND.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 351 at r2 (raw file):

    val token =
      computationReader.readComputationToken(client, request.token.globalComputationId)
        ?: failGrpc(Status.INTERNAL) {

ditto


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 373 at r2 (raw file):

    val token =
      computationReader.readComputationToken(client, request.token.globalComputationId)
        ?: failGrpc(Status.INTERNAL) {

ditto


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 428 at r2 (raw file):

  ): GetComputationIdsResponse {
    val ids = computationReader.readGlobalComputationIds(client.singleUse(), request.stagesList)
    return GetComputationIdsResponse.newBuilder().addAllGlobalIds(ids).build()

Please avoid using builder.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 460 at r2 (raw file):

    val token =
      computationReader.readComputationToken(client, request.token.globalComputationId)
        ?: failGrpc(Status.INTERNAL) {

ditto.


src/main/kotlin/org/wfanet/measurement/duchy/service/internal/testing/ComputationsServiceTest.kt line 532 at r2 (raw file):

      val nextStage = computationStage {
        liquidLegionsSketchAggregationV2 = Stage.WAIT_REQUISITIONS_AND_KEY_SET

Nit:To make the unit test more realistic, this stage could be Stage.CONFIRMATION_PHASE.
Check the Stage to AfterTransition map here

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 20 of 21 files reviewed, 5 unresolved discussions (waiting on @renjiezh)


src/main/kotlin/org/wfanet/measurement/duchy/db/computation/ComputationTypes.kt line 33 at r2 (raw file):

Previously, renjiezh wrote…

This PR updates duchy to support REACH_ONLY_LIQUID_LEGIONS_V2 protocol. Some change might need to add for postgres's implementation.

Thanks for the heads up. When do you plan to merge that PR? I can rebase after you merged that one.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 327 at r2 (raw file):

Previously, renjiezh wrote…

might be Status.NOT_FOUND.

We had this discussion last time. Since in this use case, a null computation token should only be caused by system invariant, we decided to use Status.INTERNAL.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 428 at r2 (raw file):

Previously, renjiezh wrote…

Please avoid using builder.

Done.


src/main/kotlin/org/wfanet/measurement/duchy/service/internal/testing/ComputationsServiceTest.kt line 532 at r2 (raw file):

Previously, renjiezh wrote…

Nit:To make the unit test more realistic, this stage could be Stage.CONFIRMATION_PHASE.
Check the Stage to AfterTransition map here

It was not trivial to change this stage to CONFIRMATION_PHASE. Since the only valid transition for INITIALIZATION_PHASE is the WAIT_REQUISITIONS_AND_KEY_SET. Many setup are required to advance a computation from INITIALIZATION_PHASE to CONFIRMATION_PHASE.

Is it ok to just use this for a unit test? Or are there any better ways to prepare a ready to be advanced WAIT_REQUISITIONS_AND_KEY_SET phase computation?

@SanjayVas SanjayVas requested a review from renjiezh July 28, 2023 21:54
Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting for @renjiezh to complete his review first.

Reviewable status: 20 of 21 files reviewed, 5 unresolved discussions (waiting on @renjiezh and @YuhongWang-Amazon)

Copy link
Contributor

@renjiezh renjiezh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 1 of 1 files at r3, all commit messages.
Reviewable status: :shipit: complete! all files reviewed, all discussions resolved (waiting on @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/db/computation/ComputationTypes.kt line 33 at r2 (raw file):

Previously, YuhongWang-Amazon wrote…

Thanks for the heads up. When do you plan to merge that PR? I can rebase after you merged that one.

I was blocked by another one (#1123). I will merge it once this blocker solved,


src/main/kotlin/org/wfanet/measurement/duchy/service/internal/testing/ComputationsServiceTest.kt line 532 at r2 (raw file):

Previously, YuhongWang-Amazon wrote…

It was not trivial to change this stage to CONFIRMATION_PHASE. Since the only valid transition for INITIALIZATION_PHASE is the WAIT_REQUISITIONS_AND_KEY_SET. Many setup are required to advance a computation from INITIALIZATION_PHASE to CONFIRMATION_PHASE.

Is it ok to just use this for a unit test? Or are there any better ways to prepare a ready to be advanced WAIT_REQUISITIONS_AND_KEY_SET phase computation?

Makes sense. Let's save the effort.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 5 of 21 files at r1, 1 of 1 files at r2, all commit messages.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 118 at r3 (raw file):

  private val idGenerator: IdGenerator,
  private val duchyName: String,
  private val computationStorageClient: ComputationStore,

ComputationStore implements the Store interface and wraps a StorageClient instance.

Suggestion:

computationStore

src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 227 at r3 (raw file):

      try {
        computationStorageClient.get(blobKey)?.delete()
      } catch (e: StatusException) {

A Store shouldn't be leaking gRPC StatusException, especially since not every underlying implementation even uses gRPC. If the blob isn't found get should just return null.

Code quote:

     } catch (e: StatusException) {

src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

        if (!isTerminated(computationStageEnum)) {
          FinishComputation(

@renjiezh This behavior doesn't look correct to me. I thought we only delete finished computations? My understanding was that we have separate TTLs for finished pending computations than for deleting finished computations? We shouldn't be terminating and deleting the same computation in the same RPC.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 326 at r3 (raw file):

    )

    val token =

It looks like this may be repeating an issue that exists in the Spanner implementation: the token isn't read in the same transaction that performs the mutation.

The FinishComputation operation should return the updated token.

Copy link
Contributor

@renjiezh renjiezh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @SanjayVas and @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

@renjiezh This behavior doesn't look correct to me. I thought we only delete finished computations? My understanding was that we have separate TTLs for finished pending computations than for deleting finished computations? We shouldn't be terminating and deleting the same computation in the same RPC.

purgeComputations in the service is capable to deletes any Computations with the respect to the TTL. However, the request has the field stages to restrict t stage of Computations to delete. The caller is the cronjob ComputationCleaner, which only request to delete COMPLETE Computations.
The FinishComputation here is to handle purge requests for unfinished computations. However, it is never executed in our system yet. So I think it is fine.

@SanjayVas SanjayVas requested a review from renjiezh July 28, 2023 22:50
Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @renjiezh and @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, renjiezh wrote…

purgeComputations in the service is capable to deletes any Computations with the respect to the TTL. However, the request has the field stages to restrict t stage of Computations to delete. The caller is the cronjob ComputationCleaner, which only request to delete COMPLETE Computations.
The FinishComputation here is to handle purge requests for unfinished computations. However, it is never executed in our system yet. So I think it is fine.

My point still stands that the same Computation should never be finished and deleted in the same RPC. You should have to make two separate RPCs: one to finish and one to delete.

Copy link
Contributor

@renjiezh renjiezh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @SanjayVas and @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

My point still stands that the same Computation should never be finished and deleted in the same RPC. You should have to make two separate RPCs: one to finish and one to delete.

Suppose the scenario that, a Computation never gets the requisitions fulfilled (something wrong from the EDP side). Thus the Computation will be PENDING_REQUISITION forever. Currently we don't have any method to dealing with this case. One solution is to rely on the cronjob to force clean very old Computations(not enabled yet). This is why I keep the feature here.
Could you let me know your consideration why computations cannot be finished and deleted in the same RPC?

@SanjayVas SanjayVas requested a review from renjiezh July 28, 2023 23:11
Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @renjiezh and @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):
The idea is to allow time for processing changes before things get deleted. We follow this pattern in the Kingdom: Measurements are either transitioned from pending->cancelled or from cancelled->deleted transitions, with different TTLs for each operation.

For Duchy computations, isn't the normal flow that the Measurement gets marked as completed (SUCCEEDED or FAILED) at the Kingdom and then the Herald picks that up to delete it?

Suppose the scenario that, a Computation never gets the requisitions fulfilled (something wrong from the EDP side). Thus the Computation will be PENDING_REQUISITION forever.

In practice, this case is handled at the Kingdom (it has a cron job for cancelling Measurements that have been pending too long).

Copy link
Contributor

@renjiezh renjiezh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

The idea is to allow time for processing changes before things get deleted. We follow this pattern in the Kingdom: Measurements are either transitioned from pending->cancelled or from cancelled->deleted transitions, with different TTLs for each operation.

For Duchy computations, isn't the normal flow that the Measurement gets marked as completed (SUCCEEDED or FAILED) at the Kingdom and then the Herald picks that up to delete it?

Suppose the scenario that, a Computation never gets the requisitions fulfilled (something wrong from the EDP side). Thus the Computation will be PENDING_REQUISITION forever.

In practice, this case is handled at the Kingdom (it has a cron job for cancelling Measurements that have been pending too long).

Sounds good. Then we change the purge function to allow only terminal (Complete, Failed) Computation to be deleted.
@YuhongWang-Amazon Could you please make the change in the PR so that it ignore non-terminated stages in request.stages.

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, renjiezh wrote…

Sounds good. Then we change the purge function to allow only terminal (Complete, Failed) Computation to be deleted.
@YuhongWang-Amazon Could you please make the change in the PR so that it ignore non-terminated stages in request.stages.

Sure, I can make the update. Since this has the same logic as spanner implementation, I will also update that.

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 118 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

ComputationStore implements the Store interface and wraps a StorageClient instance.

Done.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 227 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

A Store shouldn't be leaking gRPC StatusException, especially since not every underlying implementation even uses gRPC. If the blob isn't found get should just return null.

Im trying to be consistent with the spanner implementation. Should I remove the try catch block from both places? https://github.com/world-federation-of-advertisers/cross-media-measurement/blob/main/src/main/kotlin/org/wfanet/measurement/duchy/service/internal/computations/ComputationsService.kt#L137-L152


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, YuhongWang-Amazon wrote…

Sure, I can make the update. Since this has the same logic as spanner implementation, I will also update that.

Updated both postgres and spanner implementations to filter out the non-terminal stages from the requests.
I am wondering if it is better to failGrpc when there are non-terminal stages in the request.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 326 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

It looks like this may be repeating an issue that exists in the Spanner implementation: the token isn't read in the same transaction that performs the mutation.

The FinishComputation operation should return the updated token.

Interesting. I am not sure how that could happen.

I moved the read operation into all writer transactions. Please take another look.

Copy link
Contributor

@renjiezh renjiezh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 11 of 28 files reviewed, 4 unresolved discussions (waiting on @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, YuhongWang-Amazon wrote…

Updated both postgres and spanner implementations to filter out the non-terminal stages from the requests.
I am wondering if it is better to failGrpc when there are non-terminal stages in the request.

That is a good idea to failGrpc for that. Otherwise the caller might be confused that some of the Computations still exist.

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 10 of 28 files reviewed, 4 unresolved discussions (waiting on @renjiezh and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, renjiezh wrote…

That is a good idea to failGrpc for that. Otherwise the caller might be confused that some of the Computations still exist.

Sounds good. Updated with test coverage.

@SanjayVas SanjayVas requested a review from renjiezh July 31, 2023 22:43
Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 5 of 21 files at r1, 11 of 17 files at r4, 3 of 4 files at r5, all commit messages.
Reviewable status: 24 of 28 files reviewed, 5 unresolved discussions (waiting on @renjiezh and @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 227 at r3 (raw file):

Previously, YuhongWang-Amazon wrote…

Im trying to be consistent with the spanner implementation. Should I remove the try catch block from both places? https://github.com/world-federation-of-advertisers/cross-media-measurement/blob/main/src/main/kotlin/org/wfanet/measurement/duchy/service/internal/computations/ComputationsService.kt#L137-L152

Yes. Basically if a StatusException can possibly be thrown by this then it means there's something wrong with the Store or StorageClient impl.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, YuhongWang-Amazon wrote…

Sounds good. Updated with test coverage.

I'm still seeing this "if not terminated then finish" logic? If we're throwing an error for non-terminal states then this can never be hit.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/readers/ComputationReader.kt line 222 at r5 (raw file):

  /**
   * Reads a [ComputationToken] by globalComputationId.

nit: maybe make the difference between this and the other overload more clear?

Suggestion:

Reads a [ComputationToken] by [globalComputationId] in a new transaction.

src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/readers/ComputationReader.kt line 377 at r5 (raw file):

  }

  suspend fun readLockOwner(readContext: ReadContext, computationId: Long): LockOwnerQueryResult? {

Document that this returns null when the Computation is not found (as opposed to the lock owner not being found as a result).

Alternatively if this is always an error case, throw a ComputationNotFoundException.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/writers/ClaimWork.kt line 71 at r5 (raw file):

      ?.let {
        computationReader.readComputationToken(transactionContext, it.globalId)
          ?: throw UnknownDataError()

Shouldn't this be a more specific ComputationNotFoundException? The calling code can decide what to do with that, e.g. whether it should result in a gRPC status with code INTERNAL.


src/main/kotlin/org/wfanet/measurement/duchy/service/internal/DuchyInternalException.kt line 105 at r5 (raw file):

}

class UnknownDataError(

Error and Exception are different types in Java, and this is one of the latter.

Suggestion:

UnknownDataException

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 24 of 28 files reviewed, 4 unresolved discussions (waiting on @renjiezh and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 227 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Yes. Basically if a StatusException can possibly be thrown by this then it means there's something wrong with the Store or StorageClient impl.

Done.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 273 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

I'm still seeing this "if not terminated then finish" logic? If we're throwing an error for non-terminal states then this can never be hit.

Removed!


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/readers/ComputationReader.kt line 377 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Document that this returns null when the Computation is not found (as opposed to the lock owner not being found as a result).

Alternatively if this is always an error case, throw a ComputationNotFoundException.

Added the document.

This scenario is similar to the other places where I throws UnknownDataError (changed to DataCorruptedException).
Currently, readLockOwner is only called in ClaimWork writer, where the previous listUnclaimedTasks confirmed that the computation exists. Hence, I returned null here and let the caller to throw UnknownDataError.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/writers/ClaimWork.kt line 71 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Shouldn't this be a more specific ComputationNotFoundException? The calling code can decide what to do with that, e.g. whether it should result in a gRPC status with code INTERNAL.

I defined a new UnknownDataError here to differentiate from the ComputationNotFoundException.
In this scenario, the previous code just modified the same computation within the same transaction, hence the reader is not expected to get a null, unless the database is corrupted or something.
On the other hand, the ComputationNotFoundException is expected to be raised when a transaction could not find the target computation in the first place.

We could actually replace this exception with !! if we don't expect this scenario to happen (at least it should be very rare). Do you have any opinion which approach is better?


src/main/kotlin/org/wfanet/measurement/duchy/service/internal/DuchyInternalException.kt line 105 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Error and Exception are different types in Java, and this is one of the latter.

Done.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 21 files at r1, 13 of 13 files at r6, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @renjiezh and @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 266 at r6 (raw file):

      for (globalId in globalIds) {
        val token = computationReader.readComputationToken(client, globalId) ?: continue
        DeleteComputation(token.localComputationId).execute(client, idGenerator)

Each service method should be a single transaction.

Code quote:

        val token = computationReader.readComputationToken(client, globalId) ?: continue
        DeleteComputation(token.localComputationId).execute(client, idGenerator)

src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/writers/ClaimWork.kt line 71 at r5 (raw file):

I defined a new UnknownDataError here to differentiate from the ComputationNotFoundException.

I don't see a need for this.

In this scenario, the previous code just modified the same computation within the same transaction, hence the reader is not expected to get a null,

If it's violating an invariant, throw an IllegalStateException (there's a very convenient checkNotNull function for this). No need for a special exception type.

unless the database is corrupted or something.

More likely to be "or something", as Postgres should be guaranteeing this for us. That is to say that we don't know if the cause is data corruption. All we know is that something that should be impossible happened.

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 20 of 31 files reviewed, 2 unresolved discussions (waiting on @renjiezh and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 266 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Each service method should be a single transaction.

Done.


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/writers/ClaimWork.kt line 71 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

I defined a new UnknownDataError here to differentiate from the ComputationNotFoundException.

I don't see a need for this.

In this scenario, the previous code just modified the same computation within the same transaction, hence the reader is not expected to get a null,

If it's violating an invariant, throw an IllegalStateException (there's a very convenient checkNotNull function for this). No need for a special exception type.

unless the database is corrupted or something.

More likely to be "or something", as Postgres should be guaranteeing this for us. That is to say that we don't know if the cause is data corruption. All we know is that something that should be impossible happened.

Done.

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 18 of 31 files reviewed, 3 unresolved discussions (waiting on @renjiezh and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/service/internal/computations/ComputationsService.kt line 372 at r8 (raw file):

  }

  private fun isTerminated(token: ComputationToken): Boolean {

@renjiezh it seems that these two methods are not used, could you help confirm whether we can remove them or not?

Copy link
Contributor

@renjiezh renjiezh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 18 of 31 files reviewed, 2 unresolved discussions (waiting on @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/service/internal/computations/ComputationsService.kt line 372 at r8 (raw file):

Previously, YuhongWang-Amazon wrote…

@renjiezh it seems that these two methods are not used, could you help confirm whether we can remove them or not?

You're right. It is safe to remove both.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 9 of 11 files at r7, 3 of 4 files at r8, 1 of 1 files at r9, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 266 at r6 (raw file):

Previously, YuhongWang-Amazon wrote…

Done.

The read of global IDs is still happening in a different transaction than the deletion here. Perhaps move that read into DeleteComputation and pass the force parameter to it, having it unconditionally return the sample.


src/main/proto/wfa/measurement/internal/duchy/error_code.proto line 44 at r9 (raw file):

  /** Data corrupted for unknown reasons. */
  DATA_CORRUPTED = 7;

revert

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 266 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

The read of global IDs is still happening in a different transaction than the deletion here. Perhaps move that read into DeleteComputation and pass the force parameter to it, having it unconditionally return the sample.

I didn't make this a single transaction because I was thinking about the partial purges. E.g 10 computations should be purged, but somehow it failed at deleting the 6th one. In this implementation, the first 5 computations will still be deleted. But in a transactional implementation, it will be either all 10 are deleted or non.

Since the exceptions are caught and logged, the returned purgeCount could also reflect the actually deleted rows. I am wondering if there is the need to make this one transaction.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 266 at r6 (raw file):

in a transactional implementation, it will be either all 10 are deleted or non.

That would be the preferred behavior. It's how we handle similar batch deletions in the CMMS public API.

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 28 of 32 files reviewed, 2 unresolved discussions (waiting on @renjiezh and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 266 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

in a transactional implementation, it will be either all 10 are deleted or non.

That would be the preferred behavior. It's how we handle similar batch deletions in the CMMS public API.

Done.


src/main/proto/wfa/measurement/internal/duchy/error_code.proto line 44 at r9 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

revert

Done.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 4 of 4 files at r10, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @YuhongWang-Amazon)


src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/postgres/PostgresComputationsService.kt line 260 at r10 (raw file):

    return purgeComputationsResponse {
      purgeCount = purgeResult.purgeCount
      purgeResult.purgeSamples?.forEach { purgeSample += it }

nit: Use a regular conditional, relying on smart casts (the += operator works for both "add" and "add all")

Alternatively, just have purgeResult.purgeSamples be the empty set (a repeated field with no elements is equivalent to one that hasn't been set).

Suggestion:

if (purgeResult.purgeSamples != null) {
  purgeSamples += purgeResult.purgeSamples
}

Copy link
Contributor Author

@YuhongWang-Amazon YuhongWang-Amazon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r11, all commit messages.
Reviewable status: :shipit: complete! all files reviewed, all discussions resolved (waiting on @YuhongWang-Amazon)

@YuhongWang-Amazon YuhongWang-Amazon merged commit 6b0d3a4 into main Aug 3, 2023
@YuhongWang-Amazon YuhongWang-Amazon deleted the yuhong.postgres-duchy-data-services branch August 3, 2023 20:16
ple13 pushed a commit that referenced this pull request Aug 16, 2024
This adds the postgres computations service and unit tests for it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants