diff --git a/pom.xml b/pom.xml index aa84f2d5aa..884b417183 100644 --- a/pom.xml +++ b/pom.xml @@ -67,7 +67,7 @@ org.json json - 20240205 + 20240303 com.google.code.gson diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index 40989c2502..c36eb6625b 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -312,7 +312,7 @@ public static enum Keyword implements Rawable { DELETE, LIBRARYNAME, WITHCODE, DESCRIPTION, GETKEYS, GETKEYSANDFLAGS, DOCS, FILTERBY, DUMP, MODULE, ACLCAT, PATTERN, DOCTOR, LATEST, HISTORY, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS, RANK, NOW, VERSION, ADDR, SKIPME, USER, LADDR, - CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NOVALUES; + CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NOVALUES, MAXAGE; private final byte[] raw; diff --git a/src/main/java/redis/clients/jedis/commands/ClientCommands.java b/src/main/java/redis/clients/jedis/commands/ClientCommands.java index edcfbd602e..75bda24a30 100644 --- a/src/main/java/redis/clients/jedis/commands/ClientCommands.java +++ b/src/main/java/redis/clients/jedis/commands/ClientCommands.java @@ -30,10 +30,10 @@ public interface ClientCommands { String clientKill(String ip, int port); /** - * Close a given client connection. + * Close client connections based on certain selection parameters. * - * @param params Connection info will be closed - * @return Close success return OK + * @param params Parameters defining what client connections to close. + * @return The number of client connections that were closed. */ long clientKill(ClientKillParams params); diff --git a/src/main/java/redis/clients/jedis/params/ClientKillParams.java b/src/main/java/redis/clients/jedis/params/ClientKillParams.java index 12c65be882..0082ef3340 100644 --- a/src/main/java/redis/clients/jedis/params/ClientKillParams.java +++ b/src/main/java/redis/clients/jedis/params/ClientKillParams.java @@ -67,6 +67,16 @@ public ClientKillParams laddr(String ip, int port) { return addParam(Keyword.LADDR, ip + ':' + port); } + /** + * Kill clients older than {@code maxAge} seconds. + * + * @param maxAge Clients older than this number of seconds will be killed. + * @return The {@code ClientKillParams} instance, for call chaining. + */ + public ClientKillParams maxAge(long maxAge) { + return addParam(Keyword.MAXAGE, maxAge); + } + @Override public void addParams(CommandArguments args) { params.forEach(kv -> args.add(kv.getKey()).add(kv.getValue())); diff --git a/src/test/java/redis/clients/jedis/MigratePipeliningTest.java b/src/test/java/redis/clients/jedis/MigratePipeliningTest.java new file mode 100644 index 0000000000..b7942fc140 --- /dev/null +++ b/src/test/java/redis/clients/jedis/MigratePipeliningTest.java @@ -0,0 +1,398 @@ +package redis.clients.jedis; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.commands.jedis.JedisCommandsTestBase; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.jedis.params.MigrateParams; + +public class MigratePipeliningTest extends JedisCommandsTestBase { + + private static final byte[] bfoo = { 0x01, 0x02, 0x03 }; + private static final byte[] bbar = { 0x04, 0x05, 0x06 }; + private static final byte[] bfoo1 = { 0x07, 0x08, 0x01 }; + private static final byte[] bbar1 = { 0x09, 0x00, 0x01 }; + private static final byte[] bfoo2 = { 0x07, 0x08, 0x02 }; + private static final byte[] bbar2 = { 0x09, 0x00, 0x02 }; + private static final byte[] bfoo3 = { 0x07, 0x08, 0x03 }; + private static final byte[] bbar3 = { 0x09, 0x00, 0x03 }; + + private static final String host = hnp.getHost(); + private static final int port = 6386; + private static final int portAuth = hnp.getPort() + 1; + private static final int db = 2; + private static final int dbAuth = 3; + private static final int timeout = Protocol.DEFAULT_TIMEOUT; + + private Jedis dest; + private Jedis destAuth; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + dest = new Jedis(host, port, 500); + dest.flushAll(); + dest.select(db); + + destAuth = new Jedis(host, portAuth, 500); + destAuth.auth("foobared"); + destAuth.flushAll(); + destAuth.select(dbAuth); + } + + @After + @Override + public void tearDown() throws Exception { + dest.close(); + destAuth.close(); + super.tearDown(); + } + + @Test + public void noKey() { + Pipeline p = jedis.pipelined(); + + p.migrate(host, port, "foo", db, timeout); + p.migrate(host, port, bfoo, db, timeout); + p.migrate(host, port, db, timeout, new MigrateParams(), "foo1", "foo2", "foo3"); + p.migrate(host, port, db, timeout, new MigrateParams(), bfoo1, bfoo2, bfoo3); + + assertThat(p.syncAndReturnAll(), + hasItems("NOKEY", "NOKEY", "NOKEY", "NOKEY")); + } + + @Test + public void migrate() { + assertNull(dest.get("foo")); + + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar"); + p.migrate(host, port, "foo", db, timeout); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertEquals("bar", dest.get("foo")); + } + + @Test + public void migrateBinary() { + assertNull(dest.get(bfoo)); + + Pipeline p = jedis.pipelined(); + + p.set(bfoo, bbar); + p.migrate(host, port, bfoo, db, timeout); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertArrayEquals(bbar, dest.get(bfoo)); + } + + @Test + public void migrateEmptyParams() { + assertNull(dest.get("foo")); + + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar"); + p.migrate(host, port, db, timeout, new MigrateParams(), "foo"); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertEquals("bar", dest.get("foo")); + } + + @Test + public void migrateEmptyParamsBinary() { + assertNull(dest.get(bfoo)); + + Pipeline p = jedis.pipelined(); + + p.set(bfoo, bbar); + p.migrate(host, port, db, timeout, new MigrateParams(), bfoo); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertArrayEquals(bbar, dest.get(bfoo)); + } + + @Test + public void migrateCopy() { + assertNull(dest.get("foo")); + + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar"); + p.migrate(host, port, db, timeout, new MigrateParams().copy(), "foo"); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", "bar")); + + assertEquals("bar", dest.get("foo")); + } + + @Test + public void migrateCopyBinary() { + assertNull(dest.get(bfoo)); + + Pipeline p = jedis.pipelined(); + + p.set(bfoo, bbar); + p.migrate(host, port, db, timeout, new MigrateParams().copy(), bfoo); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", bbar)); + + assertArrayEquals(bbar, dest.get(bfoo)); + } + + @Test + public void migrateReplace() { + dest.set("foo", "bar2"); + + assertEquals("bar2", dest.get("foo")); + + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar1"); + p.migrate(host, port, db, timeout, new MigrateParams().replace(), "foo"); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertEquals("bar1", dest.get("foo")); + } + + @Test + public void migrateReplaceBinary() { + dest.set(bfoo, bbar2); + + assertArrayEquals(bbar2, dest.get(bfoo)); + + Pipeline p = jedis.pipelined(); + + p.set(bfoo, bbar1); + p.migrate(host, port, db, timeout, new MigrateParams().replace(), bfoo); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertArrayEquals(bbar1, dest.get(bfoo)); + } + + @Test + public void migrateCopyReplace() { + dest.set("foo", "bar2"); + + assertEquals("bar2", dest.get("foo")); + + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar1"); + p.migrate(host, port, db, timeout, new MigrateParams().copy().replace(), "foo"); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", "bar1")); + + assertEquals("bar1", dest.get("foo")); + } + + @Test + public void migrateCopyReplaceBinary() { + dest.set(bfoo, bbar2); + + assertArrayEquals(bbar2, dest.get(bfoo)); + + Pipeline p = jedis.pipelined(); + + p.set(bfoo, bbar1); + p.migrate(host, port, db, timeout, new MigrateParams().copy().replace(), bfoo); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", bbar1)); + + assertArrayEquals(bbar1, dest.get(bfoo)); + } + + @Test + public void migrateAuth() { + assertNull(dest.get("foo")); + + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar"); + p.migrate(host, portAuth, dbAuth, timeout, new MigrateParams().auth("foobared"), "foo"); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertEquals("bar", destAuth.get("foo")); + } + + @Test + public void migrateAuthBinary() { + assertNull(dest.get(bfoo)); + + Pipeline p = jedis.pipelined(); + + p.set(bfoo, bbar); + p.migrate(host, portAuth, dbAuth, timeout, new MigrateParams().auth("foobared"), bfoo); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertArrayEquals(bbar, destAuth.get(bfoo)); + } + + @Test + public void migrateAuth2() { + assertNull(jedis.get("foo")); + + Pipeline p = destAuth.pipelined(); + + p.set("foo", "bar"); + p.migrate(host, hnp.getPort(), 0, timeout, + new MigrateParams().auth2("acljedis", "fizzbuzz"), "foo"); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertEquals("bar", jedis.get("foo")); + } + + @Test + public void migrateAuth2Binary() { + assertNull(jedis.get(bfoo)); + + Pipeline p = dest.pipelined(); + + p.set(bfoo, bbar); + p.migrate(host, hnp.getPort(), 0, timeout, + new MigrateParams().auth2("acljedis", "fizzbuzz"), bfoo); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK", null)); + + assertArrayEquals(bbar, jedis.get(bfoo)); + } + + @Test + public void migrateMulti() { + assertNull(dest.get("foo1")); + assertNull(dest.get("foo2")); + assertNull(dest.get("foo3")); + + Pipeline p = jedis.pipelined(); + + p.mset("foo1", "bar1", "foo2", "bar2", "foo3", "bar3"); + p.migrate(host, port, db, timeout, new MigrateParams(), "foo1", "foo2", "foo3"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK")); + + assertEquals("bar1", dest.get("foo1")); + assertEquals("bar2", dest.get("foo2")); + assertEquals("bar3", dest.get("foo3")); + } + + @Test + public void migrateMultiBinary() { + assertNull(dest.get(bfoo1)); + assertNull(dest.get(bfoo2)); + assertNull(dest.get(bfoo3)); + + Pipeline p = jedis.pipelined(); + + p.mset(bfoo1, bbar1, bfoo2, bbar2, bfoo3, bbar3); + p.migrate(host, port, db, timeout, new MigrateParams(), bfoo1, bfoo2, bfoo3); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "OK")); + + assertArrayEquals(bbar1, dest.get(bfoo1)); + assertArrayEquals(bbar2, dest.get(bfoo2)); + assertArrayEquals(bbar3, dest.get(bfoo3)); + } + + @Test + public void migrateConflict() { + dest.set("foo2", "bar"); + + assertNull(dest.get("foo1")); + assertEquals("bar", dest.get("foo2")); + assertNull(dest.get("foo3")); + + Pipeline p = jedis.pipelined(); + + p.mset("foo1", "bar1", "foo2", "bar2", "foo3", "bar3"); + p.migrate(host, port, db, timeout, new MigrateParams(), "foo1", "foo2", "foo3"); + + assertThat(p.syncAndReturnAll(), + hasItems( + equalTo("OK"), + both(instanceOf(JedisDataException.class)).and(hasToString(containsString("BUSYKEY"))) + )); + + assertEquals("bar1", dest.get("foo1")); + assertEquals("bar", dest.get("foo2")); + assertEquals("bar3", dest.get("foo3")); + } + + @Test + public void migrateConflictBinary() { + dest.set(bfoo2, bbar); + + assertNull(dest.get(bfoo1)); + assertArrayEquals(bbar, dest.get(bfoo2)); + assertNull(dest.get(bfoo3)); + + Pipeline p = jedis.pipelined(); + + p.mset(bfoo1, bbar1, bfoo2, bbar2, bfoo3, bbar3); + p.migrate(host, port, db, timeout, new MigrateParams(), bfoo1, bfoo2, bfoo3); + + assertThat(p.syncAndReturnAll(), + hasItems( + equalTo("OK"), + both(instanceOf(JedisDataException.class)).and(hasToString(containsString("BUSYKEY"))) + )); + + assertArrayEquals(bbar1, dest.get(bfoo1)); + assertArrayEquals(bbar, dest.get(bfoo2)); + assertArrayEquals(bbar3, dest.get(bfoo3)); + } + +} diff --git a/src/test/java/redis/clients/jedis/PipeliningTest.java b/src/test/java/redis/clients/jedis/PipeliningTest.java index bb9834c8d6..527b9dfc6d 100644 --- a/src/test/java/redis/clients/jedis/PipeliningTest.java +++ b/src/test/java/redis/clients/jedis/PipeliningTest.java @@ -1,5 +1,10 @@ package redis.clients.jedis; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.matchesPattern; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -16,18 +21,23 @@ import java.util.Set; import java.util.UUID; -import org.hamcrest.MatcherAssert; +import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.Test; - -import redis.clients.jedis.exceptions.JedisDataException; -import redis.clients.jedis.resps.Tuple; +import redis.clients.jedis.commands.ProtocolCommand; import redis.clients.jedis.commands.jedis.JedisCommandsTestBase; +import redis.clients.jedis.exceptions.JedisDataException; import redis.clients.jedis.params.SetParams; +import redis.clients.jedis.resps.Tuple; import redis.clients.jedis.util.SafeEncoder; public class PipeliningTest extends JedisCommandsTestBase { + private static final byte[] bfoo = { 0x01, 0x02, 0x03, 0x04 }; + private static final byte[] bfoo1 = { 0x01, 0x02, 0x03, 0x04, 0x11, 0x12, 0x13, 0x14 }; + private static final byte[] bbar = { 0x05, 0x06, 0x07, 0x08 }; + private static final byte[] bbaz = { 0x09, 0x0A, 0x0B, 0x0C }; + @Test public void pipeline() { Pipeline p = jedis.pipelined(); @@ -271,150 +281,6 @@ public void piplineWithError() { } assertEquals(r.get(), "bar"); } -// -// @Test -// public void multi() { -// Pipeline p = jedis.pipelined(); -// p.multi(); -// Response r1 = p.hincrBy("a", "f1", -1); -// Response r2 = p.hincrBy("a", "f1", -2); -// Response> r3 = p.exec(); -// List result = p.syncAndReturnAll(); -// -// assertEquals(Long.valueOf(-1), r1.get()); -// assertEquals(Long.valueOf(-3), r2.get()); -// -// assertEquals(4, result.size()); -// -// assertEquals("OK", result.get(0)); -// assertEquals("QUEUED", result.get(1)); -// assertEquals("QUEUED", result.get(2)); -// -// // 4th result is a list with the results from the multi -// @SuppressWarnings("unchecked") -// List multiResult = (List) result.get(3); -// assertEquals(Long.valueOf(-1), multiResult.get(0)); -// assertEquals(Long.valueOf(-3), multiResult.get(1)); -// -// assertEquals(Long.valueOf(-1), r3.get().get(0)); -// assertEquals(Long.valueOf(-3), r3.get().get(1)); -// -// } -// -// @Test -// public void multiWithMassiveRequests() { -// Pipeline p = jedis.pipelined(); -// p.multi(); -// -// List> responseList = new ArrayList>(); -// for (int i = 0; i < 100000; i++) { -// // any operation should be ok, but shouldn't forget about timeout -// responseList.add(p.setbit("test", 1, true)); -// } -// -// Response> exec = p.exec(); -// p.sync(); -// -// // we don't need to check return value -// // if below codes run without throwing Exception, we're ok -// exec.get(); -// -// for (Response resp : responseList) { -// resp.get(); -// } -// } -// -// @Test -// public void multiWithSync() { -// jedis.set("foo", "314"); -// jedis.set("bar", "foo"); -// jedis.set("hello", "world"); -// Pipeline p = jedis.pipelined(); -// Response r1 = p.get("bar"); -// p.multi(); -// Response r2 = p.get("foo"); -// p.exec(); -// Response r3 = p.get("hello"); -// p.sync(); -// -// // before multi -// assertEquals("foo", r1.get()); -// // It should be readable whether exec's response was built or not -// assertEquals("314", r2.get()); -// // after multi -// assertEquals("world", r3.get()); -// } -// -// @Test -// public void multiWatch() { -// final String key = "foo"; -// assertEquals(5L, jedis.incrBy(key, 5L)); -// -// List expect = new ArrayList<>(); -// List expMulti = null; // MULTI will fail -// -// Pipeline pipe = jedis.pipelined(); -// pipe.watch(key); expect.add("OK"); -// pipe.incrBy(key, 3L); expect.add(8L); -// pipe.multi(); expect.add("OK"); -// pipe.incrBy(key, 6L); expect.add("QUEUED"); -// assertEquals(expect, pipe.syncAndReturnAll()); expect.clear(); -// -// try (Jedis tweak = createJedis()) { -// assertEquals(10L, tweak.incrBy(key, 2L)); -// } -// -// pipe.incrBy(key, 4L); expect.add("QUEUED"); -// pipe.exec(); expect.add(expMulti); // failed MULTI -// pipe.incrBy(key, 7L); expect.add(17L); -// assertEquals(expect, pipe.syncAndReturnAll()); -// } -// -// @Test -// public void multiUnwatch() { -// final String key = "foo"; -// assertEquals(5L, jedis.incrBy(key, 5L)); -// -// List expect = new ArrayList<>(); -// List expMulti = new ArrayList<>(); -// -// Pipeline pipe = jedis.pipelined(); -// pipe.watch(key); expect.add("OK"); -// pipe.incrBy(key, 3L); expect.add(8L); -// pipe.unwatch(); expect.add("OK"); -// pipe.multi(); expect.add("OK"); -// pipe.incrBy(key, 6L); expect.add("QUEUED"); expMulti.add(16L); -// assertEquals(expect, pipe.syncAndReturnAll()); expect.clear(); -// -// try (Jedis tweak = createJedis()) { -// assertEquals(10L, tweak.incrBy(key, 2L)); -// } -// -// pipe.incrBy(key, 4L); expect.add("QUEUED"); expMulti.add(20L); -// pipe.exec(); expect.add(expMulti); // successful MULTI -// pipe.incrBy(key, 7L); expect.add(27L); -// assertEquals(expect, pipe.syncAndReturnAll()); -// } -// -// @Test(expected = IllegalStateException.class) -// public void pipelineExecWhenNotInMulti() { -// Pipeline pipeline = jedis.pipelined(); -// pipeline.exec(); -// } -// -// @Test(expected = IllegalStateException.class) -// public void pipelineDiscardWhenNotInMulti() { -// Pipeline pipeline = jedis.pipelined(); -// pipeline.discard(); -// } -// -// @Test(expected = IllegalStateException.class) -// public void pipelineMultiWhenAlreadyInMulti() { -// Pipeline pipeline = jedis.pipelined(); -// pipeline.multi(); -// pipeline.set("foo", "3"); -// pipeline.multi(); -// } @Test(expected = IllegalStateException.class) public void testJedisThrowExceptionWhenInPipeline() { @@ -441,18 +307,6 @@ public void testResetStateWhenInPipeline() { String result = jedis.get("foo"); assertEquals(result, "3"); } -// -// @Test -// public void testDiscardInPipeline() { -// Pipeline pipeline = jedis.pipelined(); -// pipeline.multi(); -// pipeline.set("foo", "bar"); -// Response discard = pipeline.discard(); -// Response get = pipeline.get("foo"); -// pipeline.sync(); -// discard.get(); -// get.get(); -// } @Test public void waitReplicas() { @@ -581,8 +435,8 @@ public void testEvalNestedLists() { p.sync(); List results = (List) result.get(); - MatcherAssert.assertThat((List) results.get(0), Matchers.hasItem("key1")); - MatcherAssert.assertThat((List) results.get(1), Matchers.hasItem(2L)); + assertThat((List) results.get(0), Matchers.hasItem("key1")); + assertThat((List) results.get(1), Matchers.hasItem(2L)); } @Test @@ -595,8 +449,8 @@ public void testEvalNestedListsWithBinary() { p.sync(); List results = (List) result.get(); - MatcherAssert.assertThat((List) results.get(0), Matchers.hasItem(bKey)); - MatcherAssert.assertThat((List) results.get(1), Matchers.hasItem(2L)); + assertThat((List) results.get(0), Matchers.hasItem(bKey)); + assertThat((List) results.get(1), Matchers.hasItem(2L)); } @Test @@ -657,72 +511,6 @@ public void testEvalshaKeyAndArgWithBinary() { assertNull(result1.get()); assertArrayEquals(SafeEncoder.encode("13"), result2.get()); } -// -// @Test -// public void testPipelinedTransactionResponse() { -// -// String key1 = "key1"; -// String val1 = "val1"; -// -// String key2 = "key2"; -// String val2 = "val2"; -// -// String key3 = "key3"; -// String field1 = "field1"; -// String field2 = "field2"; -// String field3 = "field3"; -// String field4 = "field4"; -// -// String value1 = "value1"; -// String value2 = "value2"; -// String value3 = "value3"; -// String value4 = "value4"; -// -// Map hashMap = new HashMap(); -// hashMap.put(field1, value1); -// hashMap.put(field2, value2); -// -// String key4 = "key4"; -// Map hashMap1 = new HashMap(); -// hashMap1.put(field3, value3); -// hashMap1.put(field4, value4); -// -// jedis.set(key1, val1); -// jedis.set(key2, val2); -// jedis.hmset(key3, hashMap); -// jedis.hmset(key4, hashMap1); -// -// Pipeline pipeline = jedis.pipelined(); -// pipeline.multi(); -// -// pipeline.get(key1); -// pipeline.hgetAll(key2); -// pipeline.hgetAll(key3); -// pipeline.get(key4); -// -// Response> response = pipeline.exec(); -// pipeline.sync(); -// -// List result = response.get(); -// -// assertEquals(4, result.size()); -// -// assertEquals("val1", result.get(0)); -// -// assertTrue(result.get(1) instanceof JedisDataException); -// -// Map hashMapReceived = (Map) result.get(2); -// Iterator iterator = hashMapReceived.keySet().iterator(); -// String mapKey1 = iterator.next(); -// String mapKey2 = iterator.next(); -// assertFalse(iterator.hasNext()); -// verifyHasBothValues(mapKey1, mapKey2, field1, field2); -// String mapValue1 = hashMapReceived.get(mapKey1); -// String mapValue2 = hashMapReceived.get(mapKey2); -// verifyHasBothValues(mapValue1, mapValue2, value1, value2); -// -// assertTrue(result.get(3) instanceof JedisDataException); -// } @Test public void testSyncWithNoCommandQueued() { @@ -760,111 +548,148 @@ public void testCloseable() throws IOException { retFuture2.get(); jedis2.close(); } -// -// @Test -// public void testCloseableWithMulti() throws IOException { -// // we need to test with fresh instance of Jedis -// Jedis jedis2 = new Jedis(hnp.getHost(), hnp.getPort(), 500); -// jedis2.auth("foobared"); -// -// Pipeline pipeline = jedis2.pipelined(); -// Response retFuture1 = pipeline.set("a", "1"); -// Response retFuture2 = pipeline.set("b", "2"); -// -// pipeline.multi(); -// -// pipeline.set("a", "a"); -// pipeline.set("b", "b"); -// -// pipeline.close(); -// -// try { -// pipeline.exec(); -// fail("close should discard transaction"); -// } catch (IllegalStateException e) { -// assertTrue(e.getMessage().contains("EXEC without MULTI")); -// // pass -// } -// -// // it shouldn't meet any exception -// retFuture1.get(); -// retFuture2.get(); -// jedis2.close(); -// } -// -// @Test -// public void execAbort() { -// final String luaTimeLimitKey = "lua-time-limit"; -// final String luaTimeLimit = jedis.configGet(luaTimeLimitKey).get(1); -// jedis.configSet(luaTimeLimitKey, "10"); -// -// Thread thread = new Thread(() -> { -// try (Jedis blocker = createJedis()) { -// blocker.eval("while true do end"); -// } catch (Exception ex) { -// // swallow any exception -// } -// }); -// -// Pipeline pipe = jedis.pipelined(); -// pipe.incr("foo"); -// pipe.multi(); -// pipe.incr("foo"); -// pipe.sync(); -// -// thread.start(); -// try { -// Thread.sleep(12); // allow Redis to be busy with the script and 'lua-time-limit' to exceed -// } catch (InterruptedException ex) { } -// -// pipe.incr("foo"); -// Response> txResp = pipe.exec(); -// pipe.sync(); -// try { -// txResp.get(); -// } catch (Exception ex) { -// assertSame(AbortedTransactionException.class, ex.getClass()); -// } finally { -// try { -// String status = jedis.scriptKill(); -// // https://github.com/redis/jedis/issues/2656 -// if ("OK".equalsIgnoreCase(status)) { -// scriptKillWait(); -// } else { -// // #2656: Checking if this status is actually 'OK' when error occurs in next command. -// org.apache.logging.log4j.LogManager.getLogger().error( -// String.format("Status if SCRIPT KILL command is \"%s\"", status)); -// } -// } finally { -// jedis.configSet(luaTimeLimitKey, luaTimeLimit); -// } -// } -// } -// -// private void scriptKillWait() { -// int attemptLeft = 10; -// while (attemptLeft > 0) { -// try (Jedis pingJedis = createJedis()) { -// while (attemptLeft > 0) { -// try { -// pingJedis.ping(); -// return; // wait is over -// } catch (JedisBusyException busy) { -// Thread.sleep(10); // BUSY, waiting for some time -// --attemptLeft; // doing this later; otherwise any exception in Thread.sleep() -// // would cause decrement twice for the same turn. -// } -// } -// } catch (Exception any) { -// --attemptLeft; -// // try new connection -// } -// } -// } -// -// private void verifyHasBothValues(String firstKey, String secondKey, String value1, String value2) { -// assertFalse(firstKey.equals(secondKey)); -// assertTrue(firstKey.equals(value1) || firstKey.equals(value2)); -// assertTrue(secondKey.equals(value1) || secondKey.equals(value2)); -// } + + @Test + public void time() { + Pipeline p = jedis.pipelined(); + + p.time(); + + // we get back one result, with two components: the seconds, and the microseconds, but encoded as strings + Matcher timeResponseMatcher = hasItems(matchesPattern("\\d+"), matchesPattern("\\d+")); + assertThat(p.syncAndReturnAll(), + hasItems(timeResponseMatcher)); + } + + @Test + public void dbSize() { + Pipeline p = jedis.pipelined(); + + p.dbSize(); + p.set("foo", "bar"); + p.dbSize(); + + assertThat(p.syncAndReturnAll(), + hasItems(0L, "OK", 1L)); + } + + @Test + public void move() { + Pipeline p = jedis.pipelined(); + + p.move("foo", 1); + p.set("foo", "bar"); + p.move("foo", 1); + p.get("foo"); + p.select(1); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems(0L, "OK", 1L, null, "OK", "bar")); + } + + @Test + public void moveBinary() { + Pipeline p = jedis.pipelined(); + + p.move(bfoo, 1); + p.set(bfoo, bbar); + p.move(bfoo, 1); + p.get(bfoo); + p.select(1); + p.get(bfoo); + + assertThat(p.syncAndReturnAll(), + hasItems(0L, "OK", 1L, null, "OK", bbar)); + } + + @Test + public void swapDb() { + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar"); + p.get("foo"); + p.select(1); + p.get("foo"); + p.swapDB(0, 1); + p.select(0); + p.get("foo"); + p.select(1); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems("OK", "bar", "OK", null, "OK", "OK", null, "OK", "bar")); + } + + @Test + public void copyToAnotherDb() { + Pipeline p = jedis.pipelined(); + + p.copy("foo", "foo-copy", 1, false); + p.set("foo", "bar"); + p.copy("foo", "foo-copy", 1, false); + p.get("foo"); + p.select(1); + p.get("foo-copy"); + p.select(0); + p.set("foo", "baz"); + p.copy("foo", "foo-copy", 1, false); + p.get("foo"); + p.select(1); + p.get("foo-copy"); + + assertThat(p.syncAndReturnAll(), + hasItems(false, "OK", true, "bar", "OK", "bar", "OK", "OK", false, "baz", "bar")); + } + + @Test + public void copyToAnotherDbBinary() { + Pipeline p = jedis.pipelined(); + + + p.copy(bfoo, bfoo1, 1, false); + p.set(bfoo, bbar); + p.copy(bfoo, bfoo1, 1, false); + p.get(bfoo); + p.select(1); + p.get(bfoo1); + p.select(0); + p.set(bfoo, bbaz); + p.copy(bfoo, bfoo1, 1, false); + p.get(bfoo); + p.select(1); + p.get(bfoo1); + + assertThat(p.syncAndReturnAll(), + hasItems(false, "OK", true, bbar, "OK", bbar, "OK", "OK", false, bbaz, bbar)); + } + + enum Foo implements ProtocolCommand { + FOO; + + @Override + public byte[] getRaw() { + return SafeEncoder.encode(name()); + } + } + + @Test + public void errorInTheMiddle() { + CommandObject invalidCommand = + new CommandObject<>(new CommandObjects().commandArguments(Foo.FOO), BuilderFactory.STRING); + + Pipeline p = jedis.pipelined(); + + p.set("foo", "bar"); + p.appendCommand(invalidCommand); + p.get("foo"); + + assertThat(p.syncAndReturnAll(), + hasItems( + equalTo("OK"), + instanceOf(JedisDataException.class), + equalTo("bar") + )); + } + } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/ClientCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/ClientCommandsTest.java index 8c3f35db49..cf7760dbf4 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/ClientCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/ClientCommandsTest.java @@ -222,10 +222,10 @@ public void killAddrIpPort() { @Test public void killUser() { - Jedis client2 = new Jedis(hnp.getHost(), hnp.getPort(), 500); client.aclSetUser("test_kill", "on", "+acl", ">password1"); - try { + try (Jedis client2 = new Jedis(hnp.getHost(), hnp.getPort(), 500)) { client2.auth("test_kill", "password1"); + assertEquals(1, jedis.clientKill(new ClientKillParams().user("test_kill"))); assertDisconnected(client2); } finally { @@ -233,6 +233,27 @@ public void killUser() { } } + @Test + public void killMaxAge() throws InterruptedException { + long maxAge = 2; + + // sleep twice the maxAge, to be sure + Thread.sleep(maxAge * 2 * 1000); + + try (Jedis client2 = new Jedis(hnp.getHost(), hnp.getPort(), 500)) { + client2.auth("foobared"); + + long killedClients = jedis.clientKill(new ClientKillParams().maxAge(maxAge)); + + // The reality is that some tests leak clients, so we can't assert + // on the exact number of killed clients. + assertTrue(killedClients > 0); + + assertDisconnected(client); + assertConnected(client2); + } + } + @Test public void clientInfo() { String info = client.clientInfo(); @@ -267,6 +288,10 @@ private void assertDisconnected(Jedis j) { } } + private void assertConnected(Jedis j) { + assertEquals("PONG", j.ping()); + } + private String findInClientList() { for (String clientInfo : jedis.clientList().split("\n")) { if (pattern.matcher(clientInfo).find()) { diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/HashesPipelineCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/HashesPipelineCommandsTest.java new file mode 100644 index 0000000000..e276441e17 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/HashesPipelineCommandsTest.java @@ -0,0 +1,391 @@ +package redis.clients.jedis.commands.unified.pipeline; + +import static redis.clients.jedis.util.AssertUtil.assertPipelineSyncAll; + +import java.util.*; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import redis.clients.jedis.commands.unified.pooled.PooledCommandsTestHelper; + +public class HashesPipelineCommandsTest extends PipelineCommandsTestBase { + + @BeforeClass + public static void prepare() throws InterruptedException { + jedis = PooledCommandsTestHelper.getPooled(); + } + + @AfterClass + public static void cleanUp() { + jedis.close(); + } +// +// @Before +// public void setUp() { +// PooledCommandsTestHelper.clearData(); +// } +// +// @After +// public void tearDown() { +// PooledCommandsTestHelper.clearData(); +// } + + final byte[] bfoo = { 0x01, 0x02, 0x03, 0x04 }; + final byte[] bbar = { 0x05, 0x06, 0x07, 0x08 }; + final byte[] bcar = { 0x09, 0x0A, 0x0B, 0x0C }; + + final byte[] bbar1 = { 0x05, 0x06, 0x07, 0x08, 0x0A }; + final byte[] bbar2 = { 0x05, 0x06, 0x07, 0x08, 0x0B }; + final byte[] bbar3 = { 0x05, 0x06, 0x07, 0x08, 0x0C }; + final byte[] bbarstar = { 0x05, 0x06, 0x07, 0x08, '*' }; + + @Test + public void hset() { + pipe.hset("foo", "bar", "car"); + pipe.hset("foo", "bar", "foo"); + + // Binary + pipe.hset(bfoo, bbar, bcar); + pipe.hset(bfoo, bbar, bfoo); + + assertPipelineSyncAll( + Arrays.asList(1L, 0L, 1L, 0L), + pipe.syncAndReturnAll()); + } + + @Test + public void hget() { + pipe.hset("foo", "bar", "car"); + pipe.hget("bar", "foo"); + pipe.hget("foo", "car"); + pipe.hget("foo", "bar"); + + // Binary + pipe.hset(bfoo, bbar, bcar); + pipe.hget(bbar, bfoo); + pipe.hget(bfoo, bcar); + pipe.hget(bfoo, bbar); + + assertPipelineSyncAll( + Arrays.asList( + 1L, null, null, "car", + 1L, null, null, bcar), + pipe.syncAndReturnAll()); + } + + @Test + public void hsetnx() { + pipe.hsetnx("foo", "bar", "car"); + pipe.hget("foo", "bar"); + + pipe.hsetnx("foo", "bar", "foo"); + pipe.hget("foo", "bar"); + + pipe.hsetnx("foo", "car", "bar"); + pipe.hget("foo", "car"); + + // Binary + pipe.hsetnx(bfoo, bbar, bcar); + pipe.hget(bfoo, bbar); + + pipe.hsetnx(bfoo, bbar, bfoo); + pipe.hget(bfoo, bbar); + + pipe.hsetnx(bfoo, bcar, bbar); + pipe.hget(bfoo, bcar); + + assertPipelineSyncAll( + Arrays.asList( + 1L, "car", 0L, "car", 1L, "bar", + 1L, bcar, 0L, bcar, 1L, bbar), + pipe.syncAndReturnAll()); + } + + @Test + public void hmset() { + Map hash = new HashMap<>(); + hash.put("bar", "car"); + hash.put("car", "bar"); + pipe.hmset("foo", hash); + pipe.hget("foo", "bar"); + pipe.hget("foo", "car"); + + // Binary + Map bhash = new HashMap<>(); + bhash.put(bbar, bcar); + bhash.put(bcar, bbar); + pipe.hmset(bfoo, bhash); + pipe.hget(bfoo, bbar); + pipe.hget(bfoo, bcar); + + assertPipelineSyncAll( + Arrays.asList("OK", "car", "bar", "OK", bcar, bbar), + pipe.syncAndReturnAll()); + } + + @Test + public void hsetVariadic() { + Map hash = new HashMap<>(); + hash.put("bar", "car"); + hash.put("car", "bar"); + pipe.hset("foo", hash); + pipe.hget("foo", "bar"); + pipe.hget("foo", "car"); + + // Binary + Map bhash = new HashMap<>(); + bhash.put(bbar, bcar); + bhash.put(bcar, bbar); + pipe.hset(bfoo, bhash); + pipe.hget(bfoo, bbar); + pipe.hget(bfoo, bcar); + + assertPipelineSyncAll( + Arrays.asList(2L, "car", "bar", 2L, bcar, bbar), + pipe.syncAndReturnAll()); + } + + @Test + public void hmget() { + Map hash = new HashMap<>(); + hash.put("bar", "car"); + hash.put("car", "bar"); + pipe.hmset("foo", hash); + + pipe.hmget("foo", "bar", "car", "foo"); + List expected = new ArrayList<>(); + expected.add("car"); + expected.add("bar"); + expected.add(null); + + // Binary + Map bhash = new HashMap<>(); + bhash.put(bbar, bcar); + bhash.put(bcar, bbar); + pipe.hmset(bfoo, bhash); + + pipe.hmget(bfoo, bbar, bcar, bfoo); + List bexpected = new ArrayList<>(); + bexpected.add(bcar); + bexpected.add(bbar); + bexpected.add(null); + + assertPipelineSyncAll( + Arrays.asList( + "OK", Arrays.asList("car", "bar", null), + "OK", Arrays.asList(bcar, bbar, null)), + pipe.syncAndReturnAll()); + } + + @Test + public void hincrBy() { + pipe.hincrBy("foo", "bar", 1); + pipe.hincrBy("foo", "bar", -1); + pipe.hincrBy("foo", "bar", -10); + + // Binary + pipe.hincrBy(bfoo, bbar, 1); + pipe.hincrBy(bfoo, bbar, -1); + pipe.hincrBy(bfoo, bbar, -10); + + assertPipelineSyncAll( + Arrays.asList(1L, 0L, -10L, 1L, 0L, -10L), + pipe.syncAndReturnAll()); + } + + @Test + public void hincrByFloat() { + pipe.hincrByFloat("foo", "bar", 1.5d); + pipe.hincrByFloat("foo", "bar", -1.5d); + pipe.hincrByFloat("foo", "bar", -10.7d); + + // Binary + pipe.hincrByFloat(bfoo, bbar, 1.5d); + pipe.hincrByFloat(bfoo, bbar, -1.5d); + pipe.hincrByFloat(bfoo, bbar, -10.7d); + + assertPipelineSyncAll( + Arrays.asList(1.5, 0d, -10.7, 1.5, 0d, -10.7), + pipe.syncAndReturnAll()); + } + + @Test + public void hexists() { + Map hash = new HashMap<>(); + hash.put("bar", "car"); + hash.put("car", "bar"); + pipe.hset("foo", hash); + + pipe.hexists("bar", "foo"); + pipe.hexists("foo", "foo"); + pipe.hexists("foo", "bar"); + + // Binary + Map bhash = new HashMap<>(); + bhash.put(bbar, bcar); + bhash.put(bcar, bbar); + pipe.hset(bfoo, bhash); + + pipe.hexists(bbar, bfoo); + pipe.hexists(bfoo, bfoo); + pipe.hexists(bfoo, bbar); + + assertPipelineSyncAll( + Arrays.asList( + 2L, false, false, true, + 2L, false, false, true), + pipe.syncAndReturnAll()); + } + + @Test + public void hdel() { + Map hash = new HashMap<>(); + hash.put("bar", "car"); + hash.put("car", "bar"); + pipe.hset("foo", hash); + + pipe.hdel("bar", "foo"); + pipe.hdel("foo", "foo"); + pipe.hdel("foo", "bar"); + pipe.hget("foo", "bar"); + + // Binary + Map bhash = new HashMap<>(); + bhash.put(bbar, bcar); + bhash.put(bcar, bbar); + pipe.hset(bfoo, bhash); + + pipe.hdel(bbar, bfoo); + pipe.hdel(bfoo, bfoo); + pipe.hdel(bfoo, bbar); + pipe.hget(bfoo, bbar); + + assertPipelineSyncAll( + Arrays.asList( + 2L, 0L, 0L, 1L, null, + 2L, 0L, 0L, 1L, null), + pipe.syncAndReturnAll()); + } + + @Test + public void hlen() { + Map hash = new HashMap<>(); + hash.put("bar", "car"); + hash.put("car", "bar"); + pipe.hset("foo", hash); + + pipe.hlen("bar"); + pipe.hlen("foo"); + + // Binary + Map bhash = new HashMap<>(); + bhash.put(bbar, bcar); + bhash.put(bcar, bbar); + pipe.hset(bfoo, bhash); + + pipe.hlen(bbar); + pipe.hlen(bfoo); + + assertPipelineSyncAll( + Arrays.asList(2L, 0L, 2L, 2L, 0L, 2L), + pipe.syncAndReturnAll()); + } + + @Test + public void hkeys() { + Map hash = new LinkedHashMap<>(); + hash.put("bar", "car"); + hash.put("car", "bar"); + pipe.hset("foo", hash); + + pipe.hkeys("foo"); + Set expected = new LinkedHashSet<>(); + expected.add("bar"); + expected.add("car"); + + // Binary + Map bhash = new LinkedHashMap<>(); + bhash.put(bbar, bcar); + bhash.put(bcar, bbar); + pipe.hset(bfoo, bhash); + + pipe.hkeys(bfoo); + Set bexpected = new LinkedHashSet<>(); + bexpected.add(bbar); + bexpected.add(bcar); + + assertPipelineSyncAll( + Arrays.asList( + 2L, new HashSet<>(Arrays.asList("bar", "car")), + 2L, new HashSet<>(Arrays.asList(bbar, bcar))), + pipe.syncAndReturnAll()); + } + + @Test + public void hvals() { + Map hash = new LinkedHashMap<>(); + hash.put("bar", "car"); + //hash.put("car", "bar"); + pipe.hset("foo", hash); + + pipe.hvals("foo"); + + // Binary + Map bhash = new LinkedHashMap<>(); + bhash.put(bbar, bcar); + //bhash.put(bcar, bbar); + pipe.hset(bfoo, bhash); + + pipe.hvals(bfoo); + + assertPipelineSyncAll( + Arrays.asList( + //2L, Arrays.asList("bar", "car"), + //2L, Arrays.asList(bbar, bcar)), + 1L, Arrays.asList("car"), + 1L, Arrays.asList(bcar)), + pipe.syncAndReturnAll()); + } + + @Test + public void hgetAll() { + Map hash = new HashMap<>(); + hash.put("bar", "car"); + //hash.put("car", "bar"); + pipe.hset("foo", hash); + + pipe.hgetAll("foo"); + + // Binary + Map bhash = new HashMap<>(); + bhash.put(bbar, bcar); + //bhash.put(bcar, bbar); + pipe.hset(bfoo, bhash); + + pipe.hgetAll(bfoo); + +// assertPipelineSyncAll( +// Arrays.asList( +// 1L, hash, +// 1L, bhash), +// pipe.syncAndReturnAll()); + pipe.syncAndReturnAll(); + } + + @Test + public void hstrlen() { + pipe.hstrlen("foo", "key"); + pipe.hset("foo", "key", "value"); + pipe.hstrlen("foo", "key"); + + pipe.hstrlen(bfoo, bbar); + pipe.hset(bfoo, bbar, bcar); + pipe.hstrlen(bfoo, bbar); + + assertPipelineSyncAll( + Arrays.asList(0L, 1L, 5L, 0L, 1L, 4L), + pipe.syncAndReturnAll()); + } +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/pipeline/PipelineCommandsTestBase.java b/src/test/java/redis/clients/jedis/commands/unified/pipeline/PipelineCommandsTestBase.java new file mode 100644 index 0000000000..c162954509 --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/pipeline/PipelineCommandsTestBase.java @@ -0,0 +1,28 @@ +package redis.clients.jedis.commands.unified.pipeline; + +import org.junit.After; +import org.junit.Before; + +import redis.clients.jedis.JedisPooled; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.commands.unified.pooled.PooledCommandsTestHelper; + +public abstract class PipelineCommandsTestBase { + + protected static JedisPooled jedis; + protected Pipeline pipe; + + public PipelineCommandsTestBase() { + } + + @Before + public void setUp() { + PooledCommandsTestHelper.clearData(); + pipe = jedis.pipelined(); + } + + @After + public void tearDown() { + pipe.close(); + } +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java index 42a1e20b79..82ecc600c8 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledCommandsTestHelper.java @@ -13,18 +13,18 @@ public class PooledCommandsTestHelper { private static Jedis node; - static JedisPooled getPooled() throws InterruptedException { + public static JedisPooled getPooled() throws InterruptedException { node = new Jedis(nodeInfo); node.auth("foobared"); - node.flushAll(); + //node.flushAll(); //return new JedisPooled(nodeInfo.getHost(), nodeInfo.getPort(), null, "foobared"); return new JedisPooled(nodeInfo, DefaultJedisClientConfig.builder() .protocol(RedisProtocolUtil.getRedisProtocol()).password("foobared").build()); } - static void clearData() { - node.flushDB(); + public static void clearData() { + node.flushAll(); } } diff --git a/src/test/java/redis/clients/jedis/util/AssertUtil.java b/src/test/java/redis/clients/jedis/util/AssertUtil.java index 5d2ca5518d..110be7e48e 100644 --- a/src/test/java/redis/clients/jedis/util/AssertUtil.java +++ b/src/test/java/redis/clients/jedis/util/AssertUtil.java @@ -7,6 +7,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -105,4 +106,47 @@ public static void assertByteArrayCollectionContainsAll(Collection all, } } + public static void assertPipelineSyncAll(List expected, List actual) { + assertEquals(expected.size(), actual.size()); + for (int n = 0; n < expected.size(); n++) { + Object expObj = expected.get(n); + Object actObj = actual.get(n); + if (expObj instanceof List) { + if (!(actObj instanceof List)) { + throw new ComparisonFailure(n + "'th element is not a list", + expObj.getClass().toString(), actObj.getClass().toString()); + } + assertPipelineSyncAll((List) expObj, (List) actObj); + } else if (expObj instanceof List) { + if (!(actObj instanceof List)) { + throw new ComparisonFailure(n + "'th element is not a list", + expObj.getClass().toString(), actObj.getClass().toString()); + } + assertPipelineSyncAll((List) expObj, (List) actObj); + } else if (expObj instanceof Set) { + if (!(actObj instanceof Set)) { + throw new ComparisonFailure(n + "'th element is not a set", + expObj.getClass().toString(), actObj.getClass().toString()); + } + assertPipelineSyncAllSet((Set) expObj, (Set) actObj); + } else if (expObj instanceof byte[]) { + if (!(actObj instanceof byte[])) { + throw new ComparisonFailure(n + "'th element is not byte array", + expObj.getClass().toString(), actObj.getClass().toString()); + } + assertArrayEquals((byte[]) expObj, (byte[]) actObj); + } else { + assertEquals(n + "'th element mismatched", expObj, actObj); + } + } + } + + private static void assertPipelineSyncAllSet(Set expected, Set actual) { + assertEquals(expected.size(), actual.size()); + if (expected.iterator().next() instanceof byte[]) { + assertByteArraySetEquals((Set) expected, (Set) actual); + } else { + assertEquals(expected, actual); + } + } }