Skip to content

Commit

Permalink
Update tests to provision 3 nodes cluster tests with a replicated log…
Browse files Browse the repository at this point in the history
…let with replication property 2

This commit adds the NodeCtl grpc svc to the repo to generate a grpc client to manually
provision a replicated loglet with a replciation property 2. W/o manually provisioning the
cluster, the replication property defaults to 1.

Note: Whenever the *.proto files change in the restate repo, they need to be updated in this
repository as well if there is an incompatible change.

This fixes #26.
  • Loading branch information
tillrohrmann committed Jan 14, 2025
1 parent 2cd408b commit 4a94d8a
Show file tree
Hide file tree
Showing 8 changed files with 482 additions and 11 deletions.
56 changes: 53 additions & 3 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ plugins {
id("com.diffplug.spotless") version "6.25.0"
id("com.gradleup.shadow") version "8.3.5"
id("com.github.jk1.dependency-license-report") version "2.9"

alias(libs.plugins.protobuf)
}

group = "dev.restate.sdktesting"
Expand Down Expand Up @@ -57,13 +59,32 @@ dependencies {

implementation(libs.assertj)
implementation(libs.awaitility)

// grpc
implementation(libs.grpc.kotlin.stub)
implementation(libs.grpc.protobuf)
implementation(libs.grpc.netty.shaded)
implementation(libs.protobuf.kotlin)
}

kotlin { jvmToolchain(21) }

val generatedJ2SPDir = layout.buildDirectory.dir("generated/j2sp")

sourceSets { main { java.srcDir(generatedJ2SPDir) } }
val generatedProto = layout.buildDirectory.dir("generated/source/proto/main")

sourceSets {
main {
java {
srcDir(generatedJ2SPDir)
srcDir(generatedProto.get().dir("java"))
srcDir(generatedProto.get().dir("grpc"))
}
kotlin {
srcDir(generatedProto.get().dir("kotlin"))
srcDir(generatedProto.get().dir("grpckt"))
}
}
}

