Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tests to provision 3 nodes cluster tests with a replicated loglet with replication property 2 #27

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 55 additions & 3 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ plugins {
id("com.diffplug.spotless") version "6.25.0"
id("com.gradleup.shadow") version "8.3.5"
id("com.github.jk1.dependency-license-report") version "2.9"

alias(libs.plugins.protobuf)
}

group = "dev.restate.sdktesting"
Expand All @@ -19,6 +21,8 @@ repositories {
mavenCentral()
// OSSRH Snapshots repo
maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
// for protobuf-gradle-plugin dependencies
google()
}

dependencies {
Expand Down Expand Up @@ -57,13 +61,32 @@ dependencies {

implementation(libs.assertj)
implementation(libs.awaitility)

// grpc
implementation(libs.grpc.kotlin.stub)
implementation(libs.grpc.protobuf)
implementation(libs.grpc.netty.shaded)
implementation(libs.protobuf.kotlin)
}

kotlin { jvmToolchain(21) }

val generatedJ2SPDir = layout.buildDirectory.dir("generated/j2sp")

sourceSets { main { java.srcDir(generatedJ2SPDir) } }
val generatedProto = layout.buildDirectory.dir("generated/source/proto/main")

// Registers the generated sources as inputs of the main source set: the jsonSchema2Pojo output
// plus the sub-directories of the protobuf output directory (`generatedProto`).
sourceSets {
main {
java {
srcDir(generatedJ2SPDir)
// NOTE(review): presumably the output of protoc's Java builtin — confirm.
srcDir(generatedProto.get().dir("java"))
// Directory named after the "grpc" protoc plugin registered in the protobuf block.
srcDir(generatedProto.get().dir("grpc"))
}
kotlin {
// Directory named after the "kotlin" builtin enabled in the protobuf block.
srcDir(generatedProto.get().dir("kotlin"))
// Directory named after the "grpckt" protoc plugin registered in the protobuf block.
srcDir(generatedProto.get().dir("grpckt"))
}
}
}

jsonSchema2Pojo {
setSource(files("$projectDir/src/main/json"))
Expand All @@ -76,12 +99,41 @@ jsonSchema2Pojo {
generateBuilders = true
}

// Configuration of the protobuf Gradle plugin: which protoc compiler to use, which protoc
// plugins are available, and which of them run for every proto generation task.
protobuf {
// The protoc compiler artifact, resolved from the version catalog.
protoc { artifact = libs.protoc.asProvider().get().toString() }
plugins {
// gRPC Java code generator.
create("grpc") { artifact = libs.protoc.gen.grpc.java.get().toString() }
// gRPC Kotlin code generator; the suffix selects the "jdk8" classifier with "jar" extension.
create("grpckt") { artifact = libs.protoc.gen.grpc.kotlin.get().toString() + ":jdk8@jar" }
}
generateProtoTasks {
// Apply both gRPC plugins and the Kotlin builtin to every generateProto task.
all().forEach {
it.plugins {
create("grpc")
create("grpckt")
}
it.builtins { create("kotlin") }
}
}
}

tasks {
getByName("compileKotlin") { dependsOn(generateJsonSchema2Pojo) }
getByName("compileKotlin") {
dependsOn(generateJsonSchema2Pojo)
dependsOn(generateProto)
}

test { useJUnitPlatform() }
}

// Workaround to enforce that kspKotlin runs after the generateProto task. Otherwise, Gradle
// complains about an implicit dependency.
// See https://github.com/google/protobuf-gradle-plugin/issues/694.
tasks.configureEach {
// The task is matched by name; every other task is left untouched.
if (name == "kspKotlin") {
mustRunAfter(tasks.generateProto)
}
}

spotless {
kotlin {
ktfmt()
Expand Down
3 changes: 3 additions & 0 deletions config/allowed-licenses.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
},
{
"moduleLicense": "Bouncy Castle Licence"
},
{
"moduleLicense": "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0"
}
Comment on lines +24 to 26
Copy link
Contributor Author

@tillrohrmann tillrohrmann Dec 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This license is needed for javax.annotation:javax.annotation-api which is pulled in by io.grpc:grpc-kotlin-stub. It should not be a problem since we are not changing the sources.

]
}
17 changes: 17 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ dependencyResolutionManagement {
mavenCentral()
// OSSRH Snapshots repo
maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
// for protobuf-gradle-plugin dependencies
google()
}

versionCatalogs {
Expand Down Expand Up @@ -75,6 +77,21 @@ dependencyResolutionManagement {
library("symbol-processing-api", "com.google.devtools.ksp", "symbol-processing-api")
.versionRef("ksp")
plugin("ksp", "com.google.devtools.ksp").versionRef("ksp")

library("grpc-kotlin-stub", "io.grpc", "grpc-kotlin-stub").version("1.4.1")

version("grpc", "1.69.0")
library("grpc-protobuf", "io.grpc", "grpc-protobuf").versionRef("grpc")
library("grpc-netty-shaded", "io.grpc", "grpc-netty-shaded").versionRef("grpc")
library("protoc-gen-grpc-java", "io.grpc", "protoc-gen-grpc-java").versionRef("grpc")

library("protoc-gen-grpc-kotlin", "io.grpc", "protoc-gen-grpc-kotlin").version("1.4.1")

version("protobuf", "4.29.2")
library("protobuf-kotlin", "com.google.protobuf", "protobuf-kotlin").versionRef("protobuf")
library("protoc", "com.google.protobuf", "protoc").versionRef("protobuf")

plugin("protobuf", "com.google.protobuf").version("0.9.4")
}
}
}
21 changes: 15 additions & 6 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,28 @@ class RestateContainer(
private val WAIT_STARTUP_STRATEGY =
WaitAllStrategy()
.withStrategy(
Wait.forHttp("/restate/health")
.forPort(RUNTIME_INGRESS_ENDPOINT_PORT)
Wait.forHttp("/metrics")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/metrics becomes reachable once the node's gRPC server has started. Note that at this point (if manual provisioning is enabled) the other services (ingress, admin, etc.) are not yet running.

.forPort(RUNTIME_NODE_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build()))
.withStartupTimeout(120.seconds.toJavaDuration())

private val WAIT_INGRESS_READY_STRATEGY =
WaitAllStrategy()
.withStrategy(
Wait.forHttp("/health")
.forPort(RUNTIME_ADMIN_ENDPOINT_PORT)
Wait.forHttp("/restate/health")
.forPort(RUNTIME_INGRESS_ENDPOINT_PORT)
.withRateLimiter(
RateLimiterBuilder.newBuilder()
.withRate(200, TimeUnit.MILLISECONDS)
.withConstantThroughput()
.build()))
.withStartupTimeout(120.seconds.toJavaDuration())

fun bootstrapRestateCluster(
fun createRestateContainers(
config: RestateDeployerConfig,
network: Network,
envs: Map<String, String>,
Expand All @@ -80,13 +84,14 @@ class RestateContainer(
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ALLOW_BOOTSTRAP" to "true",
"RESTATE_ALLOW_BOOTSTRAP" to "false",
"RESTATE_ROLES" to "[worker,log-server,admin,metadata-store]",
)
val followerEnvs =
mapOf<String, String>(
"RESTATE_CLUSTER_NAME" to clusterId,
"RESTATE_BIFROST__DEFAULT_PROVIDER" to "replicated",
"RESTATE_ALLOW_BOOTSTRAP" to "false",
"RESTATE_ROLES" to "[worker,admin,log-server]",
"RESTATE_METADATA_STORE_CLIENT__ADDRESS" to
"http://$RESTATE_RUNTIME:$RUNTIME_NODE_PORT")
Expand Down Expand Up @@ -191,6 +196,10 @@ class RestateContainer(
WAIT_STARTUP_STRATEGY.waitUntilReady(this)
}

/** Blocks until [WAIT_INGRESS_READY_STRATEGY] reports this container as ready. */
fun waitIngressReady(): Unit = WAIT_INGRESS_READY_STRATEGY.waitUntilReady(this)

fun dumpConfiguration() {
check(isRunning) { "The container is not running, can't dump configuration" }
dockerClient.killContainerCmd(containerId).withSignal("SIGUSR1").exec()
Expand Down
60 changes: 58 additions & 2 deletions src/main/kotlin/dev/restate/sdktesting/infra/RestateDeployer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import dev.restate.admin.model.RegisterDeploymentRequest
import dev.restate.admin.model.RegisterDeploymentRequestAnyOf
import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions
import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema
import io.grpc.ManagedChannelBuilder
import java.io.File
import java.net.URI
import java.net.http.HttpClient
Expand All @@ -28,6 +29,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
import kotlinx.coroutines.runBlocking
import org.apache.logging.log4j.CloseableThreadContext
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.ThreadContext
Expand All @@ -38,6 +40,9 @@ import org.testcontainers.images.builder.Transferable
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider
import org.testcontainers.shaded.com.github.dockerjava.core.DockerClientConfig
import restate.cluster.*
import restate.node_ctl_svc.NodeCtlSvcGrpcKt
import restate.node_ctl_svc.provisionClusterRequest

class RestateDeployer
private constructor(
Expand Down Expand Up @@ -169,12 +174,23 @@ private constructor(
}
.associate { it.second.first to (it.first to it.second.second) }
private val runtimeContainers: List<RestateContainer> =
RestateContainer.bootstrapRestateCluster(
RestateContainer.createRestateContainers(
config, network, runtimeContainerEnvs, configSchema, copyToContainer, config.restateNodes)

private val deployedContainers: Map<String, ContainerHandle> =
(runtimeContainers.map {
it.hostname to ContainerHandle(it, restartWaitStrategy = { it.waitStartup() })
it.hostname to
ContainerHandle(
it,
restartWaitStrategy = {
it.waitStartup()
// For the KillRuntime and StopRuntime tests, we need to wait for the ingress
// to be ready.
// Otherwise, the client seems to be stuck when invoking services. Maybe it's
// because the new
// route has not been set up by Docker?
it.waitIngressReady()
})
} +
serviceContainers.map { it.key to ContainerHandle(it.value.second) } +
additionalContainers.map { it.key to ContainerHandle(it.value) })
Expand Down Expand Up @@ -282,9 +298,49 @@ private constructor(
.toTypedArray())
.get(150, TimeUnit.SECONDS)

if (config.restateNodes > 1) {
provisionCluster()
}

executor.shutdown()
}

/**
 * Provisions the cluster through the node-control gRPC service of the [RESTATE_RUNTIME]
 * container, requesting the "replicated" log provider with replication property 2.
 *
 * The call is retried for up to 20 seconds, with a short pause after each failed attempt. The
 * gRPC channel is shut down in all cases, including when provisioning ultimately fails.
 */
private fun provisionCluster() {
  LOG.debug("Manually provisioning the cluster")

  val nodePort = getContainerPort(RESTATE_RUNTIME, RUNTIME_NODE_PORT)

  val channel = ManagedChannelBuilder.forAddress("localhost", nodePort).usePlaintext().build()
  try {
    val client = NodeCtlSvcGrpcKt.NodeCtlSvcCoroutineStub(channel)
    val request = provisionClusterRequest {
      dryRun = false
      logProvider = defaultProvider {
        provider = "replicated"
        replicatedConfig = replicatedProviderConfig {
          replicationProperty = "2"
          nodesetSelectionStrategy = nodeSetSelectionStrategy {
            kind = Cluster.NodeSetSelectionStrategyKind.StrictFaultTolerantGreedy
          }
        }
      }
    }

    Unreliables.retryUntilSuccess(20, TimeUnit.SECONDS) {
      try {
        return@retryUntilSuccess runBlocking { client.provisionCluster(request) }
      } catch (e: Exception) {
        // Failures of the gRPC stub call are not admin-API ApiExceptions, so catch broadly;
        // otherwise this back-off is never reached and the retry loop spins without pausing.
        Thread.sleep(30)
        throw IllegalStateException("Error when provisioning cluster: ${e.message}", e)
      }
    }
  } finally {
    // Release the channel even if the retries time out.
    channel.shutdown()
  }

  LOG.debug("Cluster has been provisioned")
}

private fun discoverDeployment(client: DeploymentApi, spec: ServiceSpec) {
val url = spec.getEndpointUrl(config)
if (spec.skipRegistration) {
Expand Down
78 changes: 78 additions & 0 deletions src/main/proto/node_ctl_svc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate service protocol, which is
// released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/proto/blob/main/LICENSE

syntax = "proto3";

import "google/protobuf/empty.proto";
import "restate/cluster.proto";
import "restate/common.proto";
import "restate/node.proto";

package restate.node_ctl_svc;

// Node control service: identity, metadata retrieval, and cluster provisioning for a node.
service NodeCtlSvc {
// Get identity information from this node.
rpc GetIdent(google.protobuf.Empty) returns (IdentResponse);

// Get metadata of the kind selected in the request; the response carries it as encoded bytes.
rpc GetMetadata(GetMetadataRequest) returns (GetMetadataResponse);

// Provision the Restate cluster on this node.
rpc ProvisionCluster(ProvisionClusterRequest) returns (ProvisionClusterResponse);
}

// Request message of NodeCtlSvc.ProvisionCluster.
message ProvisionClusterRequest {
// NOTE(review): presumably, when true, nothing is provisioned and the response kind is
// DRY_RUN — confirm against the server implementation.
bool dry_run = 1;
// Number of partitions. NOTE(review): behavior when unset is not visible here — confirm.
optional uint32 num_partitions = 2;
// Placement strategy; the type is declared in restate/cluster.proto.
optional restate.cluster.ReplicationStrategy placement_strategy = 3;
// Log provider to use; the type is declared in restate/cluster.proto.
optional restate.cluster.DefaultProvider log_provider = 4;
}

// Kind of a ProvisionClusterResponse (see ProvisionClusterResponse.kind).
enum ProvisionClusterResponseKind {
// Zero (default) value.
// NOTE(review): the "ProvisionClusterResponseType_" prefix matches neither the enum name nor
// the other values; renaming would change generated identifiers, so it is left as is.
ProvisionClusterResponseType_UNKNOWN = 0;
// NOTE(review): presumably returned for requests with dry_run set — confirm.
DRY_RUN = 1;
// NOTE(review): presumably the cluster was provisioned by this call — confirm.
NEWLY_PROVISIONED = 2;
// The cluster was already provisioned; cluster_configuration is empty in this case.
ALREADY_PROVISIONED = 3;
}

// Response message of NodeCtlSvc.ProvisionCluster.
message ProvisionClusterResponse {
// Kind of this response.
ProvisionClusterResponseKind kind = 1;
// NOTE(review): field number 2 is unused; if it belonged to a removed field, it should be
// declared `reserved` — confirm.
// This field will be empty if the cluster is already provisioned
optional restate.cluster.ClusterConfiguration cluster_configuration = 3;
}

// Response message of NodeCtlSvc.GetIdent: identity information of the queried node.
message IdentResponse {
// Status of the node.
restate.common.NodeStatus status = 1;
// Id of the node.
restate.common.NodeId node_id = 2;
// Name of the cluster.
string cluster_name = 3;
// indicates which roles are enabled on this node
repeated string roles = 4;
// Age of the running node in seconds (how many seconds since the daemon
// started)
uint64 age_s = 5;
// Per-role status fields.
// NOTE(review): their values when the corresponding role is not enabled are not visible
// here — confirm.
restate.common.AdminStatus admin_status = 6;
restate.common.WorkerStatus worker_status = 7;
restate.common.LogServerStatus log_server_status = 8;
restate.common.MetadataServerStatus metadata_server_status = 9;
// NOTE(review): presumably the versions of the respective metadata as known to this node —
// confirm.
uint32 nodes_config_version = 10;
uint32 logs_version = 11;
uint32 schema_version = 12;
uint32 partition_table_version = 13;
}

// Request message of NodeCtlSvc.GetMetadata.
message GetMetadataRequest {
// If set, we'll first sync with the metadata store to ensure we are returning the latest value.
// Otherwise, we'll return the local value on this node.
bool sync = 1;
// The kind of metadata to return; it determines the content of GetMetadataResponse.encoded.
restate.node.MetadataKind kind = 2;
}

// Response message of NodeCtlSvc.GetMetadata.
message GetMetadataResponse {
// polymorphic. The value depends on the MetadataKind requested
bytes encoded = 1;
}
Loading
Loading