Skip to content

Commit

Permalink
Merge #62082
Browse files Browse the repository at this point in the history
62082: roachtest: upgrade to confluent 6 in cdc roachtests r=HonoreDB a=stevendanna

We are currently testing on a old version of confluent (4.0.0) that is
actually end-of-life.

https://docs.confluent.io/platform/current/installation/versions-interoperability.html

This moves us to the latest community version.

To run the tests locally, macOS users might need to make sure that an
appropriate version of java is in their PATH. For example:

    brew install openjdk@11
    PATH="/usr/local/Cellar/openjdk@11/11.0.10/:$PATH"

Unfortunately, this adds yet another network dependency as they have
moved the confluent-cli binary out of their main archive.

Fixes #61271

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Mar 30, 2021
2 parents 52d81ca + 3528848 commit 4ee2449
Showing 1 changed file with 105 additions and 30 deletions.
135 changes: 105 additions & 30 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) {
// runner, so kafka needs to advertise the external address. Better
// would be a binary we could run on one of the roachprod machines.
c.Run(ctx, kafka.nodes, `echo "advertised.listeners=PLAINTEXT://`+kafka.consumerURL(ctx)+`" >> `+
kafka.basePath()+`/confluent-4.0.0/etc/kafka/server.properties`)
filepath.Join(kafka.configDir(), "server.properties"))
}
kafka.start(ctx, "kafka")
defer kafka.stop(ctx)
Expand Down Expand Up @@ -427,10 +427,12 @@ func runCDCSchemaRegistry(ctx context.Context, t *test, c *cluster) {
t.Fatal(err)
}

folder := kafka.basePath()
output, err := c.RunWithBuffer(ctx, t.l, kafkaNode,
`CONFLUENT_CURRENT=`+folder+` `+folder+`/confluent-4.0.0/bin/kafka-avro-console-consumer `+
`--from-beginning --topic=foo --max-messages=14 --bootstrap-server=localhost:9092`)
kafka.makeCommand("kafka-avro-console-consumer",
"--from-beginning",
"--topic=foo",
"--max-messages=14",
"--bootstrap-server=localhost:9092"))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -858,17 +860,47 @@ func randomSerial() (*big.Int, error) {
}

