Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Dec 12, 2024
1 parent 38ea3c0 commit d08af89
Showing 1 changed file with 54 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ public void maybeFetchRegions(
logger.info("No free regions, skipping loading regions [{}-{}]", i, lastRegion);
break;
} else {
final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, i);
try {
final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, i);
entry.incRefEnsureOpen();
regionsToFetch.add(entry);
} catch (AlreadyClosedException e) {
Expand All @@ -649,63 +649,67 @@ public void maybeFetchRegions(
ActionListener.releaseAfter(listener, () -> regionsToFetch.forEach(AbstractRefCounted::decRef))
)
) {
final List<Tuple<CacheFileRegion<KeyType>, RegionGaps>> gaps = new ArrayList<>(regionsToFetch.size());
for (CacheFileRegion<KeyType> toFetch : regionsToFetch) {
int region = toFetch.regionKey.region();
ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, region));
if (regionRange.isEmpty() == false) {
List<SparseFileTracker.Gap> regionGaps = toFetch.tracker.waitForRange(
regionRange,
regionRange,
regionsListener.acquire()
);
if (regionGaps.isEmpty() == false) {
gaps.add(new Tuple<>(toFetch, new RegionGaps((long) region * regionSize, regionGaps)));
try {
final List<Tuple<CacheFileRegion<KeyType>, RegionGaps>> gaps = new ArrayList<>(regionsToFetch.size());
for (CacheFileRegion<KeyType> toFetch : regionsToFetch) {
int region = toFetch.regionKey.region();
ByteRange regionRange = ByteRange.of(0, computeCacheFileRegionSize(blobLength, region));
if (regionRange.isEmpty() == false) {
List<SparseFileTracker.Gap> regionGaps = toFetch.tracker.waitForRange(
regionRange,
regionRange,
regionsListener.acquire()
);
if (regionGaps.isEmpty() == false) {
gaps.add(new Tuple<>(toFetch, new RegionGaps((long) region * regionSize, regionGaps)));
}
}
}
}

if (gaps.isEmpty()) {
regionsListener.acquire().onResponse(null);
return;
}
final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList());
logger.trace(
() -> Strings.format(
"fill regions gaps %s %s shared input stream factory",
gaps,
streamFactory == null ? "without" : "with"
)
);

if (streamFactory == null) {
for (Tuple<CacheFileRegion<KeyType>, RegionGaps> gapsToFetch : gaps) {
for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) {
fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(gap, writer, null, regionsListener.acquire()));
}
if (gaps.isEmpty()) {
regionsListener.acquire().onResponse(null);
return;
}
} else {
var regionsToRelease = regionsListener.acquire();
try (
var sequentialGapsListener = new RefCountingListener(ActionListener.runBefore(regionsToRelease, streamFactory::close))
) {
ArrayList<Runnable> gapFillingTasks = new ArrayList<>();
final SourceInputStreamFactory streamFactory = writer.sharedInputStreamFactory(gaps.stream().map(Tuple::v2).toList());
logger.trace(
() -> Strings.format(
"fill regions gaps %s %s shared input stream factory",
gaps,
streamFactory == null ? "without" : "with"
)
);

if (streamFactory == null) {
for (Tuple<CacheFileRegion<KeyType>, RegionGaps> gapsToFetch : gaps) {
int offset = (gapsToFetch.v1().regionKey.region() - firstRegion) * regionSize;
gapFillingTasks.addAll(
gapsToFetch.v1().gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps())
);
for (SparseFileTracker.Gap gap : gapsToFetch.v2().gaps()) {
fetchExecutor.execute(gapsToFetch.v1().fillGapRunnable(gap, writer, null, regionsListener.acquire()));
}
}
} else {
var regionsToRelease = regionsListener.acquire();
try (
var sequentialGapsListener = new RefCountingListener(
ActionListener.runBefore(regionsToRelease, streamFactory::close)
)
) {
ArrayList<Runnable> gapFillingTasks = new ArrayList<>();
for (Tuple<CacheFileRegion<KeyType>, RegionGaps> gapsToFetch : gaps) {
int offset = (gapsToFetch.v1().regionKey.region() - firstRegion) * regionSize;
gapFillingTasks.addAll(
gapsToFetch.v1()
.gapFillingTasks(offset, writer, sequentialGapsListener, streamFactory, gapsToFetch.v2().gaps())
);
}
fetchExecutor.execute(() -> {
// Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
// gap will still be executed.
gapFillingTasks.forEach(Runnable::run);
});
}
fetchExecutor.execute(() -> {
// Fill the gaps in order. If a gap fails to fill for whatever reason, the task for filling the next
// gap will still be executed.
gapFillingTasks.forEach(Runnable::run);
});
}
} catch (Exception e) {
regionsListener.acquire().onFailure(e);
}
} catch (Exception e) {
// TODO: Double call?
listener.onFailure(e);
}
}

Expand Down

0 comments on commit d08af89

Please sign in to comment.