From 91579ed17eddcc48f91c236bef40d8217ac7615f Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Mon, 22 Feb 2021 10:46:16 +0900 Subject: [PATCH] HBASE-25575 Should validate Puts in RowMutations (#2954) Signed-off-by: Duo Zhang --- .../hadoop/hbase/client/ConnectionUtils.java | 10 +++- .../hbase/client/RawAsyncTableImpl.java | 13 +++-- .../hadoop/hbase/client/TestAsyncTable.java | 43 ++++++++++++++++ .../hbase/client/TestAsyncTableBatch.java | 50 +++++++++++++++++++ 4 files changed, 112 insertions(+), 4 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 5b8cb8463225..70312aa4de46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -482,7 +482,7 @@ static CompletableFuture timelineConsistentRead(AsyncRegionLocator locato } // validate for well-formedness - static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException { + static void validatePut(Put put, int maxKeyValueSize) { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); } @@ -497,6 +497,14 @@ static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentExce } } + static void validatePutsInRowMutations(RowMutations rowMutations, int maxKeyValueSize) { + for (Mutation mutation : rowMutations.getMutations()) { + if (mutation instanceof Put) { + validatePut((Put) mutation, maxKeyValueSize); + } + } + } + /** * Select the priority for the rpc call. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 3cffad8b44d4..187ecf1f0cbd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePut; +import static org.apache.hadoop.hbase.client.ConnectionUtils.validatePutsInRowMutations; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; @@ -343,6 +344,7 @@ public CompletableFuture thenDelete(Delete delete) { @Override public CompletableFuture thenMutate(RowMutations mutation) { preCheck(); + validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, @@ -403,6 +405,7 @@ public CompletableFuture thenDelete(Delete delete) { @Override public CompletableFuture thenMutate(RowMutations mutation) { + validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); return RawAsyncTableImpl.this. newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, @@ -420,9 +423,6 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) @Override public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { - if (checkAndMutate.getAction() instanceof Put) { - validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); - } if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete || checkAndMutate.getAction() instanceof Increment || checkAndMutate.getAction() instanceof Append) { @@ -442,6 +442,7 @@ public CompletableFuture checkAndMutate(CheckAndMutate che .call(); } else if (checkAndMutate.getAction() instanceof RowMutations) { RowMutations rowMutations = (RowMutations) checkAndMutate.getAction(); + validatePutsInRowMutations(rowMutations, conn.connConf.getMaxKeyValueSize()); return RawAsyncTableImpl.this. newCaller(checkAndMutate.getRow(), rowMutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> @@ -514,6 +515,7 @@ public void run(MultiResponse resp) { @Override public CompletableFuture mutateRow(RowMutations mutations) { + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); return this. newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs).action((controller, loc, stub) -> this. mutateRow(controller, loc, stub, mutations, @@ -615,7 +617,12 @@ private List> batch(List actions, long r CheckAndMutate checkAndMutate = (CheckAndMutate) action; if (checkAndMutate.getAction() instanceof Put) { validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize()); + } else if (checkAndMutate.getAction() instanceof RowMutations) { + validatePutsInRowMutations((RowMutations) checkAndMutate.getAction(), + conn.connConf.getMaxKeyValueSize()); } + } else if (action instanceof RowMutations) { + validatePutsInRowMutations((RowMutations) action, conn.connConf.getMaxKeyValueSize()); } } return conn.callerFactory.batch().table(tableName).actions(actions) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index f76c923c77bf..2952fa37c85c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -1673,4 +1673,47 @@ public void testInvalidPut() { assertThat(e.getMessage(), containsString("KeyValue size too large")); } } + + @Test + public void testInvalidPutInRowMutations() throws IOException { + final byte[] row = Bytes.toBytes(0); + try { + getTable.get().mutateRow(new RowMutations(row).add(new Put(row))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + getTable.get() + .mutateRow(new RowMutations(row).add(new Put(row) + .addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } + + @Test + public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException { + final byte[] row = Bytes.toBytes(0); + try { + getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER) + .build(new RowMutations(row).add(new Put(row)))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, QUALIFIER) + .build(new RowMutations(row).add(new Put(row) + .addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java index 4fb050ea287c..78dbf0bac8b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java @@ -335,6 +335,56 @@ public void testInvalidPut() { } } + @Test + public void testInvalidPutInRowMutations() throws IOException { + final byte[] row = Bytes.toBytes(0); + + AsyncTable table = tableGetter.apply(TABLE_NAME); + try { + table.batch(Arrays.asList(new Delete(row), new RowMutations(row).add(new Put(row)))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + table.batch( + Arrays.asList(new RowMutations(row).add(new Put(row) + .addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE])), + new Delete(row))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } + + @Test + public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException { + final byte[] row = Bytes.toBytes(0); + + AsyncTable table = tableGetter.apply(TABLE_NAME); + try { + table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, CQ) + .build(new RowMutations(row).add(new Put(row))))); + fail("Should fail since the put does not contain any cells"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("No columns to insert")); + } + + try { + table.batch( + Arrays.asList(CheckAndMutate.newBuilder(row) + .ifNotExists(FAMILY, CQ) + .build(new RowMutations(row).add(new Put(row) + .addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]))), + new Delete(row))); + fail("Should fail since the put exceeds the max key value size"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("KeyValue size too large")); + } + } + @Test public void testWithCheckAndMutate() throws Exception { AsyncTable table = tableGetter.apply(TABLE_NAME);