From d0679af3e839ba7fe8e62cf8dc2cd072fee3e4a9 Mon Sep 17 00:00:00 2001 From: Timur Yusupov Date: Wed, 11 Apr 2018 18:40:14 +0300 Subject: [PATCH] #23: Use cassandra driver's DNS resolution Summary: Instead of resolving nodes to IP addresses inside sample app, used appropariate cassandra driver's client builder. Also restricted to only allow use the same CQL port number for all specified `--nodes` parameters, since different ports are not supported by client builder. Test Plan: - Need https://kubernetes.io/docs/getting-started-guides/minikube for testing. - Build docker image to be used for minikube: ``` minikube start eval $(minikube docker-env) cd ~/code/devops/docker/images/yugabyte docker build -t yugabytedb/yugabyte:test . ``` - Run local k8s cluster using https://github.com/YugaByte/yugabyte-db/blob/master/cloud/kubernetes/README.md. Before running replace `yugabytedb/yugabyte:latest` with `yugabytedb/yugabyte:test` and set `imagePullPolicy: Never` in `yugabyte-statefulset.yaml`. - kubectl exec -it yb-master-0 -- yum -y install java-1.8.0-openjdk-headless.x86_64 - kubectl exec -it yb-master-0 -- java -jar /home/yugabyte/java/yb-sample-apps.jar --num_reads 1000000 --num_threads_read 4 --num_unique_keys 1000000 --num_writes 10000000 --num_threads_write 12 --workload CassandraKeyValue --nodes yb-tservers.default.svc.cluster.local:9042 It should log line similar to and list 3 IPs: ``` 2018-04-10 17:31:53,814 [INFO|com.yugabyte.sample.apps.AppBase|AppBase] Connecting to nodes: yb-tservers.default.svc.cluster.local/172.17.0.7:9042,yb-tservers.default.svc.cluster.local/172.17.0.8:9042,yb-tservers.default.svc.cluster.local/172.17.0.9:9042 ``` - Check the same with `--workload CassandraHelloWorld` Reviewers: mihnea, karthik Reviewed By: karthik Subscribers: yql Differential Revision: https://phabricator.dev.yugabyte.com/D4584 --- .../com/yugabyte/sample/apps/AppBase.java | 24 ++++++++++++------- .../sample/apps/CassandraHelloWorld.java | 6 ++--- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java index ac05b0e720a6..7f90c325b753 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/AppBase.java @@ -24,6 +24,7 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import java.util.zip.Adler32; import java.util.zip.Checksum; @@ -98,7 +99,7 @@ public abstract class AppBase implements MetricsTracker.StatusMessageAppender { */ protected Session getCassandraClient() { if (cassandra_session == null) { - createCassandraClient(getNodesAsInet()); + createCassandraClient(configuration.getContactPoints()); } return cassandra_session; } @@ -128,16 +129,21 @@ protected String getKeyspace() { * Private method that is thread-safe and creates the Cassandra client. Exactly one calling thread * will succeed in creating the client. This method does nothing for the other threads. */ - private static synchronized void createCassandraClient(List nodes) { + protected static synchronized void createCassandraClient(List contactPoints) { if (cassandra_cluster == null) { - { - List ips = new ArrayList<>(nodes.size()); - for (InetSocketAddress contactPoint : nodes) { - ips.add(contactPoint.getAddress().getHostAddress()); + Cluster.Builder builder = Cluster.builder(); + Integer port = null; + for (ContactPoint cp : contactPoints) { + if (port == null) { + port = cp.getPort(); + builder.withPort(port); + } else if (port != cp.getPort()) { + throw new IllegalArgumentException("Using multiple CQL ports is not supported."); } - LOG.info("Connecting to nodes: " + String.join(",", ips)); + builder.addContactPoint(cp.getHost()); } - Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(nodes); + LOG.info("Connecting to nodes: " + builder.getContactPoints().stream() + .map(it -> it.toString()).collect(Collectors.joining(","))); if (appConfig.localDc != null && !appConfig.localDc.isEmpty()) { builder.withLoadBalancingPolicy(new PartitionAwarePolicy( DCAwareRoundRobinPolicy.builder() @@ -465,7 +471,7 @@ public ContactPoint getRandomContactPoint() { * Returns a list of Inet address objects in the proxy tier. This is needed by Cassandra clients. */ public List getNodesAsInet() { - List inetSocketAddresses = new ArrayList(); + List inetSocketAddresses = new ArrayList<>(); for (ContactPoint contactPoint : configuration.getContactPoints()) { try { for (InetAddress addr : InetAddress.getAllByName(contactPoint.getHost())) { diff --git a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java index 2954b87cedca..a3004b7638c1 100644 --- a/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java +++ b/java/yb-loadtester/src/main/java/com/yugabyte/sample/apps/CassandraHelloWorld.java @@ -36,8 +36,7 @@ public class CassandraHelloWorld extends AppBase { public void run() { try { // Create a Cassandra client. - Cluster cluster = Cluster.builder().addContactPointsWithPorts(getNodesAsInet()).build(); - Session session = cluster.connect(); + Session session = getCassandraClient(); // Create the keyspace and use it. String createKeyspaceStatement = @@ -70,8 +69,7 @@ public void run() { LOG.info("Got result: row-count=" + rows.size() + ", name=" + name + ", age=" + age); // Close the client. - session.close(); - cluster.close(); + destroyClients(); } catch (Exception e) { LOG.error("Error running CassandraHelloWorld" + e.getMessage(), e); }