Skip to content

Commit

Permalink
ENG-2363: CQL compression
Browse files Browse the repository at this point in the history
Summary: Implemented CQL message compression. Supports LZ4 and Snappy compression in both CQL request and response bodies.

Test Plan: TestMessageCompression.testLZ4() and .testSnappy() Java tests

Reviewers: mihnea, pritam.damania

Reviewed By: pritam.damania

Subscribers: kannan, yql

Differential Revision: https://phabricator.dev.yugabyte.com/D3465
  • Loading branch information
robertpang committed Nov 18, 2017
1 parent 82e9c7c commit 07f1dea
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 195 deletions.
2 changes: 2 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
<!-- Library dependencies -->
<async.version>1.4.1</async.version>
<cassandra-driver-core.version>3.2.0-yb-8</cassandra-driver-core.version>
<lz4.version>1.3.0</lz4.version>
<snappy.version>1.1.4</snappy.version>
<commons-cli.version>1.2</commons-cli.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang3.version>3.6</commons-lang3.version>
Expand Down
10 changes: 10 additions & 0 deletions java/yb-cql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra-driver-core.version}</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>${lz4.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
Expand Down
22 changes: 22 additions & 0 deletions java/yb-cql/src/test/java/org/yb/cql/TestAuthentication.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.yb.cql;

import com.datastax.driver.core.*;
import com.datastax.driver.core.ProtocolOptions.Compression;

import java.util.Arrays;

Expand Down Expand Up @@ -49,14 +50,35 @@ public void testConnectNoUserPass() throws Exception {
checkConnectivity(false, null, null, true);
}

@Test(timeout = 100000)
public void testConnectWithDefaultUserPassAndCompression() throws Exception {
checkConnectivity(true, "cassandra", "cassandra", Compression.LZ4, false);
checkConnectivity(true, "cassandra", "cassandra", Compression.SNAPPY, false);
checkConnectivity(true, "fakeUser", "fakePass", Compression.LZ4, true);
checkConnectivity(true, "fakeUser", "fakePass", Compression.SNAPPY, true);
checkConnectivity(false, null, null, Compression.LZ4, true);
checkConnectivity(false, null, null, Compression.SNAPPY, true);
}

public void checkConnectivity(
boolean usingAuth, String optUser, String optPass, boolean expectFailure) {
checkConnectivity(usingAuth, optUser, optPass, Compression.NONE, expectFailure);
}

public void checkConnectivity(boolean usingAuth,
String optUser,
String optPass,
Compression compression,
boolean expectFailure) {
// Use superclass definition to not have a default set of credentials.
Cluster.Builder cb = super.getDefaultClusterBuilder();
Cluster c = null;
if (usingAuth) {
cb = cb.withCredentials(optUser, optPass);
}
if (compression != Compression.NONE) {
cb = cb.withCompression(compression);
}
c = cb.build();
try {
Session s = c.connect();
Expand Down
109 changes: 109 additions & 0 deletions java/yb-cql/src/test/java/org/yb/cql/TestMessageCompression.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.cql;

import java.util.*;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.ProtocolOptions.Compression;

import static org.junit.Assert.assertEquals;

public class TestMessageCompression extends BaseCQLTest {

@Test
public void testLZ4() throws Exception {

// Test connection with LZ4 compression.
Session s = getDefaultClusterBuilder()
.withCompression(Compression.LZ4)
.build()
.connect(DEFAULT_TEST_KEYSPACE);

final int NUM_KEYS = 10;
final int NUM_RANGE = 10;

// Create table and insert some rows.
s.execute("create table test_lz4 (h int, r int, c text, primary key ((h), r));");
PreparedStatement stmt = s.prepare("insert into test_lz4 (h, r, c) values (?, ?, ?);");
Set<String> expected = new HashSet<String>();
for (int i = 1; i <= NUM_KEYS; i++) {
for (int j = 1; j <= NUM_KEYS; j++) {
String val = "v" + i + j;
s.execute(stmt.bind(Integer.valueOf(i), Integer.valueOf(j), val));
expected.add(String.format("Row[%d, %d, %s]", i, j, val));
}
}

// Select and verify the rows.
Set<String> actual = new HashSet<String>();
for (Row r : s.execute("select * from test_lz4;")) {
actual.add(r.toString());
}
assertEquals(expected, actual);

// Insert large string, select it back and verify.
String string = RandomStringUtils.randomAscii(8 * 1024 * 1024);
LOG.info("string = \"" + string + "\"");
s.execute("insert into test_lz4 (h, r, c) values (100, 100, ?);", string);
Row row = s.execute("select c from test_lz4 where h = 100 and r = 100;").one();
assertEquals(string, row.getString("c"));
}

@Test
public void testSnappy() throws Exception {

// Test connection with Snappy compression.
Session s = getDefaultClusterBuilder()
.withCompression(Compression.SNAPPY)
.build()
.connect(DEFAULT_TEST_KEYSPACE);

final int NUM_KEYS = 10;
final int NUM_RANGE = 10;

// Create table and upsert some rows.
BatchStatement batch = new BatchStatement();
s.execute("create table test_snappy (h int, r int, c text, primary key ((h), r));");
PreparedStatement stmt = s.prepare("update test_snappy set c = ? where h = ? and r = ?;");
Set<String> expected = new HashSet<String>();
for (int i = 1; i <= NUM_KEYS; i++) {
for (int j = 1; j <= NUM_KEYS; j++) {
String val = "v" + i + j;
batch.add(stmt.bind(val, Integer.valueOf(i), Integer.valueOf(j)));
expected.add(String.format("Row[%d, %d, %s]", i, j, val));
}
}
s.execute(batch);

// Select and verify the rows.
Set<String> actual = new HashSet<String>();
for (Row r : s.execute("select * from test_snappy;")) {
actual.add(r.toString());
}
assertEquals(expected, actual);

// Insert large string, select it back and verify.
String string = RandomStringUtils.randomAscii(8 * 1024 * 1024);
LOG.info("string = \"" + string + "\"");
s.execute("insert into test_snappy (h, r, c) values (100, 100, ?);", string);
Row row = s.execute("select c from test_snappy where h = 100 and r = 100;").one();
assertEquals(string, row.getString("c"));
}
}
4 changes: 3 additions & 1 deletion src/yb/cqlserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ target_link_libraries(yb-cql
yb_client
cql_service_proto
server_common
server_process)
server_process
lz4
snappy)

#########################################
# yb-cqlserver
Expand Down
Loading

0 comments on commit 07f1dea

Please sign in to comment.