Skip to content

Commit

Permalink
HBASE-27539 Encapsulate and centralise access to ref count through St…
Browse files Browse the repository at this point in the history
…oreFileInfo (#4928)

Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
comnetwork authored Dec 24, 2022
1 parent dcfde79 commit 7ef63b6
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -31,6 +30,7 @@
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -69,13 +69,12 @@ public class HalfStoreFileReader extends StoreFileReader {
* @param fileInfo HFile info
* @param cacheConf CacheConfig
* @param r original reference file (contains top or bottom)
* @param refCount reference count
* @param conf Configuration
*/
public HalfStoreFileReader(final ReaderContext context, final HFileInfo fileInfo,
final CacheConfig cacheConf, final Reference r, AtomicInteger refCount,
final CacheConfig cacheConf, final Reference r, StoreFileInfo storeFileInfo,
final Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, refCount, conf);
super(context, fileInfo, cacheConf, storeFileInfo, conf);
// This is not actual midkey for this half-file; its just border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,12 @@ public boolean isCompactedAway() {
}

public int getRefCount() {
return fileInfo.refCount.get();
return fileInfo.getRefCount();
}

/** Returns true if the file is still used in reads */
public boolean isReferencedInReads() {
int rc = fileInfo.refCount.get();
int rc = fileInfo.getRefCount();
assert rc >= 0; // we should not go negative.
return rc > 0;
}
Expand Down Expand Up @@ -647,11 +647,11 @@ Set<String> getCompactedStoreFiles() {
}

long increaseRefCount() {
return this.fileInfo.refCount.incrementAndGet();
return this.fileInfo.increaseRefCount();
}

long decreaseRefCount() {
return this.fileInfo.refCount.decrementAndGet();
return this.fileInfo.decreaseRefCount();
}

static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class StoreFileInfo implements Configurable {
// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done.
final AtomicInteger refCount = new AtomicInteger(0);
private final AtomicInteger refCount = new AtomicInteger(0);

/**
* Create a Store File Info
Expand Down Expand Up @@ -275,12 +275,13 @@ public HDFSBlocksDistribution getHDFSBlockDistribution() {
return this.hdfsBlocksDistribution;
}

StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf) throws IOException {
public StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf)
throws IOException {
StoreFileReader reader = null;
if (this.reference != null) {
reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, refCount, conf);
reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, this, conf);
} else {
reader = new StoreFileReader(context, hfileInfo, cacheConf, refCount, conf);
reader = new StoreFileReader(context, hfileInfo, cacheConf, this, conf);
}
return reader;
}
Expand Down Expand Up @@ -681,7 +682,7 @@ boolean isNoReadahead() {
return this.noReadahead;
}

HFileInfo getHFileInfo() {
public HFileInfo getHFileInfo() {
return hfileInfo;
}

Expand Down Expand Up @@ -713,4 +714,16 @@ public void initHFileInfo(ReaderContext context) throws IOException {
this.hfileInfo = new HFileInfo(context, conf);
}

int getRefCount() {
return this.refCount.get();
}

int increaseRefCount() {
return this.refCount.incrementAndGet();
}

int decreaseRefCount() {
return this.refCount.decrementAndGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
Expand Down Expand Up @@ -78,24 +77,26 @@ public class StoreFileReader {
private int prefixLength = -1;
protected Configuration conf;

// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done. All StoreFileReader for the same StoreFile will share this counter.
private final AtomicInteger refCount;
/**
* All {@link StoreFileReader} for the same StoreFile will share the
* {@link StoreFileInfo#refCount}. Counter that is incremented every time a scanner is created on
* the store file. It is decremented when the scan on the store file is done.
*/
private final StoreFileInfo storeFileInfo;
private final ReaderContext context;

private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, ReaderContext context,
private StoreFileReader(HFile.Reader reader, StoreFileInfo storeFileInfo, ReaderContext context,
Configuration conf) {
this.reader = reader;
bloomFilterType = BloomType.NONE;
this.refCount = refCount;
this.storeFileInfo = storeFileInfo;
this.context = context;
this.conf = conf;
}

public StoreFileReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
AtomicInteger refCount, Configuration conf) throws IOException {
this(HFile.createReader(context, fileInfo, cacheConf, conf), refCount, context, conf);
StoreFileInfo storeFileInfo, Configuration conf) throws IOException {
this(HFile.createReader(context, fileInfo, cacheConf, conf), storeFileInfo, context, conf);
}

void copyFields(StoreFileReader storeFileReader) throws IOException {
Expand All @@ -120,7 +121,7 @@ public boolean isPrimaryReplicaReader() {
*/
@InterfaceAudience.Private
StoreFileReader() {
this.refCount = new AtomicInteger(0);
this.storeFileInfo = null;
this.reader = null;
this.context = null;
}
Expand Down Expand Up @@ -151,23 +152,23 @@ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread,
* is opened.
*/
int getRefCount() {
return refCount.get();
return storeFileInfo.getRefCount();
}

/**
* Indicate that the scanner has started reading with this reader. We need to increment the ref
* count so reader is not close until some object is holding the lock
*/
void incrementRefCount() {
refCount.incrementAndGet();
storeFileInfo.increaseRefCount();
}

/**
* Indicate that the scanner has finished reading with this reader. We need to decrement the ref
* count, and also, if this is not the common pread reader, we should close it.
*/
void readCompleted() {
refCount.decrementAndGet();
storeFileInfo.decreaseRefCount();
if (context.getReaderType() == ReaderType.STREAM) {
try {
reader.close(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ HFile.Writer getHFileWriter() {
* @param dir Directory to create file in.
* @return random filename inside passed <code>dir</code>
*/
static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
if (!fs.getFileStatus(dir).isDirectory()) {
throw new IOException("Expecting " + dir.toString() + " to be a directory");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,10 +753,11 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
StoreFileWriter halfWriter = null;
try {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
HFileInfo hfile = new HFileInfo(context, conf);
halfReader =
new HalfStoreFileReader(context, hfile, cacheConf, reference, new AtomicInteger(0), conf);
hfile.initMetaAndIndex(halfReader.getHFileReader());
StoreFileInfo storeFileInfo =
new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
storeFileInfo.initHFileInfo(context);
halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

int blocksize = familyDescriptor.getBlocksize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -39,10 +38,10 @@
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -118,10 +117,12 @@ public void testHalfScanAndReseek() throws IOException {
private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf)
throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConf,
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
fileInfo.initMetaAndIndex(halfreader.getHFileReader());
StoreFileInfo storeFileInfo =
new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
storeFileInfo.initHFileInfo(context);
final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);

Expand Down Expand Up @@ -214,10 +215,12 @@ public void testHalfScanner() throws IOException {
private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore,
CacheConfig cacheConfig) throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConfig,
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
fileInfo.initMetaAndIndex(halfreader.getHFileReader());
StoreFileInfo storeFileInfo =
new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
storeFileInfo.initHFileInfo(context);
final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);
scanner.seekBefore(seekBefore);
Expand Down
Loading

0 comments on commit 7ef63b6

Please sign in to comment.