Skip to content

Commit

Permalink
#23: Use cassandra driver's DNS resolution
Browse files Browse the repository at this point in the history
Summary:
Instead of resolving nodes to IP addresses inside sample app, used appropariate
cassandra driver's client builder. Also restricted to only allow use the same CQL port number for
all specified `--nodes` parameters, since different ports are not supported by client builder.

Test Plan:
- Need https://kubernetes.io/docs/getting-started-guides/minikube for testing.
- Build docker image to be used for minikube:
```
minikube start
eval $(minikube docker-env)

cd ~/code/devops/docker/images/yugabyte
docker build -t yugabytedb/yugabyte:test .
```
- Run local k8s cluster using
  https://github.com/YugaByte/yugabyte-db/blob/master/cloud/kubernetes/README.md. Before running
  replace `yugabytedb/yugabyte:latest` with `yugabytedb/yugabyte:test` and set `imagePullPolicy: Never` in
  `yugabyte-statefulset.yaml`.
- kubectl exec -it yb-master-0 -- yum -y install java-1.8.0-openjdk-headless.x86_64
- kubectl exec -it yb-master-0 -- java -jar /home/yugabyte/java/yb-sample-apps.jar --num_reads 1000000 --num_threads_read 4 --num_unique_keys 1000000 --num_writes 10000000 --num_threads_write 12 --workload CassandraKeyValue --nodes yb-tservers.default.svc.cluster.local:9042
It should log line similar to and list 3 IPs:
```
2018-04-10 17:31:53,814 [INFO|com.yugabyte.sample.apps.AppBase|AppBase] Connecting to nodes: yb-tservers.default.svc.cluster.local/172.17.0.7:9042,yb-tservers.default.svc.cluster.local/172.17.0.8:9042,yb-tservers.default.svc.cluster.local/172.17.0.9:9042
```
- Check the same with `--workload CassandraHelloWorld`

Reviewers: mihnea, karthik

Reviewed By: karthik

Subscribers: yql

Differential Revision: https://phabricator.dev.yugabyte.com/D4584
  • Loading branch information
ttyusupov committed Apr 11, 2018
1 parent 5ab7a6e commit d0679af
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

Expand Down Expand Up @@ -98,7 +99,7 @@ public abstract class AppBase implements MetricsTracker.StatusMessageAppender {
*/
protected Session getCassandraClient() {
if (cassandra_session == null) {
createCassandraClient(getNodesAsInet());
createCassandraClient(configuration.getContactPoints());
}
return cassandra_session;
}
Expand Down Expand Up @@ -128,16 +129,21 @@ protected String getKeyspace() {
* Private method that is thread-safe and creates the Cassandra client. Exactly one calling thread
* will succeed in creating the client. This method does nothing for the other threads.
*/
private static synchronized void createCassandraClient(List<InetSocketAddress> nodes) {
protected static synchronized void createCassandraClient(List<ContactPoint> contactPoints) {
if (cassandra_cluster == null) {
{
List<String> ips = new ArrayList<>(nodes.size());
for (InetSocketAddress contactPoint : nodes) {
ips.add(contactPoint.getAddress().getHostAddress());
Cluster.Builder builder = Cluster.builder();
Integer port = null;
for (ContactPoint cp : contactPoints) {
if (port == null) {
port = cp.getPort();
builder.withPort(port);
} else if (port != cp.getPort()) {
throw new IllegalArgumentException("Using multiple CQL ports is not supported.");
}
LOG.info("Connecting to nodes: " + String.join(",", ips));
builder.addContactPoint(cp.getHost());
}
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(nodes);
LOG.info("Connecting to nodes: " + builder.getContactPoints().stream()
.map(it -> it.toString()).collect(Collectors.joining(",")));
if (appConfig.localDc != null && !appConfig.localDc.isEmpty()) {
builder.withLoadBalancingPolicy(new PartitionAwarePolicy(
DCAwareRoundRobinPolicy.builder()
Expand Down Expand Up @@ -465,7 +471,7 @@ public ContactPoint getRandomContactPoint() {
* Returns a list of Inet address objects in the proxy tier. This is needed by Cassandra clients.
*/
public List<InetSocketAddress> getNodesAsInet() {
List<InetSocketAddress> inetSocketAddresses = new ArrayList<InetSocketAddress>();
List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
for (ContactPoint contactPoint : configuration.getContactPoints()) {
try {
for (InetAddress addr : InetAddress.getAllByName(contactPoint.getHost())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public class CassandraHelloWorld extends AppBase {
public void run() {
try {
// Create a Cassandra client.
Cluster cluster = Cluster.builder().addContactPointsWithPorts(getNodesAsInet()).build();
Session session = cluster.connect();
Session session = getCassandraClient();

// Create the keyspace and use it.
String createKeyspaceStatement =
Expand Down Expand Up @@ -70,8 +69,7 @@ public void run() {
LOG.info("Got result: row-count=" + rows.size() + ", name=" + name + ", age=" + age);

// Close the client.
session.close();
cluster.close();
destroyClients();
} catch (Exception e) {
LOG.error("Error running CassandraHelloWorld" + e.getMessage(), e);
}
Expand Down

0 comments on commit d0679af

Please sign in to comment.