Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add BaseTeeApplication #1904

Merged
merged 25 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ee482b1
feat: add BaseTeeApplication
Nov 6, 2024
f627dcb
add: BaseTeeApplication
Nov 7, 2024
92f8b9b
update: make runWork suspended function
Nov 7, 2024
08932f7
fix: moved proto Message parsing inside QueueClient
Nov 8, 2024
dea0f64
Update on PR comments.
Nov 18, 2024
4f82708
fix: Add run method to BaseTeeApplication
Nov 18, 2024
c67eaab
refactor: replace InMemoryQueueclient with GooglePubSubEmulator
Nov 22, 2024
5c39e0b
refactor: delete test_work_proto in cross_media_measurement and use t…
Nov 22, 2024
c49a138
style: run linter
Nov 22, 2024
3819bd6
refactor: link to proper common-jvm commit
Nov 22, 2024
4cea6e7
refactor: remove obsolete code
Nov 22, 2024
765788c
refactor: use new Subscriber defined in common-jvm/gcloud/subscriber …
Nov 26, 2024
cde8d4e
fix: update test using the updated GooglePubSubEmulatorClient from co…
Nov 26, 2024
5f7d65f
refactor: add setup and teardown methods in BaseTeeApplicationTest
Nov 26, 2024
344f607
refactor: based on latest PR comments
Nov 27, 2024
a555153
refactor: update code on latest common-jvm commit
Nov 29, 2024
920b4b1
refactor: update based on PR comments
Dec 9, 2024
6dc21f6
style: renamed queueName into subscriptionId in BaseTeeApplication class
Dec 9, 2024
5c2d175
refactor: based on PR comments
Dec 12, 2024
baa0bf7
refactor: run bazel REPIN
Dec 12, 2024
1e407ea
refactor: fix conflicts
Dec 13, 2024
a7680f8
style: run linter
Dec 13, 2024
237bf69
style: CONST_CASE for variable in test class
Dec 13, 2024
e983b23
fix: add no-remote-exec tag to BaseTeeApplicationTest
Dec 13, 2024
fd14799
Merge branch 'main' into marcopremier/tee-sdk-base-class
Marco-Premier Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bazel_dep(
)
bazel_dep(
name = "common-jvm",
version = "0.96.0",
version = "0.97.0",
repo_name = "wfa_common_jvm",
)
bazel_dep(
Expand Down
6 changes: 3 additions & 3 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 137 additions & 2 deletions maven_install.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": "THERE_IS_NO_DATA_ONLY_ZUUL",
"__INPUT_ARTIFACTS_HASH": 1425304572,
"__RESOLVED_ARTIFACTS_HASH": -1806191256,
"__INPUT_ARTIFACTS_HASH": -107977611,
"__RESOLVED_ARTIFACTS_HASH": 1704463959,
"artifacts": {
"args4j:args4j": {
"shasums": {
Expand Down Expand Up @@ -677,6 +677,13 @@
},
"version": "2.9.0"
},
"com.google.cloud.functions:functions-framework-api": {
"shasums": {
"jar": "f1e318303c6b7df3edce61c2d5b607ab5a34212cbda57aa44f502d36dbcb9c28",
"sources": "8eb9156b0ab40d32d4ce7711d88eae38b77194e49370d684d7138a7f97dd16c0"
},
"version": "1.1.0"
},
"com.google.cloud.opentelemetry:detector-resources-support": {
"shasums": {
"jar": "6252d6a872563933a3f5b522d7e59bd31576ddfd6ed4d1d43d671408185c81ad",
Expand Down Expand Up @@ -831,6 +838,13 @@
},
"version": "2.42.0"
},
"com.google.cloud:google-cloudevent-types": {
"shasums": {
"jar": "9750968913b4388f177283cdb36000b434a0db5c4efe2fd2028a527aa024d0da",
"sources": "c574c09603d97b05048bafe0a83a56579757d256e270979f3219d8289ec8f2be"
},
"version": "0.16.0"
},
"com.google.cloud:grpc-gcp": {
"shasums": {
"jar": "5685df2913047269fd01c0fa469f66121406ccf65bfd1aea9efeb6da46a3575e",
Expand Down Expand Up @@ -1187,6 +1201,20 @@
},
"version": "4.4.0"
},
"io.cloudevents:cloudevents-api": {
"shasums": {
"jar": "1eb08784349017f4d9bf7e31cd9107c7e1dd15138f60507168a3a68b0206cf5b",
"sources": "5a7aa69e3075a40249e3f8a3887c4c54d1c1471d0c0293e3718ff9ae42ac0d98"
},
"version": "4.0.1"
},
"io.cloudevents:cloudevents-core": {
"shasums": {
"jar": "f6226bac64b9fd84b15ec2111eb50b512febec55bbf40495cce841946b8dc120",
"sources": "6d05d3cb64fdf4fa482e8cacfe817c0b9de466cd40102f9add4b0b725b2278b4"
},
"version": "4.0.1"
},
"io.dropwizard.metrics:metrics-core": {
"shasums": {
"jar": "79d903d4ae850c9dee8d3939e5bd8d4172a91fda40b31b7e40a5d8c3e1fe4534",
Expand Down Expand Up @@ -2884,6 +2912,13 @@
},
"version": "1.19.3"
},
"org.testcontainers:gcloud": {
"shasums": {
"jar": "7cdf6654e01169a65fe7e0f6ef78d209fdfd9ab61924cde711648ad46b00d997",
"sources": "698a39cb7f9680db56a17475127ecf97d601338bbeb10b614d0d157412bae784"
},
"version": "1.19.3"
},
"org.testcontainers:jdbc": {
"shasums": {
"jar": "5ed185d08a3a704b7265ebd2b5eafe3e9374c34194d982dc279a5ea62965edfe",
Expand Down Expand Up @@ -3969,6 +4004,9 @@
"com.google.oauth-client:google-oauth-client",
"com.google.protobuf:protobuf-java"
],
"com.google.cloud.functions:functions-framework-api": [
"io.cloudevents:cloudevents-api"
],
"com.google.cloud.sql:cloud-sql-connector-r2dbc-core": [
"com.google.cloud.sql:jdbc-socket-factory-core",
"io.netty:netty-handler",
Expand Down Expand Up @@ -4695,6 +4733,12 @@
"org.slf4j:slf4j-api",
"org.threeten:threetenbp"
],
"com.google.cloud:google-cloudevent-types": [
"com.google.api-client:google-api-client",
"com.google.api.grpc:proto-google-common-protos",
"com.google.protobuf:protobuf-java",
"com.google.protobuf:protobuf-java-util"
],
"com.google.cloud:proto-google-cloud-firestore-bundle-v1": [
"com.google.api.grpc:proto-google-common-protos",
"com.google.api:api-common",
Expand Down Expand Up @@ -4859,6 +4903,9 @@
"commons-collections:commons-collections",
"commons-logging:commons-logging"
],
"io.cloudevents:cloudevents-core": [
"io.cloudevents:cloudevents-api"
],
"io.dropwizard.metrics:metrics-core": [
"org.slf4j:slf4j-api"
],
Expand Down Expand Up @@ -5940,6 +5987,9 @@
"org.testcontainers:database-commons": [
"org.testcontainers:testcontainers"
],
"org.testcontainers:gcloud": [
"org.testcontainers:testcontainers"
],
"org.testcontainers:jdbc": [
"org.testcontainers:database-commons"
],
Expand Down Expand Up @@ -7025,6 +7075,9 @@
"com.google.datastore.v1.client",
"com.google.datastore.v1.client.testing"
],
"com.google.cloud.functions:functions-framework-api": [
"com.google.cloud.functions"
],
"com.google.cloud.opentelemetry:detector-resources-support": [
"com.google.cloud.opentelemetry.detection"
],
Expand Down Expand Up @@ -7165,6 +7218,54 @@
"com.google.cloud.storage.testing",
"com.google.cloud.storage.transfermanager"
],
"com.google.cloud:google-cloudevent-types": [
"com.google.events.cloud.alloydb.v1",
"com.google.events.cloud.apigateway.v1",
"com.google.events.cloud.apigeeregistry.v1",
"com.google.events.cloud.audit.v1",
"com.google.events.cloud.batch.v1",
"com.google.events.cloud.beyondcorp.appconnections.v1",
"com.google.events.cloud.beyondcorp.appconnectors.v1",
"com.google.events.cloud.beyondcorp.appgateways.v1",
"com.google.events.cloud.beyondcorp.clientconnectorservices.v1",
"com.google.events.cloud.beyondcorp.clientgateways.v1",
"com.google.events.cloud.certificatemanager.v1",
"com.google.events.cloud.cloudbuild.v1",
"com.google.events.cloud.clouddms.v1",
"com.google.events.cloud.dataflow.v1beta3",
"com.google.events.cloud.datafusion.v1",
"com.google.events.cloud.dataplex.v1",
"com.google.events.cloud.datastore.v1",
"com.google.events.cloud.datastream.v1",
"com.google.events.cloud.deploy.v1",
"com.google.events.cloud.eventarc.v1",
"com.google.events.cloud.firestore.v1",
"com.google.events.cloud.functions.v2",
"com.google.events.cloud.gkebackup.v1",
"com.google.events.cloud.gkehub.v1",
"com.google.events.cloud.iot.v1",
"com.google.events.cloud.memcache.v1",
"com.google.events.cloud.metastore.v1",
"com.google.events.cloud.networkconnectivity.v1",
"com.google.events.cloud.networkmanagement.v1",
"com.google.events.cloud.networkservices.v1",
"com.google.events.cloud.notebooks.v1",
"com.google.events.cloud.pubsub.v1",
"com.google.events.cloud.redis.v1",
"com.google.events.cloud.scheduler.v1",
"com.google.events.cloud.speech.v1",
"com.google.events.cloud.storage.v1",
"com.google.events.cloud.video.transcoder.v1",
"com.google.events.cloud.visionai.v1",
"com.google.events.cloud.vmmigration.v1",
"com.google.events.cloud.workflows.v1",
"com.google.events.firebase.analytics.v1",
"com.google.events.firebase.auth.v1",
"com.google.events.firebase.database.v1",
"com.google.events.firebase.firebasealerts.v1",
"com.google.events.firebase.remoteconfig.v1",
"com.google.events.firebase.testlab.v1"
],
"com.google.cloud:grpc-gcp": [
"com.google.cloud.grpc",
"com.google.cloud.grpc.multiendpoint",
Expand Down Expand Up @@ -7484,6 +7585,27 @@
"info.picocli:picocli": [
"picocli"
],
"io.cloudevents:cloudevents-api": [
"io.cloudevents",
"io.cloudevents.lang",
"io.cloudevents.rw",
"io.cloudevents.types"
],
"io.cloudevents:cloudevents-core": [
"io.cloudevents.core",
"io.cloudevents.core.builder",
"io.cloudevents.core.data",
"io.cloudevents.core.extensions",
"io.cloudevents.core.extensions.impl",
"io.cloudevents.core.format",
"io.cloudevents.core.impl",
"io.cloudevents.core.message",
"io.cloudevents.core.message.impl",
"io.cloudevents.core.provider",
"io.cloudevents.core.v03",
"io.cloudevents.core.v1",
"io.cloudevents.core.validator"
],
"io.dropwizard.metrics:metrics-core": [
"com.codahale.metrics"
],
Expand Down Expand Up @@ -11204,6 +11326,9 @@
"org.testcontainers.exception",
"org.testcontainers.ext"
],
"org.testcontainers:gcloud": [
"org.testcontainers.containers"
],
"org.testcontainers:jdbc": [
"org.testcontainers.containers",
"org.testcontainers.jdbc",
Expand Down Expand Up @@ -12241,6 +12366,8 @@
"com.google.cloud.bigtable:bigtable-metrics-api:jar:sources",
"com.google.cloud.datastore:datastore-v1-proto-client",
"com.google.cloud.datastore:datastore-v1-proto-client:jar:sources",
"com.google.cloud.functions:functions-framework-api",
"com.google.cloud.functions:functions-framework-api:jar:sources",
"com.google.cloud.opentelemetry:detector-resources-support",
"com.google.cloud.opentelemetry:detector-resources-support:jar:sources",
"com.google.cloud.opentelemetry:exporter-metrics",
Expand Down Expand Up @@ -12285,6 +12412,8 @@
"com.google.cloud:google-cloud-spanner:jar:sources",
"com.google.cloud:google-cloud-storage",
"com.google.cloud:google-cloud-storage:jar:sources",
"com.google.cloud:google-cloudevent-types",
"com.google.cloud:google-cloudevent-types:jar:sources",
"com.google.cloud:grpc-gcp",
"com.google.cloud:grpc-gcp:jar:sources",
"com.google.cloud:proto-google-cloud-firestore-bundle-v1",
Expand Down Expand Up @@ -12386,6 +12515,10 @@
"dev.failsafe:failsafe:jar:sources",
"info.picocli:picocli",
"info.picocli:picocli:jar:sources",
"io.cloudevents:cloudevents-api",
"io.cloudevents:cloudevents-api:jar:sources",
"io.cloudevents:cloudevents-core",
"io.cloudevents:cloudevents-core:jar:sources",
"io.dropwizard.metrics:metrics-core",
"io.dropwizard.metrics:metrics-core:jar:sources",
"io.github.classgraph:classgraph",
Expand Down Expand Up @@ -12869,6 +13002,8 @@
"org.springframework:spring-webmvc:jar:sources",
"org.testcontainers:database-commons",
"org.testcontainers:database-commons:jar:sources",
"org.testcontainers:gcloud",
"org.testcontainers:gcloud:jar:sources",
"org.testcontainers:jdbc",
"org.testcontainers:jdbc:jar:sources",
"org.testcontainers:postgresql",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("@wfa_rules_kotlin_jvm//kotlin:defs.bzl", "kt_jvm_library")

package(default_visibility = ["//visibility:public"])

kt_jvm_library(
name = "base_tee_application",
srcs = ["BaseTeeApplication.kt"],
deps = [
"@wfa_common_jvm//imports/java/com/google/protobuf",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/queue:queue_subscriber",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2024 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.securecomputation.teesdk

import com.google.protobuf.InvalidProtocolBufferException
import com.google.protobuf.Message
import com.google.protobuf.Parser
import java.util.logging.Level
import java.util.logging.Logger
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow
import org.wfanet.measurement.queue.QueueSubscriber

/**
* BaseTeeApplication is an abstract base class for TEE applications that automatically subscribes
* to a specified queue and processes messages as they arrive.
*
* @param T The type of message that this application will process.
* @param subscriptionId The name of the subscription to which this application subscribes.
* @param queueSubscriber A client that manages connections and interactions with the queue.
* @param parser [Parser] used to parse serialized queue messages into [T] instances.
*/
abstract class BaseTeeApplication<T : Message>(
private val subscriptionId: String,
private val queueSubscriber: QueueSubscriber,
private val parser: Parser<T>,
) : AutoCloseable {

/** Starts the TEE application by listening for messages on the specified queue. */
suspend fun run() {
receiveAndProcessMessages()
}

/**
* Begins listening for messages on the specified queue. Each message is processed as it arrives.
* If an error occurs during the message flow, it is logged and handling continues.
*/
private suspend fun receiveAndProcessMessages() {
val messageChannel: ReceiveChannel<QueueSubscriber.QueueMessage<T>> =
queueSubscriber.subscribe(subscriptionId, parser)
for (message: QueueSubscriber.QueueMessage<T> in messageChannel) {
processMessage(message)
}
}

/**
* Processes each message received from the queue by attempting to parse and pass it to [runWork].
* If parsing fails, the message is negatively acknowledged and discarded. If processing fails,
* the message is negatively acknowledged and optionally requeued.
*
* @param queueMessage The raw message received from the queue.
*/
private suspend fun processMessage(queueMessage: QueueSubscriber.QueueMessage<T>) {
try {
runWork(queueMessage.body)
queueMessage.ack()
} catch (e: InvalidProtocolBufferException) {
logger.log(Level.SEVERE, e) { "Failed to parse protobuf message" }
queueMessage.nack()
} catch (e: Exception) {
logger.log(Level.SEVERE, e) { "Error processing message" }
queueMessage.nack()
}
}

abstract suspend fun runWork(message: T)

override fun close() {
queueSubscriber.close()
}

companion object {
protected val logger = Logger.getLogger(this::class.java.name)
}
}
Loading