Skip to content

Commit

Permalink
HBASE-23705 Add CellComparator to HFileContext
Browse files Browse the repository at this point in the history
Codecs don't have access to what CellComparator to use.  Backfill.

M hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
 Adds a new compareRows with default implementation that takes a ByteBuffer.
 Needed by the index in a block encoder implementation.

M hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparatorImpl.java
 Adds implementation for meta of new compareRows method. Adds utility
 method for figuring comparator based off tablename.

M hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/AbstractDataBlockEncoder.java
M hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
M hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
M hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
 Comparator is in context. Remove redundant handling.

M hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
 Comparator is in context. Remove redundant handling. Clean javadoc.

M hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java
 Clean javadoc.

M hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
 Cache context so can use it to get comparator to use later.

M hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
 Cache cellcomparator to use. Javdoc on diff between HFileContext and
 HFileInfo.
M hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
 Add CellComparator

M hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
 Remove comparator caching. Get from context instead.

M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
 Skip a reflection if we can.

M hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileInfo.java
 Javadoc. Removed unused filed.
  • Loading branch information
saintstack committed Jan 18, 2020
1 parent 167892c commit f45287b
Show file tree
Hide file tree
Showing 49 changed files with 236 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase;

import java.nio.ByteBuffer;
import java.util.Comparator;

import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

Expand All @@ -31,11 +33,12 @@
public interface CellComparator extends Comparator<Cell> {
/**
* A comparator for ordering cells in user-space tables. Useful when writing cells in sorted
* order as necessary for bulk import (i.e. via MapReduce)
* order as necessary for bulk import (i.e. via MapReduce).
* <p>
* CAUTION: This comparator may provide inaccurate ordering for cells from system tables,
* and should not be relied upon in that case.
*/
// For internal use, see CellComparatorImpl utility methods.
static CellComparator getInstance() {
return CellComparatorImpl.COMPARATOR;
}
Expand Down Expand Up @@ -80,6 +83,24 @@ static CellComparator getInstance() {
*/
int compareRows(Cell cell, byte[] bytes, int offset, int length);

/**
* @param row ByteBuffer that wraps a row; will read from current position and will reading all
* remaining; will not disturb the ByteBuffer internal state.
* @return greater than 0 if leftCell is bigger, less than 0 if rightCell is bigger, 0 if both
* cells are equal
*/
default int compareRows(ByteBuffer row, Cell cell) {
if (cell instanceof ByteBufferExtendedCell) {
return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
((ByteBufferExtendedCell) cell).getRowByteBuffer(),
((ByteBufferExtendedCell) cell).getRowPosition(),
cell.getRowLength());
}
return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
}

/**
* Lexographically compares the two cells excluding the row part. It compares family, qualifier,
* timestamp and the type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase;

import java.nio.ByteBuffer;
import java.util.Comparator;

import org.apache.hadoop.hbase.KeyValue.Type;
Expand Down Expand Up @@ -377,6 +378,27 @@ private static int compareRows(byte[] left, int loffset, int llength, byte[] rig
return result;
}

@Override
public int compareRows(ByteBuffer row, Cell cell) {
byte [] array;
int offset;
int len = row.remaining();
if (row.hasArray()) {
array = row.array();
offset = row.position() + row.arrayOffset();
} else {
// This is awful, we copy the row array if offheap just so we can do a compare.
// We do this elsewhere too when Cell is backed by an offheap ByteBuffer.
// Needs fixing. TODO.
array = new byte[len];
offset = 0;
ByteBufferUtils.copyFromBufferToArray(array, row, row.position(),
0, len);
}
// Reverse result since we swap the order of the params we pass below.
return -compareRows(cell, array, offset, len);
}

@Override
public Comparator getSimpleComparator() {
return this;
Expand All @@ -387,4 +409,23 @@ public Comparator getSimpleComparator() {
public Comparator getSimpleComparator() {
return new BBKVComparator(this);
}

/**
* Utility method that makes a guess at comparator to use based off passed tableName.
* Use in extreme when no comparator specified.
* @return CellComparator to use going off the <code>tableName</code> passed.
*/
public static CellComparator getCellComparator(TableName tableName) {
return getCellComparator(tableName.toBytes());
}

/**
* Utility method that makes a guess at comparator to use based off passed tableName.
* Use in extreme when no comparator specified.
* @return CellComparator to use going off the <code>tableName</code> passed.
*/
public static CellComparator getCellComparator(byte [] tableName) {
return Bytes.equals(tableName, TableName.META_TABLE_NAME.toBytes())?
CellComparatorImpl.META_COMPARATOR: CellComparatorImpl.COMPARATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
Expand Down Expand Up @@ -59,14 +58,13 @@ protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
}
}

