Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27539 Encapsulate and centralise access to ref count through StoreFileInfo #4928

Merged
merged 2 commits into from
Dec 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -31,6 +30,7 @@
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -69,13 +69,12 @@ public class HalfStoreFileReader extends StoreFileReader {
* @param fileInfo HFile info
* @param cacheConf CacheConfig
* @param r original reference file (contains top or bottom)
* @param storeFileInfo StoreFileInfo of the store file this reader belongs to (owns the shared ref count)
* @param conf Configuration
*/
public HalfStoreFileReader(final ReaderContext context, final HFileInfo fileInfo,
final CacheConfig cacheConf, final Reference r, AtomicInteger refCount,
final CacheConfig cacheConf, final Reference r, StoreFileInfo storeFileInfo,
final Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, refCount, conf);
super(context, fileInfo, cacheConf, storeFileInfo, conf);
// This is not the actual midkey for this half-file; it's just the border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,12 @@ public boolean isCompactedAway() {
}

/**
 * Returns the number of readers/scanners currently referencing this store file. Delegates to
 * {@link StoreFileInfo}, which owns the shared counter for all readers of this file.
 */
public int getRefCount() {
  // Diff residue removed: the pre-refactor direct field access (fileInfo.refCount.get())
  // is superseded by the encapsulated accessor.
  return fileInfo.getRefCount();
}

/** Returns true if the file is still used in reads (at least one open reader/scanner). */
public boolean isReferencedInReads() {
  int rc = fileInfo.getRefCount();
  // The count is only ever moved by paired increment/decrement calls, so a negative value
  // indicates a bookkeeping bug; the assert fires only when -ea is enabled.
  assert rc >= 0;
  return rc > 0;
}
Expand Down Expand Up @@ -647,11 +647,11 @@ Set<String> getCompactedStoreFiles() {
}

/**
 * Increments the shared reader ref count for this store file.
 * @return the updated count
 */
long increaseRefCount() {
  return this.fileInfo.increaseRefCount();
}

/**
 * Decrements the shared reader ref count for this store file.
 * @return the updated count
 */
long decreaseRefCount() {
  return this.fileInfo.decreaseRefCount();
}

