-
Notifications
You must be signed in to change notification settings - Fork 15
feature: Expose API to let client define number of expected values in a Get. #6655
Changes from 15 commits
6c2aa5f
99f2125
7e76c60
e3224bd
8d5320d
d3afa8d
37ac526
b557bc7
90477bd
ffaa25c
648e54e
f273357
773e1ad
34c0b25
0a13148
d54908e
3d913ff
797e635
2926f79
e8c08c7
f48fd14
89cb066
bf5b271
2739a48
918ead8
6a1426f
2ab80f9
a42100a
f91d0ba
8f0a05d
e147c14
9d61cc7
b2292ce
82da62a
ad4d984
3227916
806abe0
b8d981f
9e25ffa
11951f1
f1912e1
f11d972
1f68638
cad58d2
b3f7284
86831da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,7 +124,28 @@ public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) { | |
return getWithLoader( | ||
tableRef, | ||
cells, | ||
(tableReference, toRead) -> Futures.immediateFuture(super.get(tableReference, toRead))) | ||
(tableReference, _cachedCells, toRead) -> | ||
Futures.immediateFuture(super.get(tableReference, toRead))) | ||
.get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw Throwables.rewrapAndThrowUncheckedException(e.getCause()); | ||
LucasIME marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
@Override | ||
public Map<Cell, byte[]> getWithExpectedNumberOfCells( | ||
TableReference tableRef, Set<Cell> cells, long expectedNumberOfPresentCells) { | ||
try { | ||
return getWithLoader(tableRef, cells, (tableReference, cachedCells, toRead) -> { | ||
long nonEmptyValuesInCache = cachedCells.values().stream() | ||
.filter(value -> value != PtBytes.EMPTY_BYTE_ARRAY) | ||
.count(); | ||
long numberOfCellsExpectingValuePostCache = | ||
expectedNumberOfPresentCells - nonEmptyValuesInCache; | ||
|
||
return Futures.immediateFuture(super.getWithExpectedNumberOfCells( | ||
tableReference, toRead, numberOfCellsExpectingValuePostCache)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to add some tests to this class verifying that we call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. It is tangential, but if we pass in a mock delegate to the constructor we can verify stuff on it right? Admittedly this kind of tests ForwardingTransaction itself too but I think that's fine |
||
}) | ||
.get(); | ||
} catch (InterruptedException | ExecutionException e) { | ||
throw Throwables.rewrapAndThrowUncheckedException(e.getCause()); | ||
|
@@ -133,7 +154,7 @@ public Map<Cell, byte[]> get(TableReference tableRef, Set<Cell> cells) { | |
|
||
@Override | ||
public ListenableFuture<Map<Cell, byte[]>> getAsync(TableReference tableRef, Set<Cell> cells) { | ||
return getWithLoader(tableRef, cells, super::getAsync); | ||
return getWithLoader(tableRef, cells, (table, _cacheCells, cellsToLoad) -> super.getAsync(table, cellsToLoad)); | ||
} | ||
|
||
private ListenableFuture<Map<Cell, byte[]>> getWithLoader( | ||
|
@@ -156,7 +177,7 @@ private ListenableFuture<Map<Cell, byte[]>> getWithLoader( | |
} | ||
|
||
return Futures.transform( | ||
cellLoader.load(tableRef, toLoad), | ||
cellLoader.load(tableRef, cacheHit, toLoad), | ||
loadedCells -> { | ||
cacheLoadedCells(tableRef, toLoad, loadedCells); | ||
cacheHit.putAll(loadedCells); | ||
|
@@ -271,6 +292,7 @@ public void abort() { | |
|
||
@FunctionalInterface | ||
private interface CellLoader { | ||
ListenableFuture<Map<Cell, byte[]>> load(TableReference tableReference, Set<Cell> toRead); | ||
ListenableFuture<Map<Cell, byte[]>> load( | ||
TableReference tableReference, Map<Cell, byte[]> cachedCells, Set<Cell> toRead); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,10 +43,9 @@ | |
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import javax.annotation.concurrent.ThreadSafe; | ||
import org.immutables.value.Value; | ||
|
||
@ThreadSafe | ||
final class TransactionScopedCacheImpl implements TransactionScopedCache { | ||
public final class TransactionScopedCacheImpl implements TransactionScopedCache { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason why we're now making this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah I see, in SnapshotTransactionTest There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, also felt a bit uneasy when doing the change to allow easier instantiation from the tests. But felt better than mocking or re-implementing a test version of it. |
||
private final TransactionCacheValueStore valueStore; | ||
private final CacheMetrics metrics; | ||
private volatile boolean finalised = false; | ||
|
@@ -56,7 +55,7 @@ private TransactionScopedCacheImpl(TransactionCacheValueStore valueStore, CacheM | |
this.metrics = metrics; | ||
} | ||
|
||
static TransactionScopedCache create(ValueCacheSnapshot snapshot, CacheMetrics metrics) { | ||
public static TransactionScopedCache create(ValueCacheSnapshot snapshot, CacheMetrics metrics) { | ||
return new TransactionScopedCacheImpl(new TransactionCacheValueStoreImpl(snapshot), metrics); | ||
} | ||
|
||
|
@@ -81,15 +80,34 @@ public Map<Cell, byte[]> get( | |
return AtlasFutures.getUnchecked(getAsync(tableReference, cells, valueLoader)); | ||
} | ||
|
||
@Override | ||
public Map<Cell, byte[]> getWithCachedRef( | ||
TableReference tableReference, | ||
Set<Cell> cells, | ||
Function<CacheLookupResult, ListenableFuture<Map<Cell, byte[]>>> valueLoader) { | ||
ensureNotFinalised(); | ||
return AtlasFutures.getUnchecked(getAsyncWithCachedRef(tableReference, cells, valueLoader)); | ||
} | ||
|
||
@Override | ||
public ListenableFuture<Map<Cell, byte[]>> getAsync( | ||
TableReference tableReference, | ||
Set<Cell> cells, | ||
Function<Set<Cell>, ListenableFuture<Map<Cell, byte[]>>> valueLoader) { | ||
ensureNotFinalised(); | ||
return getAsyncWithCachedRef( | ||
tableReference, cells, cacheLookupResult -> valueLoader.apply(cacheLookupResult.missedCells())); | ||
} | ||
|
||
@Override | ||
public ListenableFuture<Map<Cell, byte[]>> getAsyncWithCachedRef( | ||
TableReference tableReference, | ||
Set<Cell> cells, | ||
Function<CacheLookupResult, ListenableFuture<Map<Cell, byte[]>>> valueLoader) { | ||
ensureNotFinalised(); | ||
// Short-cut all the logic below if the table is not watched. | ||
if (!valueStore.isWatched(tableReference)) { | ||
return valueLoader.apply(cells); | ||
return valueLoader.apply(CacheLookupResult.of(Map.of(), cells)); | ||
} | ||
|
||
CacheLookupResult cacheLookup = cacheLookup(tableReference, cells); | ||
|
@@ -98,7 +116,7 @@ public ListenableFuture<Map<Cell, byte[]>> getAsync( | |
return Futures.immediateFuture(filterEmptyValues(cacheLookup.cacheHits())); | ||
} else { | ||
return Futures.transform( | ||
valueLoader.apply(cacheLookup.missedCells()), | ||
valueLoader.apply(cacheLookup), | ||
uncachedValues -> processUncachedCells( | ||
tableReference, cacheLookup.cacheHits(), cacheLookup.missedCells(), uncachedValues), | ||
MoreExecutors.directExecutor()); | ||
|
@@ -246,18 +264,4 @@ private static Map<Cell, byte[]> filterEmptyValues(Map<Cell, CacheValue> snapsho | |
.map(value -> value.value().get()) | ||
.collectToMap(); | ||
} | ||
|
||
@Value.Immutable | ||
interface CacheLookupResult { | ||
Map<Cell, CacheValue> cacheHits(); | ||
|
||
Set<Cell> missedCells(); | ||
|
||
static CacheLookupResult of(Map<Cell, CacheValue> cachedValues, Set<Cell> missedCells) { | ||
return ImmutableCacheLookupResult.builder() | ||
.cacheHits(cachedValues) | ||
.missedCells(missedCells) | ||
.build(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could have also just replaced the existing
get
andgetAsync
to receive theCacheLookupResult
instead of creating new two separate methods. Let me know what you prefer.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How bad would this change have been? Basically I think it's nicer to just have one get/getAsync, but I understand refactoring this might be very difficult/costly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not very, tbh. Although this PR is already getting quite big, so I'll try to do the refactor in a separate one.