protected abstract static class AbstractEncodedSeeker implements
EncodedSeeker {
/**
* Decorates EncodedSeeker with a {@link HFileBlockDecodingContext}
*/
protected abstract static class AbstractEncodedSeeker implements EncodedSeeker {
protected HFileBlockDecodingContext decodingCtx;
protected final CellComparator comparator;

public AbstractEncodedSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
this.comparator = comparator;
public AbstractEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
this.decodingCtx = decodingCtx;
}

Expand All @@ -77,7 +75,5 @@ protected boolean includesMvcc() {
protected boolean includesTags() {
return this.decodingCtx.getHFileContext().isIncludesTags();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,8 @@ protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
protected STATE current, previous;

public BufferedEncodedSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
super(comparator, decodingCtx);
public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
super(decodingCtx);
if (decodingCtx.getHFileContext().isCompressTags()) {
try {
tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -81,9 +80,8 @@ public String toString() {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<SeekerState>(decodingCtx) {
@Override
protected void decodeNext() {
current.keyLength = currentBuffer.getInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,39 +45,27 @@ public interface DataBlockEncoder {
* Starts encoding for a block of KeyValues. Call
* {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish
* encoding of a block.
* @param encodingCtx
* @param out
* @throws IOException
*/
void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;

/**
* Encodes a KeyValue.
* @param cell
* @param encodingCtx
* @param out
* @return unencoded kv size written
* @throws IOException
*/
int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;

/**
* Ends encoding for a block of KeyValues. Gives a chance for the encoder to do the finishing
* stuff for the encoded block. It must be called at the end of block encoding.
* @param encodingCtx
* @param out
* @param uncompressedBytesWithHeader
* @throws IOException
*/
void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader) throws IOException;

/**
* Decode.
* @param source Compressed stream of KeyValues.
* @param decodingCtx
* @return Uncompressed block of KeyValues.
* @throws IOException If there is an error in source.
*/
Expand All @@ -96,11 +84,9 @@ ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext dec

/**
* Create a HFileBlock seeker which find KeyValues within a block.
* @param comparator what kind of comparison should be used
* @param decodingCtx
* @return A newly created seeker.
*/
EncodedSeeker createSeeker(CellComparator comparator, HFileBlockDecodingContext decodingCtx);
EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx);

/**
* Creates a encoder specific encoding context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
Expand Down Expand Up @@ -382,9 +381,8 @@ protected void copyFromNext(SeekerState that) {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<DiffSeekerState>(decodingCtx) {
private byte[] familyNameWithSize;
private static final int TIMESTAMP_WITH_TYPE_LENGTH =
Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
Expand Down Expand Up @@ -397,9 +396,8 @@ protected void copyFromNext(SeekerState that) {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<FastDiffSeekerState>(decodingCtx) {
private void decode(boolean isFirst) {
byte flag = currentBuffer.get();
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@

/**
* A decoding context that is created by a reader's encoder, and is shared
* across the reader's all read operations.
* across all of the reader's read operations.
*
* @see HFileBlockEncodingContext for encoding
*/
@InterfaceAudience.Private
public interface HFileBlockDecodingContext {

/**
* Perform all actions that need to be done before the encoder's real decoding
* process. Decompression needs to be done if
Expand All @@ -46,7 +45,6 @@ public interface HFileBlockDecodingContext {
* ByteBuffer pointed after the header but before the data
* @param onDiskBlock
* on disk data to be decoded
* @throws IOException
*/
void prepareDecoding(
int onDiskSizeWithoutHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@
*
*/
@InterfaceAudience.Private
public class HFileBlockDefaultDecodingContext implements
HFileBlockDecodingContext {
public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingContext {
private final HFileContext fileContext;
private TagCompressionContext tagCompressionContext;

public HFileBlockDefaultDecodingContext(HFileContext fileContext) {
this.fileContext = fileContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
*
*/
@InterfaceAudience.Private
public class HFileBlockDefaultEncodingContext implements
HFileBlockEncodingContext {
public class HFileBlockDefaultEncodingContext implements HFileBlockEncodingContext {
private BlockType blockType;
private final DataBlockEncoding encodingAlgo;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ public interface HFileBlockEncodingContext {
/**
* Do any action that needs to be performed after the encoding.
* Compression is also included if a non-null compression algorithm is used
*
* @param blockType
* @throws IOException
*/
void postEncoding(BlockType blockType) throws IOException;

Expand All @@ -64,7 +61,6 @@ public interface HFileBlockEncodingContext {

/**
* Sets the encoding state.
* @param state
*/
void setEncodingState(EncodingState state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
Expand Down Expand Up @@ -195,9 +194,8 @@ public String toString() {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<SeekerState>(decodingCtx) {
@Override
protected void decodeNext() {
current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -115,8 +113,7 @@ public ByteBuffer decodeKeyValues(DataInputStream source,
dup.limit(sourceAsBuffer.position() + onDiskSize);
return dup.slice();
} else {
RowIndexSeekerV1 seeker = new RowIndexSeekerV1(CellComparatorImpl.COMPARATOR,
decodingCtx);
RowIndexSeekerV1 seeker = new RowIndexSeekerV1(decodingCtx);
seeker.setCurrentBuffer(new SingleByteBuff(sourceAsBuffer));
List<Cell> kvs = new ArrayList<>();
kvs.add(seeker.getCell());
Expand Down Expand Up @@ -151,9 +148,7 @@ public Cell getFirstKeyCellInBlock(ByteBuff block) {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
return new RowIndexSeekerV1(comparator, decodingCtx);
public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
return new RowIndexSeekerV1(decodingCtx);
}

}
Loading

0 comments on commit f45287b

Please sign in to comment.