static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class StoreFileInfo implements Configurable {
// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done.
final AtomicInteger refCount = new AtomicInteger(0);
private final AtomicInteger refCount = new AtomicInteger(0);

/**
* Create a Store File Info
Expand Down Expand Up @@ -275,12 +275,13 @@ public HDFSBlocksDistribution getHDFSBlockDistribution() {
return this.hdfsBlocksDistribution;
}

StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf) throws IOException {
public StoreFileReader createReader(ReaderContext context, CacheConfig cacheConf)
throws IOException {
StoreFileReader reader = null;
if (this.reference != null) {
reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, refCount, conf);
reader = new HalfStoreFileReader(context, hfileInfo, cacheConf, reference, this, conf);
} else {
reader = new StoreFileReader(context, hfileInfo, cacheConf, refCount, conf);
reader = new StoreFileReader(context, hfileInfo, cacheConf, this, conf);
}
return reader;
}
Expand Down Expand Up @@ -681,7 +682,7 @@ boolean isNoReadahead() {
return this.noReadahead;
}

HFileInfo getHFileInfo() {
public HFileInfo getHFileInfo() {
return hfileInfo;
}

Expand Down Expand Up @@ -713,4 +714,16 @@ public void initHFileInfo(ReaderContext context) throws IOException {
this.hfileInfo = new HFileInfo(context, conf);
}

/** Returns the current reader/scanner ref count for this store file. */
int getRefCount() {
return this.refCount.get();
}

/**
 * Atomically increments the reader/scanner ref count (a scanner was opened).
 * @return the updated count
 */
int increaseRefCount() {
return this.refCount.incrementAndGet();
}

/**
 * Atomically decrements the reader/scanner ref count (a scan completed).
 * @return the updated count
 */
int decreaseRefCount() {
return this.refCount.decrementAndGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
Expand Down Expand Up @@ -78,24 +77,26 @@ public class StoreFileReader {
private int prefixLength = -1;
protected Configuration conf;

// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done. All StoreFileReader for the same StoreFile will share this counter.
private final AtomicInteger refCount;
/**
* All {@link StoreFileReader} for the same StoreFile will share the
* {@link StoreFileInfo#refCount}. Counter that is incremented every time a scanner is created on
* the store file. It is decremented when the scan on the store file is done.
*/
private final StoreFileInfo storeFileInfo;
private final ReaderContext context;

/**
 * Internal constructor wrapping an already-open {@link HFile.Reader}.
 * @param reader open HFile reader to wrap
 * @param storeFileInfo store file info owning the shared ref count for all readers of this file
 * @param context reader context
 * @param conf configuration
 */
private StoreFileReader(HFile.Reader reader, StoreFileInfo storeFileInfo, ReaderContext context,
  Configuration conf) {
  this.reader = reader;
  // No bloom filter until one is loaded from file metadata.
  bloomFilterType = BloomType.NONE;
  this.storeFileInfo = storeFileInfo;
  this.context = context;
  this.conf = conf;
}

/**
 * Opens an HFile reader for the given context and wraps it.
 * @throws IOException if the underlying HFile cannot be opened
 */
public StoreFileReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
  StoreFileInfo storeFileInfo, Configuration conf) throws IOException {
  this(HFile.createReader(context, fileInfo, cacheConf, conf), storeFileInfo, context, conf);
}

void copyFields(StoreFileReader storeFileReader) throws IOException {
Expand All @@ -120,7 +121,7 @@ public boolean isPrimaryReplicaReader() {
*/
@InterfaceAudience.Private
StoreFileReader() {
  // Test-only constructor: no backing file, so there is no StoreFileInfo to share a ref
  // count with. Callers must not use ref-count methods on an instance built this way.
  this.storeFileInfo = null;
  this.reader = null;
  this.context = null;
}
Expand Down Expand Up @@ -151,23 +152,23 @@ public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread,
* is opened.
*/
int getRefCount() {
  // Delegates to the shared counter in StoreFileInfo so every reader of the same store
  // file observes one consistent count.
  return storeFileInfo.getRefCount();
}

/**
 * Indicate that a scanner has started reading with this reader. We need to increment the ref
 * count so the reader is not closed while some object is still holding it.
 */
void incrementRefCount() {
  storeFileInfo.increaseRefCount();
}

/**
* Indicate that the scanner has finished reading with this reader. We need to decrement the ref
* count, and also, if this is not the common pread reader, we should close it.
*/
void readCompleted() {
refCount.decrementAndGet();
storeFileInfo.decreaseRefCount();
if (context.getReaderType() == ReaderType.STREAM) {
try {
reader.close(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ HFile.Writer getHFileWriter() {
* @param dir Directory to create file in.
* @return random filename inside passed <code>dir</code>
*/
static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
if (!fs.getFileStatus(dir).isDirectory()) {
throw new IOException("Expecting " + dir.toString() + " to be a directory");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,10 +753,11 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
StoreFileWriter halfWriter = null;
try {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
HFileInfo hfile = new HFileInfo(context, conf);
halfReader =
new HalfStoreFileReader(context, hfile, cacheConf, reference, new AtomicInteger(0), conf);
hfile.initMetaAndIndex(halfReader.getHFileReader());
StoreFileInfo storeFileInfo =
new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
storeFileInfo.initHFileInfo(context);
halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

int blocksize = familyDescriptor.getBlocksize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -39,10 +38,10 @@
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -118,10 +117,12 @@ public void testHalfScanAndReseek() throws IOException {
private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf)
throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConf,
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
fileInfo.initMetaAndIndex(halfreader.getHFileReader());
StoreFileInfo storeFileInfo =
new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
storeFileInfo.initHFileInfo(context);
final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);

Expand Down Expand Up @@ -214,10 +215,12 @@ public void testHalfScanner() throws IOException {
private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell seekBefore,
CacheConfig cacheConfig) throws IOException {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
HFileInfo fileInfo = new HFileInfo(context, TEST_UTIL.getConfiguration());
final HalfStoreFileReader halfreader = new HalfStoreFileReader(context, fileInfo, cacheConfig,
bottom, new AtomicInteger(0), TEST_UTIL.getConfiguration());
fileInfo.initMetaAndIndex(halfreader.getHFileReader());
StoreFileInfo storeFileInfo =
new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom);
storeFileInfo.initHFileInfo(context);
final HalfStoreFileReader halfreader =
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);
scanner.seekBefore(seekBefore);
Expand Down
Loading