-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add BaseTeeApplication #1904
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 4 of 9 files at r1, all commit messages.
Reviewable status: 4 of 9 files reviewed, 6 unresolved discussions (waiting on @Marco-Premier and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 48 at r1 (raw file):
private val queueName: String, private val queueClient: QueueClient, private val parser: (ByteArray) -> T,
Use the existing Parser interface, which offers more flexibility in how to deserialize.
Suggestion:
Parser<T>
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 49 at r1 (raw file):
private val queueClient: QueueClient, private val parser: (ByteArray) -> T, private val blockingContext: CoroutineContext = Dispatchers.IO,
Is this actually used for blocking code? I thought we wrap all of that in the queue client.
If it is, annotate it with @BlockingExecutor
. If not, annotate it with @NonBlockingExecutor
, default it to Dispatchers.Default. In either case, you probably want to rename it and make it clear what it's used for.
That said, this may be irrelevant if you give this a primary run method.
Code quote:
blockingContext
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 52 at r1 (raw file):
) : AutoCloseable { private val scope = CoroutineScope(blockingContext + SupervisorJob())
If you really need supervisor behavior (rare), document exactly why you need it.
Code quote:
SupervisorJob()
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 54 at r1 (raw file):
private val scope = CoroutineScope(blockingContext + SupervisorJob()) init {
Usually for the top-level "application" class called from main
, you'd have a run
method. In the case of coroutine applications, this is most likely to be a suspending function that suspends until the application ends. For a server or daemon, this is forever (until the application crashes or is terminated externally). There are multiple examples of this in the code base.
I believe if you do this, you won't need a separate coroutine scope or context. main
will most likely just wrap this the call to run
with runBlocking
.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 59 at r1 (raw file):
startListening() } catch (e: Exception) { logger.severe("Error starting to listen to queue: ${e.message}")
Log-and-throw is an anti-pattern. Do one or the other.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 71 at r1 (raw file):
private suspend fun startListening() { try { val messageChannel: ReceiveChannel<QueueClient.QueueMessage<ByteArray>> =
Using ByteArray defeats the purpose of having QueueMessage be generic. You'll want to push the parsing code into the queue client. The underlying message type will always be a protobuf, and therefore extend Message.
Suggestion:
T
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 2 of 9 files reviewed, 6 unresolved discussions (waiting on @SanjayVas and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 48 at r1 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Use the existing Parser interface, which offers more flexibility in how to deserialize.
Done.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 49 at r1 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Is this actually used for blocking code? I thought we wrap all of that in the queue client.
If it is, annotate it with
@BlockingExecutor
. If not, annotate it with@NonBlockingExecutor
, default it to Dispatchers.Default. In either case, you probably want to rename it and make it clear what it's used for.That said, this may be irrelevant if you give this a primary run method.
Done.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 52 at r1 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
If you really need supervisor behavior (rare), document exactly why you need it.
Done.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 54 at r1 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Usually for the top-level "application" class called from
main
, you'd have arun
method. In the case of coroutine applications, this is most likely to be a suspending function that suspends until the application ends. For a server or daemon, this is forever (until the application crashes or is terminated externally). There are multiple examples of this in the code base.I believe if you do this, you won't need a separate coroutine scope or context.
main
will most likely just wrap this the call torun
withrunBlocking
.
I connected the dots, thanks. No need for blocking executor here then. Left it inside QueueClient
classes only.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 59 at r1 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Log-and-throw is an anti-pattern. Do one or the other.
Done.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 71 at r1 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Using ByteArray defeats the purpose of having QueueMessage be generic. You'll want to push the parsing code into the queue client. The underlying message type will always be a protobuf, and therefore extend Message.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 4 of 9 files at r1, 3 of 3 files at r2, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @Marco-Premier and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 36 at r2 (raw file):
* @param queueClient A client that manages connections and interactions with the queue. * @param parser A `Parser` from `com.google.protobuf` used to parse raw `ByteArray` data from the * queue into the desired message type [T], typically a Protobuf message.
nit: this is enforced here to always be a protobuf message
Suggestion:
* @param parser [Parser] used to parse serialized queue messages into [T] instances
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 49 at r2 (raw file):
* If an error occurs during the message flow, it is logged and handling continues. */ suspend open fun startListening() {
Why would this be overridable and public? It should be a private method called from the single entry point to the application, i.e. a suspend fun run()
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 59 at r2 (raw file):
.collect { queueMessage -> processMessage(queueMessage) println("~~~~~~~~~~~~~~~~~~~~~~~~ collecting message")
Clean up debugging prints. Use logger if needed.
@Marco-Premier as a small housekeeping matter, have a look through the PR titles in here. Good to always name the PRs following the same scheme. For additional context have a look through Conventional Commits. |
fix: lint proto file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks
Reviewable status: 6 of 9 files reviewed, 2 unresolved discussions (waiting on @SanjayVas and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 49 at r2 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Why would this be overridable and public? It should be a private method called from the single entry point to the application, i.e. a
suspend fun run()
Done.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 59 at r2 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Clean up debugging prints. Use logger if needed.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 3 files at r3, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @Marco-Premier and @stevenwarejones)
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 72 at r3 (raw file):
.build() launch {
nit: I don't think this code doesn't need to be launched in a separate coroutine. You should be able to launch the sender and then have this code be top-level inside runBlocking
.
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 75 at r3 (raw file):
app.messageProcessed.await() assertThat(app.processedMessages.contains(testWork)).isTrue() inMemoryQueueClient.close()
nit: Your BaseTeeApplication implementation closes the queue client, meaning it "owns" the client. Therefore you don't need to close the queue client separately.
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 80 at r3 (raw file):
launch { inMemoryQueueClient.sendMessage(testWork.toByteArray())
sendMessage should be taking a TestWork instance as the client should be handling the binary serialization/deserialization.
…he one in common-jvm
refactor: updated common-jvm commit hash in MODULE.bazel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 9 files reviewed, 1 unresolved discussion (waiting on @SanjayVas and @stevenwarejones)
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 80 at r3 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
sendMessage should be taking a TestWork instance as the client should be handling the binary serialization/deserialization.
With the client you refer to the common-jvm GooglePubSubClient
? I guess my question is: do we want to limit the PubSub client to only work with proto messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 9 files at r1, 3 of 8 files at r4, 5 of 5 files at r5, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @SanjayVas)
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 80 at r3 (raw file):
Previously, Marco-Premier (marcopremier) wrote…
With the client you refer to the common-jvm
GooglePubSubClient
? I guess my question is: do we want to limit the PubSub client to only work with proto messages?
I think I'd prefer ByteString but I'm okay with limiting it to Message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 8 files at r4, 3 of 5 files at r5, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @Marco-Premier and @stevenwarejones)
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 80 at r3 (raw file):
Previously, stevenwarejones (Steven Ware Jones) wrote…
I think I'd prefer ByteString but I'm okay with limiting it to Message.
Yes, I'm referring to GooglePubSubClient
(or QueueSubscriber
). There is no reason to allow for anything other than protobuf messages for pubsub or message queues.
refactor: protobuf message serialization is now wrapped in GooglePubSubClient in common-jvm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @SanjayVas and @stevenwarejones)
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 80 at r3 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Yes, I'm referring to
GooglePubSubClient
(orQueueSubscriber
). There is no reason to allow for anything other than protobuf messages for pubsub or message queues.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 4 files at r6, all commit messages.
Reviewable status: 8 of 9 files reviewed, 2 unresolved discussions (waiting on @Marco-Premier and @SanjayVas)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 79 at r6 (raw file):
logger.severe("Failed to parse protobuf message: ${e.message}") queueMessage.nack() } catch (e: Exception) {
aren't there other exceptions we don't want to retry? I'd prefer to have a list of retryable excpetions and only retry for those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 8 of 9 files reviewed, 2 unresolved discussions (waiting on @SanjayVas and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 79 at r6 (raw file):
Previously, stevenwarejones (Steven Ware Jones) wrote…
aren't there other exceptions we don't want to retry? I'd prefer to have a list of retryable excpetions and only retry for those.
Can we use a dead-letter-queue policy with a max-retry attempts set to (eg.) 5 for the moment?
I'd like to investigate on a separate PR what are the exception we want to retry for ans which ones we don't want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 8 of 9 files reviewed, 2 unresolved discussions (waiting on @SanjayVas)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 8 files at r4, 2 of 4 files at r6, all commit messages.
Reviewable status: 8 of 9 files reviewed, 6 unresolved discussions (waiting on @Marco-Premier and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 52 at r6 (raw file):
* If an error occurs during the message flow, it is logged and handling continues. */ private suspend fun startListening() {
nit: this isn't actually "starting" as it suspends until cancelled or the channel is closed. That is to say it's actually receiving and processing.
Code quote:
startListening
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 57 at r6 (raw file):
queueSubscriber.subscribe(queueName, parser) messageChannel .consumeAsFlow()
Why? You're not passing this to any other code that uses Flows rather than Channels. You can just use a for loop using the channel iterator
for (message: QueueSubscriber.QueueMessage<T> in messageChannel) {
processMessage(message)
}
Or a while loop if you want to call receive yourself e.g. if you want to do more specific error handling
while (true) {
val message: QueueSubscriber.QueueMessage<T> = messageChannel.receive()
processMessage(message)
}
Code quote:
consumeAsFlow
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 78 at r6 (raw file):
} catch (e: InvalidProtocolBufferException) { logger.severe("Failed to parse protobuf message: ${e.message}") queueMessage.nack()
Do we actually want BaseTeeApplication to handle this, or let each application implementation handle acking/nacking? If we're just going to do this, I don't see a huge benefit in exposing the ack/nack from QueueSubscriber.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 91 at r6 (raw file):
queueSubscriber.close() } catch (e: Exception) { logger.severe("Error during close: ${e.message}")
I don't think you need to log here, since I assume this will be called from main
of the application. Therefore it can just bubble up.
Code quote:
logger.severe("Error during close: ${e.message}")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 4 of 9 files reviewed, 5 unresolved discussions (waiting on @SanjayVas and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 57 at r6 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Why? You're not passing this to any other code that uses Flows rather than Channels. You can just use a for loop using the channel iterator
for (message: QueueSubscriber.QueueMessage<T> in messageChannel) { processMessage(message) }Or a while loop if you want to call receive yourself e.g. if you want to do more specific error handling
while (true) { val message: QueueSubscriber.QueueMessage<T> = messageChannel.receive() processMessage(message) }
I'm happy with the for
loop. processMessage
already handle TEE application-level exceptions. For other (eg. channel close, connection issue with pubsub) I'm happy if the app breaks and then eventually rely on auto-healing
capabilities of MIG
.
I'm afraid that handle all the possible exceptions there might be quite complex.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 78 at r6 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
Do we actually want BaseTeeApplication to handle this, or let each application implementation handle acking/nacking? If we're just going to do this, I don't see a huge benefit in exposing the ack/nack from QueueSubscriber.
I see the value of leaving ack
/ nack
here as this class is just a wrapper around the Subscriber
and abstracts that logic away from developers. I think that letting each application handling ack
/ nack
is prone to errors (one may forget them ecc...), especially with Tee Apps
where we want to enforce the processing of one message at the time.
However it doesn’t prevent other classes in future from consuming the Subscriber
with different acknowledgment policies. For example, one might choose to acknowledge a message immediately upon receipt to enable parallel message processing.
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 91 at r6 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
I don't think you need to log here, since I assume this will be called from
main
of the application. Therefore it can just bubble up.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 5 of 5 files at r7, all commit messages.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @Marco-Premier and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 72 at r7 (raw file):
queueMessage.ack() } catch (e: InvalidProtocolBufferException) { logger.severe("Failed to parse protobuf message: ${e.message}")
If you're going to log using an exception, use the method that takes in the exception itself rather than including the message.
Suggestion:
logger.log(Level.SEVERE, e) { "Failed to parse protobuf message" }
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 44 at r7 (raw file):
parser = parser, ) { val processedMessages: MutableList<TestWork> = mutableListOf()
nit: since this implementation only has a single Deferred, you can simplify this by dropping the list and having the Deferred contain the message.
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 55 at r7 (raw file):
class BaseTeeApplicationTest { @Rule
This should go in the companion object, assuming it's supposed to be one emulator shared across all test methods.
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 58 at r7 (raw file):
@JvmField val pubSubEmulatorProvider = GooglePubSubEmulatorProvider() private val projectId = "test-project"
These should be constants in the companion object
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 65 at r7 (raw file):
@Before fun setup() {
nit: same comment as before on using meaningful names
Code quote:
setup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @SanjayVas and @stevenwarejones)
src/main/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplication.kt
line 72 at r7 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
If you're going to log using an exception, use the method that takes in the exception itself rather than including the message.
Done.
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 55 at r7 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
This should go in the companion object, assuming it's supposed to be one emulator shared across all test methods.
Done.
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 58 at r7 (raw file):
Previously, SanjayVas (Sanjay Vasandani) wrote…
These should be constants in the companion object
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 2 files at r8, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @Marco-Premier and @stevenwarejones)
src/test/kotlin/org/wfanet/measurement/securecomputation/teesdk/BaseTeeApplicationTest.kt
line 104 at r8 (raw file):
companion object { private const val projectId = "test-project"
nit: CONST_CASE
Code quote:
projectId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 8 files at r4, 1 of 5 files at r5, 1 of 5 files at r7, 5 of 5 files at r9, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @stevenwarejones)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r10, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @stevenwarejones)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 5 files at r7, 4 of 5 files at r9, 1 of 1 files at r10, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @Marco-Premier)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 5 files at r7, 4 of 5 files at r9, 1 of 1 files at r10, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @Marco-Premier)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r11, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @Marco-Premier)
No description provided.