diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java new file mode 100644 index 000000000000..c784b2561881 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Similar to CellSerialization, but includes the sequenceId from an ExtendedCell. This is necessary + * so that CellSortReducer can sort by sequenceId, if applicable. Note that these two serializations + * are not compatible -- data serialized by CellSerialization cannot be deserialized with + * ExtendedCellSerialization and vice versa. This is ok for {@link HFileOutputFormat2} because the + * serialization is not actually used for the actual written HFiles, just intermediate data (between + * mapper and reducer of a single job). + */ +@InterfaceAudience.Private +public class ExtendedCellSerialization implements Serialization { + @Override + public boolean accept(Class c) { + return ExtendedCell.class.isAssignableFrom(c); + } + + @Override + public ExtendedCellDeserializer getDeserializer(Class t) { + return new ExtendedCellDeserializer(); + } + + @Override + public ExtendedCellSerializer getSerializer(Class c) { + return new ExtendedCellSerializer(); + } + + public static class ExtendedCellDeserializer implements Deserializer { + private DataInputStream dis; + + @Override + public void close() throws IOException { + this.dis.close(); + } + + @Override + public KeyValue deserialize(ExtendedCell ignore) throws IOException { + KeyValue kv = KeyValueUtil.create(this.dis); + PrivateCellUtil.setSequenceId(kv, this.dis.readLong()); + return kv; + } + + @Override + public void open(InputStream is) throws IOException { + this.dis = new DataInputStream(is); + } + } + + public static class ExtendedCellSerializer implements Serializer { + private DataOutputStream dos; + + @Override + public void close() throws IOException { + this.dos.close(); + } + + @Override + public void open(OutputStream os) throws IOException { + this.dos = new DataOutputStream(os); + } + + @Override + public void serialize(ExtendedCell kv) throws IOException { + dos.writeInt(PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT); + PrivateCellUtil.writeCell(kv, dos, true); + dos.writeLong(kv.getSequenceId()); + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 23488ad985f2..40c95acfa4ed 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -30,6 +30,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -184,6 +185,15 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = "hbase.mapreduce.use.multi.table.hfileoutputformat"; + /** + * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config + * package-private for internal usage for jobs like WALPlayer which need to use features of + * ExtendedCell. + */ + static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY = + "hbase.mapreduce.hfileoutputformat.extendedcell.enabled"; + static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false; + public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster."; public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY = REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum"; @@ -655,9 +665,7 @@ static void configureIncrementalLoad(Job job, List multiTableInfo, LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); } - conf.setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - CellSerialization.class.getName()); + mergeSerializations(conf); if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { LOG.info("bulkload locality sensitive enabled"); @@ -705,6 +713,33 @@ static void configureIncrementalLoad(Job job, List multiTableInfo, LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ",")); } + private static void mergeSerializations(Configuration conf) { + List serializations = new ArrayList<>(); + + // add any existing values that have been set + String[] existing = conf.getStrings("io.serializations"); + if (existing != null) { + Collections.addAll(serializations, existing); + } + + serializations.add(MutationSerialization.class.getName()); + serializations.add(ResultSerialization.class.getName()); + + // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's + // SerializationFactory runs through serializations in the order they are registered. + // We want to register ExtendedCellSerialization before CellSerialization because both + // work for ExtendedCells but only ExtendedCellSerialization handles them properly. + if ( + conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, + EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT) + ) { + serializations.add(ExtendedCellSerialization.class.getName()); + } + serializations.add(CellSerialization.class.getName()); + + conf.setStrings("io.serializations", serializations.toArray(new String[0])); + } + public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws IOException { Configuration conf = job.getConfiguration(); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 63cc3e1604f3..5f2848c22e1d 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -148,6 +149,13 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException { if (WALEdit.isMetaEditFamily(cell)) { continue; } + + // Set sequenceId from WALKey, since it is not included by WALCellCodec. The sequenceId + // on WALKey is the same value that was on the cells in the WALEdit. This enables + // CellSortReducer to use sequenceId to disambiguate duplicate cell timestamps. + // See HBASE-27649 + PrivateCellUtil.setSequenceId(cell, key.getSequenceId()); + byte[] outKey = multiTableSupport ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell)) : CellUtil.cloneRow(cell); @@ -355,6 +363,11 @@ public Job createSubmittableJob(String[] args) throws IOException { throw new IOException("Exactly one table must be specified for the bulk export option"); } + // WALPlayer needs ExtendedCellSerialization so that sequenceId can be propagated when + // sorting cells in CellSortReducer + job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, + true); + // the bulk HFile case List tableNames = getTableNameList(tables); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 52bfd8fbc18d..c6e51eee40ff 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -29,6 +33,7 @@ import java.io.File; import java.io.PrintStream; import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,8 +55,10 @@ import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LauncherSecurityManager; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; @@ -131,6 +138,80 @@ public void testPlayingRecoveredEdit() throws Exception { assertTrue(TEST_UTIL.countRows(tn) > 0); } + /** + * Tests that when you write multiple cells with the same timestamp they are properly sorted by + * their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from + * the resulting bulkloaded HFiles. See HBASE-27649 + */ + @Test + public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + "1"); + final byte[] family = Bytes.toBytes("family"); + final byte[] column1 = Bytes.toBytes("c1"); + final byte[] column2 = Bytes.toBytes("c2"); + final byte[] row = Bytes.toBytes("row"); + Table table = TEST_UTIL.createTable(tableName, family); + + long now = EnvironmentEdgeManager.currentTime(); + // put a row into the first table + Put p = new Put(row); + p.addColumn(family, column1, now, column1); + p.addColumn(family, column2, now, column2); + + table.put(p); + + byte[] lastVal = null; + + for (int i = 0; i < 50; i++) { + lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); + p = new Put(row); + p.addColumn(family, column1, now, lastVal); + + table.put(p); + + // wal rolling is necessary to trigger the bug. otherwise no sorting + // needs to occur in the reducer because it's all sorted and coming from a single file. + if (i % 10 == 0) { + WAL log = cluster.getRegionServer(0).getWAL(null); + log.rollWriter(); + } + } + + WAL log = cluster.getRegionServer(0).getWAL(null); + log.rollWriter(); + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), + HConstants.HREGION_LOGDIR_NAME).toString(); + + Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); + String outPath = "/tmp/" + name.getMethodName(); + configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath); + configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); + + WALPlayer player = new WALPlayer(configuration); + assertEquals(0, ToolRunner.run(configuration, player, + new String[] { walInputDir, tableName.getNameAsString() })); + + Get g = new Get(row); + Result result = table.get(g); + byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); + assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); + + table = TEST_UTIL.truncateTable(tableName); + g = new Get(row); + result = table.get(g); + assertThat(result.listCells(), nullValue()); + + BulkLoadHFiles.create(configuration).bulkLoad(tableName, + new Path(outPath, tableName.getNameAsString())); + + g = new Get(row); + result = table.get(g); + value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); + + assertThat(result.listCells(), notNullValue()); + assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); + } + /** * Simple end-to-end test */