HBASE-27649 WALPlayer does not properly dedupe overridden cell versions #5047

Merged Feb 25, 2023 · 8 commits · Changes shown from 5 commits
ExtendedCellSerialization.java (new file)
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Similar to CellSerialization, but includes the sequenceId from an ExtendedCell. This is necessary
* so that CellSortReducer can sort by sequenceId, if applicable. Note that these two serializations
* are not compatible -- data serialized by CellSerialization cannot be deserialized with
* ExtendedCellSerialization and vice versa. This is acceptable for {@link HFileOutputFormat2}
* because the serialization is not used for the HFiles that are ultimately written, only for
* intermediate data passed between the mapper and reducer of a single job.
*/
@InterfaceAudience.Private
public class ExtendedCellSerialization implements Serialization<ExtendedCell> {
@Override
public boolean accept(Class<?> c) {
return ExtendedCell.class.isAssignableFrom(c);
}

@Override
public ExtendedCellDeserializer getDeserializer(Class<ExtendedCell> t) {
return new ExtendedCellDeserializer();
}

@Override
public ExtendedCellSerializer getSerializer(Class<ExtendedCell> c) {
return new ExtendedCellSerializer();
}

public static class ExtendedCellDeserializer implements Deserializer<ExtendedCell> {
private DataInputStream dis;

@Override
public void close() throws IOException {
this.dis.close();
}

@Override
public KeyValue deserialize(ExtendedCell ignore) throws IOException {
KeyValue kv = KeyValueUtil.create(this.dis);
PrivateCellUtil.setSequenceId(kv, this.dis.readLong());
return kv;
}

@Override
public void open(InputStream is) throws IOException {
this.dis = new DataInputStream(is);
}
}

public static class ExtendedCellSerializer implements Serializer<ExtendedCell> {
private DataOutputStream dos;

@Override
public void close() throws IOException {
this.dos.close();
}

@Override
public void open(OutputStream os) throws IOException {
this.dos = new DataOutputStream(os);
}

@Override
public void serialize(ExtendedCell kv) throws IOException {
dos.writeInt(PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT);
PrivateCellUtil.writeCell(kv, dos, true);
dos.writeLong(kv.getSequenceId());
}
}
}
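Not part of the patch: a minimal round-trip sketch of what this serialization preserves. The RoundTripExample class and the in-memory stream plumbing are hypothetical; only ExtendedCellSerialization and the HBase utility calls come from the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization;
import org.apache.hadoop.hbase.util.Bytes;

public class RoundTripExample {
  public static void main(String[] args) throws IOException {
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
      Bytes.toBytes("c1"), 1000L, Bytes.toBytes("value"));
    PrivateCellUtil.setSequenceId(kv, 42L);

    ExtendedCellSerialization serialization = new ExtendedCellSerialization();

    // Serialize the cell, including its sequenceId, into an in-memory buffer.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    ExtendedCellSerialization.ExtendedCellSerializer serializer =
      serialization.getSerializer(ExtendedCell.class);
    serializer.open(buffer);
    serializer.serialize(kv);
    serializer.close();

    // Deserialize and confirm the sequenceId survived the round trip.
    ExtendedCellSerialization.ExtendedCellDeserializer deserializer =
      serialization.getDeserializer(ExtendedCell.class);
    deserializer.open(new ByteArrayInputStream(buffer.toByteArray()));
    KeyValue copy = deserializer.deserialize(null);
    deserializer.close();

    // Prints 42; with CellSerialization the sequenceId would have been lost.
    System.out.println(copy.getSequenceId());
  }
}
```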
HFileOutputFormat2.java
@@ -30,6 +30,7 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -159,6 +160,15 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat";

/**
* ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
* as package-private for internal use by jobs, such as WALPlayer, that need the features of
* ExtendedCell.
*/
static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
"hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;

public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -619,9 +629,7 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}

conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
CellSerialization.class.getName());
mergeSerializations(conf);

if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
LOG.info("bulkload locality sensitive enabled");
@@ -670,6 +678,33 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
}

private static void mergeSerializations(Configuration conf) {
List<String> serializations = new ArrayList<>();

// add any existing values that have been set
String[] existing = conf.getStrings("io.serializations");
if (existing != null) {
Collections.addAll(serializations, existing);
}

serializations.add(MutationSerialization.class.getName());
serializations.add(ResultSerialization.class.getName());

// Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
// SerializationFactory runs through serializations in the order they are registered.
// We want to register ExtendedCellSerialization before CellSerialization because both
// work for ExtendedCells but only ExtendedCellSerialization handles them properly.
if (
conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
) {
serializations.add(ExtendedCellSerialization.class.getName());
}
serializations.add(CellSerialization.class.getName());

conf.setStrings("io.serializations", serializations.toArray(new String[0]));
}

public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
throws IOException {
Configuration conf = job.getConfiguration();
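Not part of the patch: a minimal sketch of why the registration order in mergeSerializations matters. Hadoop's SerializationFactory walks io.serializations in order and returns the first Serialization whose accept() matches the requested class; the SerializationOrderExample class name below is hypothetical.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.SerializationFactory;

public class SerializationOrderExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Register ExtendedCellSerialization ahead of CellSerialization, as
    // mergeSerializations does when the feature is enabled.
    conf.setStrings("io.serializations",
      org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization.class.getName(),
      org.apache.hadoop.hbase.mapreduce.CellSerialization.class.getName());

    // KeyValue implements ExtendedCell, so both serializations accept it; only the
    // ordering above guarantees the sequenceId-aware one is chosen for the shuffle.
    SerializationFactory factory = new SerializationFactory(conf);
    Serialization<KeyValue> serialization = factory.getSerialization(KeyValue.class);
    System.out.println(serialization.getClass().getName());
    // Expected: org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization
  }
}
```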
WALPlayer.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -105,6 +106,13 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}

// Set sequenceId from WALKey, since it is not included by WALCellCodec. The sequenceId
// on WALKey is the same value that was on the cells in the WALEdit. This enables
// CellSortReducer to use sequenceId to disambiguate duplicate cell timestamps.
// See HBASE-27649
PrivateCellUtil.setSequenceId(cell, key.getSequenceId());

byte[] outKey = multiTableSupport
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
: CellUtil.cloneRow(cell);
@@ -308,6 +316,11 @@ public Job createSubmittableJob(String[] args) throws IOException {
if (hfileOutPath != null) {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);

// WALPlayer needs ExtendedCellSerialization so that sequenceId can be propagated when
// sorting cells in CellSortReducer
job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
true);

// the bulk HFile case
List<TableName> tableNames = getTableNameList(tables);

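Not part of the patch: a minimal sketch of why the propagated sequenceId lets sorting pick the right version. It assumes the default CellComparator breaks ties between cells with identical row/column/timestamp coordinates by preferring the higher sequenceId; the SequenceIdTieBreakExample class name is hypothetical.

```java
import java.io.IOException;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class SequenceIdTieBreakExample {
  public static void main(String[] args) throws IOException {
    long ts = 1000L;
    // Two versions of the same cell written with the same timestamp, e.g. two
    // WAL entries replayed by WALPlayer. Only the sequenceId tells them apart.
    KeyValue older = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
      Bytes.toBytes("c1"), ts, Bytes.toBytes("old"));
    KeyValue newer = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
      Bytes.toBytes("c1"), ts, Bytes.toBytes("new"));
    PrivateCellUtil.setSequenceId(older, 1L);
    PrivateCellUtil.setSequenceId(newer, 2L);

    // With the sequenceId present, the comparator orders the later edit first, so the
    // correct winning version can be kept. Without ExtendedCellSerialization the
    // sequenceId is lost in the shuffle, both cells compare as equal, and the outcome
    // is arbitrary -- the bug described in HBASE-27649.
    int cmp = CellComparator.getInstance().compare(newer, older);
    System.out.println(cmp < 0); // expected: true (newer sorts ahead of older)
  }
}
```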
TestWALPlayer.java
@@ -29,6 +29,7 @@
import java.io.File;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@
import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
@@ -131,6 +133,84 @@ public void testPlayingRecoveredEdit() throws Exception {
assertTrue(TEST_UTIL.countRows(tn) > 0);
}

/**
* Tests that when you write multiple cells with the same timestamp they are properly sorted by
* their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from
* the resulting bulkloaded HFiles. See HBASE-27649
*/
@Test
public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName() + "1");
final byte[] family = Bytes.toBytes("family");
final byte[] column1 = Bytes.toBytes("c1");
final byte[] column2 = Bytes.toBytes("c2");
final byte[] row = Bytes.toBytes("row");
Table table = TEST_UTIL.createTable(tableName, family);

long now = System.currentTimeMillis();
Review comment (Contributor): Use EnvironmentEdgeManager.currentTime if possible

// put a row into the first table
Put p = new Put(row);
p.addColumn(family, column1, now, column1);
p.addColumn(family, column2, now, column2);

table.put(p);

byte[] lastVal = null;

for (int i = 0; i < 50; i++) {
lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
p = new Put(row);
p.addColumn(family, column1, now, lastVal);

table.put(p);

// wal rolling is necessary to trigger the bug. otherwise no sorting
// needs to occur in the reducer because it's all sorted and coming from a single file.
if (i % 10 == 0) {
WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
}
}

WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
HConstants.HREGION_LOGDIR_NAME).toString();

Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
String outPath = "/tmp/" + name.getMethodName();
configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath);
configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);

WALPlayer player = new WALPlayer(configuration);
assertEquals(0, ToolRunner.run(configuration, player,
new String[] { walInputDir, tableName.getNameAsString() }));

Get g = new Get(row);
Result result = table.get(g);
byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
assertTrue(
"Expected " + Bytes.toStringBinary(column1) + " latest value to be equal to lastVal="
+ Bytes.toStringBinary(lastVal) + " but was " + Bytes.toStringBinary(value),
Bytes.equals(lastVal, value));

table = TEST_UTIL.truncateTable(tableName);
g = new Get(row);
result = table.get(g);
assertTrue("expected row to be empty after truncate but got " + result, result.isEmpty());
Review comment (Contributor): Use assertThat and Matchers.empty?


BulkLoadHFiles.create(configuration).bulkLoad(tableName,
new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));

g = new Get(row);
result = table.get(g);
value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
assertTrue(
"Expected " + Bytes.toStringBinary(column1) + " latest value to be equal to lastVal="
+ Bytes.toStringBinary(lastVal) + " but was " + Bytes.toStringBinary(value),
Bytes.equals(lastVal, value));
}

/**
* Simple end-to-end test
*/