
HBASE-23705 Add CellComparator to HFileContext #1062

Merged: 1 commit, Jan 23, 2020
@@ -17,8 +17,9 @@
*/
package org.apache.hadoop.hbase;

import java.nio.ByteBuffer;
import java.util.Comparator;

import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

@@ -31,11 +32,12 @@
public interface CellComparator extends Comparator<Cell> {
/**
* A comparator for ordering cells in user-space tables. Useful when writing cells in sorted
* order as necessary for bulk import (i.e. via MapReduce)
* order as necessary for bulk import (i.e. via MapReduce).
* <p>
* CAUTION: This comparator may provide inaccurate ordering for cells from system tables,
* and should not be relied upon in that case.
*/
// For internal use, see CellComparatorImpl utility methods.
static CellComparator getInstance() {
return CellComparatorImpl.COMPARATOR;
}
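
A quick illustration of the documented use case above -- sorting user-table cells ahead of a bulk import -- as a minimal sketch (the helper is hypothetical, not part of this patch):

static void sortForBulkImport(java.util.List<Cell> cells) {
  // Orders user-table cells as bulk import (e.g. via MapReduce) requires.
  cells.sort(CellComparator.getInstance());
}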
@@ -80,6 +82,24 @@ static CellComparator getInstance() {
*/
int compareRows(Cell cell, byte[] bytes, int offset, int length);

/**
* Compares the row of the given cell against the row wrapped by the given ByteBuffer.
* @param row ByteBuffer that wraps a row; reads from the current position through all
* remaining bytes; does not disturb the ByteBuffer's internal state.
* @param cell the cell whose row to compare
* @return greater than 0 if {@code row} is bigger, less than 0 if the cell's row is bigger,
* 0 if both are equal
*/
default int compareRows(ByteBuffer row, Cell cell) {
Contributor:
Does the BB passed here contain only the row bytes? Is the BB sliced for the row alone?

Contributor (Author):
Yes.

The javadoc tries to make this explicit. Should I add more?

"* @param row ByteBuffer that wraps a row; reads from the current position through all
* remaining bytes; does not disturb the ByteBuffer's internal state."

if (cell instanceof ByteBufferExtendedCell) {
return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
((ByteBufferExtendedCell) cell).getRowByteBuffer(),
((ByteBufferExtendedCell) cell).getRowPosition(),
cell.getRowLength());
}
return ByteBufferUtils.compareTo(row, row.position(), row.remaining(),
cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
}
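
For orientation, a minimal sketch of the call pattern confirmed in the thread above: the ByteBuffer is sliced to the row bytes alone, and the buffer's position and limit survive the call (the helper below is hypothetical, not part of the patch):

static boolean sameRow(byte[] rowBytes, Cell cell) {
  java.nio.ByteBuffer row = java.nio.ByteBuffer.wrap(rowBytes); // row bytes only
  // compareRows reads position()..limit() and leaves the buffer state untouched.
  return CellComparator.getInstance().compareRows(row, cell) == 0;
}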

/**
* Lexicographically compares the two cells excluding the row part. It compares family,
* qualifier, timestamp and type.
@@ -17,16 +17,15 @@
*/
package org.apache.hadoop.hbase;

import java.nio.ByteBuffer;
import java.util.Comparator;

import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Longs;

/**
@@ -377,6 +376,26 @@ private static int compareRows(byte[] left, int loffset, int llength, byte[] rig
return result;
}

@Override
public int compareRows(ByteBuffer row, Cell cell) {
byte [] array;
int offset;
int len = row.remaining();
if (row.hasArray()) {
array = row.array();
offset = row.position() + row.arrayOffset();
} else {
// We copy the row array if offheap just so we can do a compare. We do this elsewhere too
// in BBUtils when Cell is backed by an offheap ByteBuffer. Needs fixing so no copy. TODO.
array = new byte[len];
offset = 0;
ByteBufferUtils.copyFromBufferToArray(array, row, row.position(),
0, len);
}
// Reverse result since we swap the order of the params we pass below.
return -compareRows(cell, array, offset, len);
}

@Override
public Comparator getSimpleComparator() {
return this;
@@ -387,4 +406,24 @@ public Comparator getSimpleComparator() {
public Comparator getSimpleComparator() {
return new BBKVComparator(this);
}

/**
* Utility method that makes a best-effort guess at the comparator to use based on the passed
* tableName. Use only as a last resort, when no comparator has been specified.
* @return the CellComparator to use, going off the {@code tableName} passed.
*/
public static CellComparator getCellComparator(TableName tableName) {
return getCellComparator(tableName.toBytes());
Contributor:
Can we just use TableName.isMetaTableName(TableName) here? Why have the indirection of toBytes and then comparing bytes?

Contributor (Author):
We need both. The actual HFileContext hosts the table name as bytes only -- not as a TableName object. And TableName.toBytes doesn't actually make new bytes; TableName itself already hosts the name as bytes.

Maybe I should be clearer in a comment that no new arrays are being made in this code?

}

/**
* Utility method that makes a best-effort guess at the comparator to use based on the passed
* tableName. Use only as a last resort, when no comparator has been specified.
* @return the CellComparator to use, going off the {@code tableName} passed.
*/
public static CellComparator getCellComparator(byte [] tableName) {
// FYI, TableName.toBytes does not create an array; just returns existing array pointer.
return Bytes.equals(tableName, TableName.META_TABLE_NAME.toBytes()) ?
  CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR;
}
}
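
To illustrate the selection behavior these utility methods provide, a small sketch (the assertions and the user-table name are illustrative, not from the patch):

static void comparatorSelection() {
  // The meta table name selects the meta comparator; any other name gets the default.
  CellComparator metaCmp = CellComparatorImpl.getCellComparator(TableName.META_TABLE_NAME);
  assert metaCmp == CellComparatorImpl.META_COMPARATOR;
  CellComparator userCmp = CellComparatorImpl.getCellComparator(Bytes.toBytes("my_table"));
  assert userCmp == CellComparatorImpl.COMPARATOR;
}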
@@ -292,6 +292,9 @@ public String getQualifierAsString() {
return qualifierAsString;
}

/**
* @return A pointer to the internal table name byte array; no new array is allocated.
*/
public byte[] toBytes() {
return name;
}
@@ -18,10 +18,8 @@

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -59,14 +57,13 @@ protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
}
}

protected abstract static class AbstractEncodedSeeker implements
EncodedSeeker {
/**
* Decorates EncodedSeeker with a {@link HFileBlockDecodingContext}
*/
protected abstract static class AbstractEncodedSeeker implements EncodedSeeker {
protected HFileBlockDecodingContext decodingCtx;
protected final CellComparator comparator;

public AbstractEncodedSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
this.comparator = comparator;
public AbstractEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
this.decodingCtx = decodingCtx;
}

@@ -77,7 +74,5 @@ protected boolean includesMvcc() {
protected boolean includesTags() {
return this.decodingCtx.getHFileContext().isIncludesTags();
}

}

}
@@ -732,9 +732,8 @@ protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>
protected final ObjectIntPair<ByteBuffer> tmpPair = new ObjectIntPair<>();
protected STATE current, previous;

public BufferedEncodedSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
super(comparator, decodingCtx);
public BufferedEncodedSeeker(HFileBlockDecodingContext decodingCtx) {
super(decodingCtx);
if (decodingCtx.getHFileContext().isCompressTags()) {
try {
tagCompressionContext = new TagCompressionContext(LRUDictionary.class, Byte.MAX_VALUE);
@@ -1008,11 +1007,7 @@ protected STATE createSeekerState() {
}

/**
* @param cell
* @param out
* @param encodingCtx
* @return unencoded size added
* @throws IOException
*/
protected final int afterEncodingKeyValue(Cell cell, DataOutputStream out,
HFileBlockDefaultEncodingContext encodingCtx) throws IOException {
@@ -1102,7 +1097,7 @@ protected static void ensureSpace(ByteBuffer out, int length)
public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx, DataOutputStream out)
throws IOException {
if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
throw new IOException (this.getClass().getName() + " only accepts "
throw new IOException(this.getClass().getName() + " only accepts "
+ HFileBlockDefaultEncodingContext.class.getName() + " as the " +
"encoding context.");
}
@@ -1154,8 +1149,8 @@ public void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputSt
.getEncodingState();
// Write the unencodedDataSizeWritten (with header size)
Bytes.putInt(uncompressedBytesWithHeader,
HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE, state.unencodedDataSizeWritten
);
HConstants.HFILEBLOCK_HEADER_SIZE + DataBlockEncoding.ID_SIZE,
state.unencodedDataSizeWritten);
postEncoding(encodingCtx);
}

@@ -20,9 +20,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
@@ -48,7 +46,8 @@ public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
+ "encoding context.");
}

HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
HFileBlockDefaultEncodingContext encodingCtx =
(HFileBlockDefaultEncodingContext) blkEncodingCtx;
encodingCtx.prepareEncoding(out);

NoneEncoder encoder = new NoneEncoder(out, encodingCtx);
@@ -81,9 +80,8 @@ public String toString() {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<SeekerState>(decodingCtx) {
@Override
protected void decodeNext() {
current.keyLength = currentBuffer.getInt();
@@ -20,7 +20,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -45,39 +44,27 @@ public interface DataBlockEncoder {
* Starts encoding for a block of KeyValues. Call
* {@link #endBlockEncoding(HFileBlockEncodingContext, DataOutputStream, byte[])} to finish
* encoding of a block.
* @param encodingCtx
* @param out
* @throws IOException
*/
void startBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;

/**
* Encodes a KeyValue.
* @param cell
* @param encodingCtx
* @param out
* @return unencoded kv size written
* @throws IOException
*/
int encode(Cell cell, HFileBlockEncodingContext encodingCtx, DataOutputStream out)
throws IOException;

/**
* Ends encoding for a block of KeyValues. Gives the encoder a chance to do any finishing
* work for the encoded block. It must be called at the end of block encoding.
* @param encodingCtx
* @param out
* @param uncompressedBytesWithHeader
* @throws IOException
*/
void endBlockEncoding(HFileBlockEncodingContext encodingCtx, DataOutputStream out,
byte[] uncompressedBytesWithHeader) throws IOException;

/**
* Decode.
* @param source Compressed stream of KeyValues.
* @param decodingCtx
* @return Uncompressed block of KeyValues.
* @throws IOException If there is an error in source.
*/
Expand All @@ -96,11 +83,9 @@ ByteBuffer decodeKeyValues(DataInputStream source, HFileBlockDecodingContext dec

/**
* Create an HFileBlock seeker which finds KeyValues within a block.
* @param comparator what kind of comparison should be used
* @param decodingCtx
* @return A newly created seeker.
*/
EncodedSeeker createSeeker(CellComparator comparator, HFileBlockDecodingContext decodingCtx);
EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx);

/**
* Creates an encoder-specific encoding context
@@ -188,8 +173,6 @@ interface EncodedSeeker {

/**
* Compare the given key against the current key
* @param comparator
* @param key
* @return -1 if the passed key is smaller than the current key, 0 if equal and 1 if greater
*/
public int compareKey(CellComparator comparator, Cell key);
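
To make the interface change concrete, a hedged sketch of seeker creation before and after this patch -- per the PR title, the comparator now travels with the HFileContext carried by the decoding context (method and variable names are illustrative):

static DataBlockEncoder.EncodedSeeker openSeeker(DataBlockEncoder encoder,
    HFileBlockDecodingContext decodingCtx) {
  // Pre-patch callers wrote: encoder.createSeeker(comparator, decodingCtx);
  // Post-patch, the seeker picks the comparator up from decodingCtx's HFileContext.
  return encoder.createSeeker(decodingCtx);
}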
@@ -20,9 +20,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -382,9 +380,8 @@ protected void copyFromNext(SeekerState that) {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<DiffSeekerState>(comparator, decodingCtx) {
public EncodedSeeker createSeeker(HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<DiffSeekerState>(decodingCtx) {
private byte[] familyNameWithSize;
private static final int TIMESTAMP_WITH_TYPE_LENGTH =
Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
@@ -21,9 +21,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -397,9 +395,8 @@ protected void copyFromNext(SeekerState that) {
}

@Override
public EncodedSeeker createSeeker(CellComparator comparator,
final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<FastDiffSeekerState>(comparator, decodingCtx) {
public EncodedSeeker createSeeker(final HFileBlockDecodingContext decodingCtx) {
return new BufferedEncodedSeeker<FastDiffSeekerState>(decodingCtx) {
private void decode(boolean isFirst) {
byte flag = currentBuffer.get();
if ((flag & FLAG_SAME_KEY_LENGTH) == 0) {
@@ -17,20 +17,18 @@
package org.apache.hadoop.hbase.io.encoding;

import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A decoding context that is created by a reader's encoder, and is shared
* across the reader's all read operations.
* across all of the reader's read operations.
*
* @see HFileBlockEncodingContext for encoding
*/
@InterfaceAudience.Private
public interface HFileBlockDecodingContext {

/**
* Perform all actions that need to be done before the encoder's real decoding
* process. Decompression needs to be done if
@@ -46,7 +44,6 @@ public interface HFileBlockDecodingContext {
* ByteBuffer pointed after the header but before the data
* @param onDiskBlock
* on disk data to be decoded
* @throws IOException
*/
void prepareDecoding(
int onDiskSizeWithoutHeader,
Expand Down