jsonSchema2Pojo {
setSource(files("$projectDir/src/main/json"))
Expand All @@ -76,12 +97,41 @@ jsonSchema2Pojo {
generateBuilders = true
}

protobuf {
protoc { artifact = libs.protoc.asProvider().get().toString() }
plugins {
create("grpc") { artifact = libs.protoc.gen.grpc.java.get().toString() }
create("grpckt") { artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar" }
}
generateProtoTasks {
all().forEach {
it.plugins {
create("grpc")
create("grpckt")
}
it.builtins { create("kotlin") }
}
}
}

tasks {
getByName("compileKotlin") { dependsOn(generateJsonSchema2Pojo) }
getByName("compileKotlin") {
dependsOn(generateJsonSchema2Pojo)
dependsOn(generateProto)
}

test { useJUnitPlatform() }
}

// Workaround to enforce that kspKotlin runs after the generateProto task. Otherwise, gradle
// complains about an implicit
// dependency. See https://github.com/google/protobuf-gradle-plugin/issues/694.
tasks.configureEach {
if (name == "kspKotlin") {
mustRunAfter(tasks.generateProto)
}
}

spotless {
kotlin {
ktfmt()
Expand Down
3 changes: 3 additions & 0 deletions config/allowed-licenses.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
},
{
"moduleLicense": "Bouncy Castle Licence"
},
{
"moduleLicense": "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0"
}
]
}
15 changes: 15 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ dependencyResolutionManagement {
library("symbol-processing-api", "com.google.devtools.ksp", "symbol-processing-api")
.versionRef("ksp")
plugin("ksp", "com.google.devtools.ksp").versionRef("ksp")

library("grpc-kotlin-stub", "io.grpc", "grpc-kotlin-stub").version("1.4.1")

version("grpc", "1.69.0")
library("grpc-protobuf", "io.grpc", "grpc-protobuf").versionRef("grpc")
library("grpc-netty-shaded", "io.grpc", "grpc-netty-shaded").versionRef("grpc")
library("protoc-gen-grpc-java", "io.grpc", "protoc-gen-grpc-java").versionRef("grpc")

library("protoc-gen-grpc-kotlin", "io.grpc", "protoc-gen-grpc-kotlin").version("1.4.1")

version("protobuf", "4.29.2")
library("protobuf-kotlin", "com.google.protobuf", "protobuf-kotlin").versionRef("protobuf")
library("protoc", "com.google.protobuf", "protoc").versionRef("protobuf")

plugin("protobuf", "com.google.protobuf").version("0.9.4")
}
}
}
25 changes: 19 additions & 6 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,32 @@ class RestateContainer(
private val WAIT_STARTUP_STRATEGY =
WaitAllStrategy()
.withStrategy(
Wait.forHttp("/restate/health")
.forPort(RUNTIME_INGRESS_ENDPOINT_PORT)
// /metrics will be reachable once the node grpc server starts up. Note at this
// point (if manual
// provisioning is enabled) the other services (ingress, admin, etc.) are not yet
// running.
Wait.forHttp("/metrics")
.forPort(RUNTIME_NODE_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build()))
.withStartupTimeout(120.seconds.toJavaDuration())

private val WAIT_INGRESS_READY_STRATEGY =
WaitAllStrategy()
.withStrategy(
Wait.forHttp("/health")
.forPort(RUNTIME_ADMIN_ENDPOINT_PORT)
Wait.forHttp("/restate/health")
.forPort(RUNTIME_INGRESS_ENDPOINT_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build()))
.withStartupTimeout(120.seconds.toJavaDuration())

fun bootstrapRestateCluster(
fun createRestateContainers(
config: RestateDeployerConfig,
network: Network,
envs: Map<String, String>,
Expand All @@ -80,13 +88,14 @@ class RestateContainer(
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ALLOW_BOOTSTRAP" to "true",
"RESTATE_ALLOW_BOOTSTRAP" to "false",
"RESTATE_ROLES" to "[worker,log-server,admin,metadata-store]",
)
val followerEnvs =
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ALLOW_BOOTSTRAP" to "false",
"RESTATE_ROLES" to "[worker,admin,log-server]",
"RESTATE_METADATA_STORE_CLIENT__ADDRESS" to
"http://$RESTATE_RUNTIME:$RUNTIME_NODE_PORT")
Expand Down Expand Up @@ -192,6 +201,10 @@ class RestateContainer(
WAIT_STARTUP_STRATEGY.waitUntilReady(this)
}

fun waitIngressReady() {
WAIT_INGRESS_READY_STRATEGY.waitUntilReady(this)
}

fun dumpConfiguration() {
check(isRunning) { "The container is not running, can't dump configuration" }
dockerClient.killContainerCmd(containerId).withSignal("SIGUSR1").exec()
Expand Down
66 changes: 64 additions & 2 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import dev.restate.admin.model.RegisterDeploymentRequest
import dev.restate.admin.model.RegisterDeploymentRequestAnyOf
import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions
import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema
import io.grpc.ManagedChannelBuilder
import io.grpc.Status.Code
import io.grpc.StatusException
import java.io.File
import java.net.URI
import java.net.http.HttpClient
Expand All @@ -28,6 +31,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import kotlinx.coroutines.runBlocking
import org.apache.logging.log4j.CloseableThreadContext
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.ThreadContext
Expand All @@ -38,6 +42,9 @@ import org.testcontainers.images.builder.Transferable
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider
import org.testcontainers.shaded.com.github.dockerjava.core.DockerClientConfig
import restate.cluster.*
import restate.node_ctl_svc.NodeCtlSvcGrpcKt
import restate.node_ctl_svc.provisionClusterRequest

class RestateDeployer
private constructor(
Expand Down Expand Up @@ -169,12 +176,23 @@ private constructor(
}
.associate { it.second.first to (it.first to it.second.second) }
private val runtimeContainers: List<RestateContainer> =
RestateContainer.bootstrapRestateCluster(
RestateContainer.createRestateContainers(
config, network, runtimeContainerEnvs, configSchema, copyToContainer, config.restateNodes)

private val deployedContainers: Map<String, ContainerHandle> =
(runtimeContainers.map {
it.hostname to ContainerHandle(it, restartWaitStrategy = { it.waitStartup() })
it.hostname to
ContainerHandle(
it,
restartWaitStrategy = {
it.waitStartup()
// For the KillRuntime and StopRuntime tests, we need to wait for the ingress
// to be ready.
// Otherwise, the client seems to be stuck when invoking services. Maybe it's
// because the new
// route has not been set up by Docker?
it.waitIngressReady()
})
} +
serviceContainers.map { it.key to ContainerHandle(it.value.second) } +
additionalContainers.map { it.key to ContainerHandle(it.value) })
Expand Down Expand Up @@ -282,9 +300,53 @@ private constructor(
.toTypedArray())
.get(150, TimeUnit.SECONDS)

if (config.restateNodes > 1) {
provisionCluster()
}

executor.shutdown()
}

private fun provisionCluster() {
LOG.debug("Manually provisioning the cluster")

val nodePort = getContainerPort(RESTATE_RUNTIME, RUNTIME_NODE_PORT)

val channel = ManagedChannelBuilder.forAddress("localhost", nodePort).usePlaintext().build()
val client = NodeCtlSvcGrpcKt.NodeCtlSvcCoroutineStub(channel)
val request = provisionClusterRequest {
dryRun = false
logProvider = defaultProvider {
provider = "replicated"
replicatedConfig = replicatedProviderConfig {
replicationProperty = "2"
nodesetSelectionStrategy = nodeSetSelectionStrategy {
kind = Cluster.NodeSetSelectionStrategyKind.StrictFaultTolerantGreedy
}
}
}
}

Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) {
try {
runBlocking {
client.provisionCluster(request)
Unit
}
} catch (e: StatusException) {
// already exists code indicates that the cluster has been already provisioned
if (e.status.code != Code.ALREADY_EXISTS) {
Thread.sleep(100)
throw Exception("Failed provisioning the cluster. Retrying.", e)
}
}
}

channel.shutdown()

LOG.debug("Cluster has been provisioned")
}

private fun discoverDeployment(client: DeploymentApi, spec: ServiceSpec) {
val url = spec.getEndpointUrl(config)
if (spec.skipRegistration) {
Expand Down
Loading

0 comments on commit 4a94d8a

Please sign in to comment.