const (
conluentDownloadURL = "https://storage.googleapis.com/cockroach-fixtures/tools/confluent-oss-4.0.0-2.11.tar.gz"
confluentSHA256 = "5cfa68b4368f28bd9231786bb710431394dc14a2b37eecf360e820271ee84f43"
confluentDownloadURL = "https://storage.googleapis.com/cockroach-fixtures/tools/confluent-community-6.1.0.tar.gz"
confluentSHA256 = "53b0e2f08c4cfc55087fa5c9120a614ef04d306db6ec3bcd7710f89f05355355"
confluentInstallBase = "confluent-6.1.0"

// TODO(ssd): Perhaps something like this could be a roachprod command?
confluentDownloadScript = `#!/usr/bin/env bash
confluentCLIVersion = "1.26.0"
confluentCLIDownloadURLBase = "https://s3-us-west-2.amazonaws.com/confluent.cloud/confluent-cli/archives"
)

// TODO(ssd): Perhaps something like this could be a roachprod command?
var confluentDownloadScript = fmt.Sprintf(`#!/usr/bin/env bash
set -euo pipefail
CONFLUENT_URL="$1"
CONFLUENT_SHA256="$2"
CONFLUENT_URL="%s"
CONFLUENT_SHA256="%s"
CONFLUENT_INSTALL_BASE="%s"
CONFLUENT_CLI_VERSION="%s"
CONFLUENT_CLI_URL_BASE="%s"
CONFLUENT_CLI_TAR_PATH="/tmp/confluent-cli-$CONFLUENT_CLI_VERSION.tar.gz"
CONFLUENT_TAR_PATH=/tmp/confluent.tar.gz
CONFLUENT_DIR="$3"
CONFLUENT_DIR="$1"
os() {
uname -s | tr '[:upper:]' '[:lower:]'
}
arch() {
local arch
arch=$(uname -m)
case "$arch" in
x86_64)
echo "amd64"
;;
*)
echo "$arch"
;;
esac
}
checkFile() {
local file_name="${1}"
Expand All @@ -891,16 +923,46 @@ checkFile() {
fi
}
if ! [[ -f "$CONFLUENT_TAR_PATH" ]] || ! checkFile "$CONFLUENT_TAR_PATH" "$CONFLUENT_SHA256"; then
download() {
URL="$1"
OUTPUT_FILE="$2"
for i in $(seq 1 5); do
if curl --retry 3 --retry-delay 1 -o "$CONFLUENT_TAR_PATH" "$CONFLUENT_URL"; then
if curl --retry 3 --retry-delay 1 --fail --show-error -o "$OUTPUT_FILE" "$URL"; then
break
fi
sleep 15;
done
}
PLATFORM="$(os)/$(arch)"
case "$PLATFORM" in
linux/amd64)
CONFLUENT_CLI_URL="${CONFLUENT_CLI_URL_BASE}/${CONFLUENT_CLI_VERSION}/confluent_v${CONFLUENT_CLI_VERSION}_linux_amd64.tar.gz"
;;
darwin/amd64)
CONFLUENT_CLI_URL="${CONFLUENT_CLI_URL_BASE}/${CONFLUENT_CLI_VERSION}/confluent_v${CONFLUENT_CLI_VERSION}_darwin_amd64.tar.gz"
;;
*)
echo "We don't know how to install the confluent CLI for \"${PLATFORM}\""
exit 1
;;
esac
if ! [[ -f "$CONFLUENT_TAR_PATH" ]] || ! checkFile "$CONFLUENT_TAR_PATH" "$CONFLUENT_SHA256"; then
download "$CONFLUENT_URL" "$CONFLUENT_TAR_PATH"
fi
tar xvf /tmp/confluent.tar.gz -C "$CONFLUENT_DIR"
`
tar xvf "$CONFLUENT_TAR_PATH" -C "$CONFLUENT_DIR"
if ! [[ -f "$CONFLUENT_DIR/bin/confluent" ]]; then
if ! [[ -f "$CONFLUENT_CLI_TAR_PATH" ]]; then
download "$CONFLUENT_CLI_URL" "$CONFLUENT_CLI_TAR_PATH"
fi
tar xvf "$CONFLUENT_CLI_TAR_PATH" -C "$CONFLUENT_DIR/$CONFLUENT_INSTALL_BASE/bin/" --strip-components=1 confluent/confluent
fi
`, confluentDownloadURL, confluentSHA256, confluentInstallBase, confluentCLIVersion, confluentCLIDownloadURLBase)

