HBASE-27649 WALPlayer does not properly dedupe overridden cell versions (#5058)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
bbeaudreault authored Feb 25, 2023
1 parent 6b672cc commit 4b7815d
Showing 4 changed files with 233 additions and 3 deletions.
org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java (new file)
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Similar to CellSerialization, but includes the sequenceId from an ExtendedCell. This is necessary
* so that CellSortReducer can sort by sequenceId, if applicable. Note that these two serializations
* are not compatible -- data serialized by CellSerialization cannot be deserialized with
* ExtendedCellSerialization and vice versa. This is ok for {@link HFileOutputFormat2} because the
* serialization is not used for the HFiles that are ultimately written, only for intermediate
* data (passed between the mapper and reducer of a single job).
*/
@InterfaceAudience.Private
public class ExtendedCellSerialization implements Serialization<ExtendedCell> {
@Override
public boolean accept(Class<?> c) {
return ExtendedCell.class.isAssignableFrom(c);
}

@Override
public ExtendedCellDeserializer getDeserializer(Class<ExtendedCell> t) {
return new ExtendedCellDeserializer();
}

@Override
public ExtendedCellSerializer getSerializer(Class<ExtendedCell> c) {
return new ExtendedCellSerializer();
}

public static class ExtendedCellDeserializer implements Deserializer<ExtendedCell> {
private DataInputStream dis;

@Override
public void close() throws IOException {
this.dis.close();
}

@Override
public KeyValue deserialize(ExtendedCell ignore) throws IOException {
KeyValue kv = KeyValueUtil.create(this.dis);
PrivateCellUtil.setSequenceId(kv, this.dis.readLong());
return kv;
}

@Override
public void open(InputStream is) throws IOException {
this.dis = new DataInputStream(is);
}
}

public static class ExtendedCellSerializer implements Serializer<ExtendedCell> {
private DataOutputStream dos;

@Override
public void close() throws IOException {
this.dos.close();
}

@Override
public void open(OutputStream os) throws IOException {
this.dos = new DataOutputStream(os);
}

@Override
public void serialize(ExtendedCell kv) throws IOException {
dos.writeInt(PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT);
PrivateCellUtil.writeCell(kv, dos, true);
dos.writeLong(kv.getSequenceId());
}
}
}
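As a quick illustration of the wire format implemented above, here is a hedged round-trip sketch (not part of the commit; the class name and values are illustrative): it serializes a KeyValue carrying an explicit sequenceId and reads it back, showing that the sequenceId survives the trip, which is exactly the property CellSerialization lacks.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization;
import org.apache.hadoop.hbase.util.Bytes;

public class ExtendedCellSerializationRoundTrip {
  public static void main(String[] args) throws Exception {
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
      Bytes.toBytes("q"), 1L, Bytes.toBytes("value"));
    PrivateCellUtil.setSequenceId(kv, 42L);

    ExtendedCellSerialization serialization = new ExtendedCellSerialization();

    // Serialize: length-prefixed cell bytes followed by the 8-byte sequenceId.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ExtendedCellSerialization.ExtendedCellSerializer serializer =
      serialization.getSerializer(ExtendedCell.class);
    serializer.open(out);
    serializer.serialize(kv);
    serializer.close();

    // Deserialize: rebuild the KeyValue, then restore the trailing sequenceId.
    ExtendedCellSerialization.ExtendedCellDeserializer deserializer =
      serialization.getDeserializer(ExtendedCell.class);
    deserializer.open(new ByteArrayInputStream(out.toByteArray()));
    KeyValue roundTripped = deserializer.deserialize(null);
    deserializer.close();

    System.out.println(roundTripped.getSequenceId()); // prints 42
  }
}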
org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -30,6 +30,7 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -159,6 +160,15 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat";

/**
* ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
* as package-private, for internal use by jobs like WALPlayer that need the features of
* ExtendedCell.
*/
static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
"hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFAULT = false;

public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -619,9 +629,7 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}

conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
CellSerialization.class.getName());
mergeSerializations(conf);

if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
LOG.info("bulkload locality sensitive enabled");
@@ -670,6 +678,33 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
}

private static void mergeSerializations(Configuration conf) {
List<String> serializations = new ArrayList<>();

// add any existing values that have been set
String[] existing = conf.getStrings("io.serializations");
if (existing != null) {
Collections.addAll(serializations, existing);
}

serializations.add(MutationSerialization.class.getName());
serializations.add(ResultSerialization.class.getName());

// Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
// SerializationFactory runs through serializations in the order they are registered.
// We want to register ExtendedCellSerialization before CellSerialization because both
// work for ExtendedCells but only ExtendedCellSerialization handles them properly.
if (
conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
EXTENDED_CELL_SERIALIZATION_ENABLED_DEFAULT)
) {
serializations.add(ExtendedCellSerialization.class.getName());
}
serializations.add(CellSerialization.class.getName());

conf.setStrings("io.serializations", serializations.toArray(new String[0]));
}

public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
throws IOException {
Configuration conf = job.getConfiguration();
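The ordering comment in mergeSerializations can be demonstrated with Hadoop's SerializationFactory directly. Below is a hedged sketch (not part of the commit): the factory walks io.serializations in registration order and returns the first serialization whose accept() matches the class, so registering ExtendedCellSerialization ahead of CellSerialization is what makes the sequenceId-aware serializer win for ExtendedCell.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.mapreduce.CellSerialization;
import org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;

public class SerializationOrderDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Register ExtendedCellSerialization first, mirroring mergeSerializations.
    conf.setStrings("io.serializations",
      ExtendedCellSerialization.class.getName(),
      CellSerialization.class.getName());

    // Both serializations accept ExtendedCell, but the factory returns the
    // first registered match, so the sequenceId-aware serializer wins.
    Serializer<ExtendedCell> serializer =
      new SerializationFactory(conf).getSerializer(ExtendedCell.class);
    System.out.println(serializer.getClass().getName());
    // -> org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization$ExtendedCellSerializer
  }
}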
org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -105,6 +106,13 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}

// Set sequenceId from WALKey, since it is not included by WALCellCodec. The sequenceId
// on WALKey is the same value that was on the cells in the WALEdit. This enables
// CellSortReducer to use sequenceId to disambiguate duplicate cell timestamps.
// See HBASE-27649
PrivateCellUtil.setSequenceId(cell, key.getSequenceId());

byte[] outKey = multiTableSupport
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
: CellUtil.cloneRow(cell);
@@ -308,6 +316,11 @@ public Job createSubmittableJob(String[] args) throws IOException {
if (hfileOutPath != null) {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);

// WALPlayer needs ExtendedCellSerialization so that sequenceId can be propagated when
// sorting cells in CellSortReducer
job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
true);

// the bulk HFile case
List<TableName> tableNames = getTableNameList(tables);

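Why propagating the sequenceId matters downstream: when two cells carry an identical row, column, and timestamp, HBase's cell comparator falls back to the sequenceId and orders the later edit (higher sequenceId) first, which is how the reducer dedupes correctly. A hedged sketch of the tie-break (not part of the commit; it assumes CellComparator's three-argument compare, whose boolean toggles sequenceId comparison):

import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class SequenceIdTieBreakDemo {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("row");
    byte[] family = Bytes.toBytes("f");
    byte[] qualifier = Bytes.toBytes("q");
    long ts = 1L;

    // Two versions of the same cell, written with the same timestamp.
    KeyValue older = new KeyValue(row, family, qualifier, ts, Bytes.toBytes("old"));
    KeyValue newer = new KeyValue(row, family, qualifier, ts, Bytes.toBytes("new"));
    PrivateCellUtil.setSequenceId(older, 1L);
    PrivateCellUtil.setSequenceId(newer, 2L);

    CellComparator comparator = CellComparator.getInstance();
    // Ignoring sequenceId, the cells compare equal -- the ambiguity behind the bug.
    System.out.println(comparator.compare(newer, older, true));  // 0
    // With sequenceId, the newer edit sorts first and wins deduplication.
    System.out.println(comparator.compare(newer, older, false)); // negative
  }
}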
org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -29,6 +33,7 @@
import java.io.File;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,8 +55,10 @@
import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.wal.WAL;
@@ -131,6 +138,80 @@ public void testPlayingRecoveredEdit() throws Exception {
assertTrue(TEST_UTIL.countRows(tn) > 0);
}

/**
* Tests that when you write multiple cells with the same timestamp they are properly sorted by
* their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from
* the resulting bulkloaded HFiles. See HBASE-27649
*/
@Test
public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName() + "1");
final byte[] family = Bytes.toBytes("family");
final byte[] column1 = Bytes.toBytes("c1");
final byte[] column2 = Bytes.toBytes("c2");
final byte[] row = Bytes.toBytes("row");
Table table = TEST_UTIL.createTable(tableName, family);

long now = EnvironmentEdgeManager.currentTime();
// put a row into the first table
Put p = new Put(row);
p.addColumn(family, column1, now, column1);
p.addColumn(family, column2, now, column2);

table.put(p);

byte[] lastVal = null;

for (int i = 0; i < 50; i++) {
lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
p = new Put(row);
p.addColumn(family, column1, now, lastVal);

table.put(p);

// WAL rolling is necessary to trigger the bug; otherwise no sorting
// needs to occur in the reducer because it's all sorted and coming from a single file.
if (i % 10 == 0) {
WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
}
}

WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
HConstants.HREGION_LOGDIR_NAME).toString();

Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
String outPath = "/tmp/" + name.getMethodName();
configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath);
configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);

WALPlayer player = new WALPlayer(configuration);
assertEquals(0, ToolRunner.run(configuration, player,
new String[] { walInputDir, tableName.getNameAsString() }));

Get g = new Get(row);
Result result = table.get(g);
byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));

table = TEST_UTIL.truncateTable(tableName);
g = new Get(row);
result = table.get(g);
assertThat(result.listCells(), nullValue());

BulkLoadHFiles.create(configuration).bulkLoad(tableName,
new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));

g = new Get(row);
result = table.get(g);
value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));

assertThat(result.listCells(), notNullValue());
assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
}

/**
* Simple end-to-end test
*/
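For reference, a condensed, hedged sketch of the end-to-end flow the test above exercises, with placeholder paths and a placeholder table name. WALPlayer.BULK_OUTPUT_CONF_KEY routes output to HFiles instead of live writes, and createSubmittableJob switches on the extended-cell serialization internally:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.util.ToolRunner;

public class WALPlayerBulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/walplayer-out"); // placeholder

    // Replay WALs from a placeholder input dir into HFiles for one table.
    int exit = ToolRunner.run(conf, new WALPlayer(),
      new String[] { "/hbase/oldWALs", "myTable" });
    if (exit != 0) {
      throw new IllegalStateException("WALPlayer failed with exit code " + exit);
    }

    // Bulk load the generated HFiles; duplicate-timestamp versions are now
    // resolved correctly because cells were sorted with their sequenceId.
    BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("myTable"),
      new Path("/tmp/walplayer-out"));
  }
}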