const (
// kafkaJAASConfig is a JAAS configuration file that creats a
// user called "plain" with password "plain-secret" that can
// authenticate via SASL/PLAIN.
Expand Down Expand Up @@ -982,16 +1044,24 @@ func (k kafkaManager) basePath() string {
return `/mnt/data1/confluent`
}

func (k kafkaManager) confluentHome() string {
return filepath.Join(k.basePath(), confluentInstallBase)
}

func (k kafkaManager) configDir() string {
return k.basePath() + `/confluent-4.0.0/etc/kafka/`
return filepath.Join(k.basePath(), confluentInstallBase, "etc/kafka")
}

func (k kafkaManager) binDir() string {
return k.basePath() + `/confluent-4.0.0/bin/`
return filepath.Join(k.basePath(), confluentInstallBase, "bin")
}

func (k kafkaManager) confluentBin() string {
return filepath.Join(k.binDir(), "confluent")
}

func (k kafkaManager) serverJAASConfig() string {
return k.configDir() + `server_jaas.conf`
return filepath.Join(k.configDir(), "server_jaas.conf")
}

func (k kafkaManager) install(ctx context.Context) {
Expand All @@ -1000,12 +1070,12 @@ func (k kafkaManager) install(ctx context.Context) {

k.c.Run(ctx, k.nodes, `mkdir -p `+folder)

downloadScriptPath := filepath.Join(folder, "/install.sh")
downloadScriptPath := filepath.Join(folder, "install.sh")
err := k.c.PutString(ctx, confluentDownloadScript, downloadScriptPath, 0700, k.nodes)
if err != nil {
k.c.t.Fatal(err)
}
k.c.Run(ctx, k.nodes, downloadScriptPath, conluentDownloadURL, confluentSHA256, folder)
k.c.Run(ctx, k.nodes, downloadScriptPath, folder)
if !k.c.isLocal() {
k.c.Run(ctx, k.nodes, `mkdir -p logs`)
k.c.Run(ctx, k.nodes, `sudo apt-get -q update 2>&1 > logs/apt-get-update.log`)
Expand Down Expand Up @@ -1033,7 +1103,7 @@ func (k kafkaManager) configureAuth(ctx context.Context) *testCerts {
keystorePath := filepath.Join(configDir, "kafka.keystore.jks")

caKeyPath := filepath.Join(configDir, "ca.key")
caCertPath := filepath.Join(configDir + "ca.crt")
caCertPath := filepath.Join(configDir, "ca.crt")

kafkaKeyPath := filepath.Join(configDir, "kafka.key")
kafkaCertPath := filepath.Join(configDir, "kafka.crt")
Expand Down Expand Up @@ -1114,15 +1184,12 @@ func (k kafkaManager) addSCRAMUsers(ctx context.Context) {
}

func (k kafkaManager) start(ctx context.Context, services ...string) {
folder := k.basePath()
// This isn't necessary for the nightly tests, but it's nice for iteration.
k.c.Run(ctx, k.nodes, `CONFLUENT_CURRENT=`+folder+` `+folder+`/confluent-4.0.0/bin/confluent destroy || true`)
k.c.Run(ctx, k.nodes, k.makeCommand("confluent", "local destroy || true"))
k.restart(ctx, services...)
}

func (k kafkaManager) restart(ctx context.Context, services ...string) {
folder := k.basePath()

var startArgs string
if len(services) == 0 {
startArgs = "schema-registry"
Expand All @@ -1133,18 +1200,26 @@ func (k kafkaManager) restart(ctx context.Context, services ...string) {
k.c.Run(ctx, k.nodes, "touch", k.serverJAASConfig())

startCmd := fmt.Sprintf(
"CONFLUENT_CURRENT=%s KAFKA_OPTS=-Djava.security.auth.login.config=%s %s start %s",
folder,
"CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s KAFKA_OPTS=-Djava.security.auth.login.config=%s %s local services %s start",
k.basePath(),
k.confluentHome(),
k.serverJAASConfig(),
folder+"/confluent-4.0.0/bin/confluent",
k.confluentBin(),
startArgs)
k.c.Run(ctx, k.nodes, startCmd)
}

func (k kafkaManager) makeCommand(exe string, args ...string) string {
cmdPath := filepath.Join(k.binDir(), exe)
return fmt.Sprintf("CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s %s %s",
k.basePath(),
k.confluentHome(),
cmdPath, strings.Join(args, " "))
}

func (k kafkaManager) stop(ctx context.Context) {
folder := k.basePath()
k.c.Run(ctx, k.nodes, fmt.Sprintf("rm -f %s", k.serverJAASConfig()))
k.c.Run(ctx, k.nodes, `CONFLUENT_CURRENT=`+folder+` `+folder+`/confluent-4.0.0/bin/confluent stop`)
k.c.Run(ctx, k.nodes, k.makeCommand("confluent", "local services stop"))
}

func (k kafkaManager) chaosLoop(
Expand Down

0 comments on commit 4ee2449

Please sign in to comment.