[Rollup] Improve ID scheme for rollup documents #32558
Conversation
Previously, we were using a simple CRC32 for the IDs of rollup documents. This was a poor choice, however, since 32-bit IDs lead to collisions between documents very quickly. This commit moves Rollup over to a 128-bit ID. The ID is a concatenation of all the keys in the document (similar to the rolling CRC before), hashed with 128-bit Murmur3, then base64 encoded. Finally, the job ID and a delimiter (`$`) are prepended to the ID. This guarantees that there are 128 bits per job. 128 bits should essentially remove all chances of collisions, and the prepended job ID means that _if_ there is a collision, it stays "within" the job. BWC notes: We can only upgrade the ID scheme after we know there has been a good checkpoint during indexing. We don't rely on a STARTED/STOPPED status, since we can't guarantee that it resulted from a real checkpoint rather than some other state change. So we only upgrade the ID after we have reached a checkpoint state during an active index run, and only after the checkpoint has been confirmed. Once a job has been upgraded and checkpointed, the version increments and the new ID is used in the future. All new jobs use the new ID from the start.
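The new scheme can be sketched as follows. This is a minimal illustration, not the PR's actual code: the class and method names are hypothetical, the key ordering is simplified, and MD5 is used as a stand-in 128-bit hash because the JDK does not ship Murmur3 (the PR itself uses a Murmur3 hasher).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Map;
import java.util.TreeMap;

public class RollupIdScheme {
    // Sketch of the scheme described above: concatenate the document's keys
    // in a stable order, hash to 128 bits, base64-encode, and prepend the
    // job ID plus a "$" delimiter so any residual collision stays "within"
    // the job. MD5 is only a 128-bit stand-in for Murmur3 here.
    static String rollupId(String jobId, Map<String, Object> keys) {
        StringBuilder joined = new StringBuilder();
        for (Map.Entry<String, Object> e : new TreeMap<>(keys).entrySet()) {
            joined.append(e.getKey()).append('$').append(e.getValue()).append('$');
        }
        try {
            byte[] hash = MessageDigest.getInstance("MD5") // 128-bit digest
                    .digest(joined.toString().getBytes(StandardCharsets.UTF_8));
            return jobId + "$" + Base64.getUrlEncoder().withoutPadding().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError(e); // MD5 is always available in the JDK
        }
    }

    public static void main(String[] args) {
        String id = rollupId("my-rollup-job",
                Map.of("timestamp", "2018-01-02T00:00:01", "value", 345));
        System.out.println(id); // my-rollup-job$<url-safe base64 of the 128-bit hash>
    }
}
```

The same keys always yield the same ID for a given job, while a different job ID produces a different document ID even for identical keys.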
Pinging @elastic/es-search-aggs
```diff
@@ -54,6 +55,7 @@
     private final AtomicReference<IndexerState> state;
     private final AtomicReference<Map<String, Object>> position;
     private final Executor executor;
+    protected final AtomicBoolean upgradedDocumentID;
```
Note: an atomic is probably overkill here... only the indexer thread can update it, so we probably could have used a volatile here. But I didn't want to mess around with concurrency semantics on this bugfix so went with a simple, easily-reasoned atomic.
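The trade-off in that note can be sketched like this (class and method names are illustrative, not from the PR): a single writer thread means a `volatile boolean` would suffice for visibility, and the `AtomicBoolean` is simply the more conservative choice.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class UpgradeFlag {
    // Only the indexer thread ever writes this flag, so a volatile boolean
    // would be enough for cross-thread visibility; AtomicBoolean is just the
    // easily-reasoned, slightly-overkill option chosen for the bugfix.
    private final AtomicBoolean upgradedDocumentID = new AtomicBoolean(false);

    public void markUpgraded() {
        upgradedDocumentID.set(true); // single writer: the indexer thread
    }

    public boolean isUpgraded() {
        return upgradedDocumentID.get(); // safe to read from any thread
    }

    public static void main(String[] args) {
        UpgradeFlag flag = new UpgradeFlag();
        System.out.println(flag.isUpgraded()); // false until upgraded
        flag.markUpgraded();
        System.out.println(flag.isUpgraded()); // true after upgrade
    }
}
```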
```diff
@@ -240,6 +262,8 @@ public synchronized void start(ActionListener<StartRollupJobAction.Response> lis
             listener.onResponse(new StartRollupJobAction.Response(true));
         },
         (exc) -> {
+            // We were unable to update the persistent status, so we need to shutdown the indexer too.
+            indexer.stop();
```
This is unrelated to the bugfix, but it looked like a bug in its own right so I fixed it here. We shouldn't let the indexer keep running if we weren't able to persist the state; that could lead to a strange situation where we are persisted as stopped but still indexing a bunch of data.
```java
            // 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up
            //    at last checkpoint, overwrite some docs and eventually checkpoint. At that time we'll also
            //    upgrade the ID scheme
            RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition(), upgradedDocumentID.get());
```
This is just a clarification because the old comment was equal parts vague, confusing and a bit wrong.
Thanks @polyfractal ! It looks good overall.
I left some minor comments regarding the hash that we generate but otherwise LGTM.
```java
        byte[] hashedBytes = new byte[16];
        System.arraycopy(Numbers.longToBytes(hasher.h1), 0, hashedBytes, 0, 8);
        System.arraycopy(Numbers.longToBytes(hasher.h2), 0, hashedBytes, 8, 8);
        return jobId + "$" + Base64.getUrlEncoder().withoutPadding().encodeToString(hashedBytes);
```
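The encoding step above can be reproduced in isolation. In this sketch the `h1`/`h2` values are hypothetical stand-ins for the two 64-bit halves of the Murmur3 output, and `ByteBuffer` (big-endian by default) replaces the `Numbers.longToBytes` helper:

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class HashEncoding {
    // Pack two 64-bit hash halves into 16 bytes, url-safe base64-encode them
    // without padding, then prepend the job ID and the "$" delimiter.
    static String encodeId(String jobId, long h1, long h2) {
        byte[] hashedBytes = ByteBuffer.allocate(16).putLong(h1).putLong(h2).array();
        return jobId + "$" + Base64.getUrlEncoder().withoutPadding().encodeToString(hashedBytes);
    }

    public static void main(String[] args) {
        // hypothetical hash values; 16 bytes always yield 22 unpadded base64 chars
        String id = encodeId("my-job", 0x0123456789abcdefL, 0x0fedcba987654321L);
        System.out.println(id);
    }
}
```

Because 16 bytes encode to 22 unpadded base64 characters, every new-style ID has a fixed-length suffix after the `$`, which the tests later exploit to distinguish old IDs from new ones.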
Since we use 128 bits, I think it's ok to just update the hash with the jobId instead of adding it as a prefix?
The prefix compression should handle the jobId efficiently, so let's keep it this way. This will prevent collisions across jobs, so please forget my last comment ;).
Sure, that works for me. Just need to tweak the tests a bit, since I was using the job ID as a shortcut for "old vs new" ID... but we can use length instead :)
For posterity we chatted in slack and decided to leave the job name prepended. Prefix encoding should make the overhead minimal, and it is convenient to have the job ID attached (as well as preventing some other forms of collisions due to same job but different interval, etc)
```java
                initialState = IndexerState.STOPPED;
            } else {
                initialState = existingState;
            }
            initialPosition = state.getPosition();

            // Since we have state, there could be an incomplete checkpoint so
            // use the state's ID scheme
```
nit: can you clarify? ;)
```java
     */
    public static class Murmur3 extends RollupIDGenerator {
        private static final long SEED = 19;
        private static final BytesRef DELIM = new BytesRef("$".getBytes(StandardCharsets.UTF_8));
```
`new BytesRef("$")` should be enough.
```java
    public static class Murmur3 extends RollupIDGenerator {
        private static final long SEED = 19;
        private static final BytesRef DELIM = new BytesRef("$".getBytes(StandardCharsets.UTF_8));
        private static final byte[] NULL_PLACEHOLDER = "__NULL_PLACEHOLDER__830f1de2__".getBytes(StandardCharsets.UTF_8);
```
let's use a `BytesRef`?
```java
    public abstract void addNull();
    public abstract String getID();

    private boolean generated = false;
```
I haven't proved this to myself by writing a multithreaded driver, but, looking at the code, I am pretty certain that this class can't be used in a multithreaded environment, because `check` and `setFlag` are not coordinated. If I am not wrong, we should probably either document this or, potentially, re-implement this class as a builder, where mutating methods return a modified instance? 🤷‍♂️
I'll add a javadoc explaining the non-threadsafe nature. This is used by the indexer, which is single-threaded in Rollup (only a single thread per job runs the indexer). We could make it an `AtomicBoolean` and make it thread-safe, but given the current Rollup design that's overkill. I don't see this being used outside of Rollup right now, so ++ to just documenting the limitation.
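The uncoordinated check-then-set pattern under discussion looks roughly like this (a hypothetical sketch, not the PR's actual class): the plain boolean is fine for a single-threaded indexer, but two threads could both pass `check()` before either sets the flag.

```java
public class OneShotIdGenerator {
    // Plain field, deliberately NOT thread-safe: only one (indexer) thread
    // may drive this generator, as documented in the discussion above.
    private boolean generated = false;

    private void check() {
        if (generated) {
            throw new IllegalStateException("ID already generated");
        }
    }

    private void setFlag() {
        generated = true;
    }

    public String getID() {
        check();   // not atomic with setFlag(): unsafe under concurrency
        setFlag();
        return "some-id"; // placeholder for the real hash-based ID
    }

    public static void main(String[] args) {
        OneShotIdGenerator gen = new OneShotIdGenerator();
        System.out.println(gen.getID());
        try {
            gen.getID(); // second call is rejected
        } catch (IllegalStateException e) {
            System.out.println("second call rejected");
        }
    }
}
```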
```java
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getJob(Map<String, Object> jobsMap, String targetJobId) throws IOException {
```
can this be static? Not sure of our convention here, but I've always biased towards making methods that don't depend on instance state static. Same for `getJob`. Kind of a nit, though.
Can do. :)
```java
        client().performRequest(indexRequest);

        // create the rollup job
        final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-id-test");
```
is it worth creating a constant for the `_xpack/rollup/job/` part of the path?
```java
        if (CLUSTER_TYPE == ClusterType.MIXED && Booleans.parseBoolean(System.getProperty("tests.first_round"))) {
            final Request indexRequest = new Request("POST", "/target/_doc/2");
            indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-02T00:00:01\",\"value\":345}");
```
kind of a nit, but when I've done this kind of thing, I always regret it when I use hardcoded timestamps. Can we refactor to use a base timestamp, and then use date math to add a day and such? that would make it easier to randomize base timestamp, and more clear what the test is doing.
We can... but it will complicate the test and may make it flakier.
The IDs that are generated in rollup docs are essentially a concatenation of the composite agg keys, which are then hashed (either CRC32 in old, or Murmur3 in new). The composite keys will be values like the `"2018-01-02T00:00:01"` and `345`.
So by hardcoding the timestamps we know what IDs are going to be generated at each phase of the rolling upgrade and can test for them explicitly.
If we randomize, we'll also have to run the values through the ID generator to create the hashes. I always get worried when I have to use part of the thing under test to verify the test itself. E.g. if we break the ID generator in the future, the "broken" IDs would then change in the test and we might not notice.
We do have some unit tests that check the ID itself (`IndexerUtilsTests#testKeyOrderingOldID()`, `IndexerUtilsTests#testKeyOrderingNewID()`, `IndexerUtilsTests#testKeyOrderingNewIDLong()`), so maybe it's not an issue... but it does cause me a bit of concern.
WDYT?
In either case, I can document what this test is doing so it's more clear that the values being indexed are directly affecting the IDs which are being verified.
that seems fine. FWIW, I am not saying we should necessarily randomize, but that for readability and maintainability, it's clearer to have (forgive the pseudocode):

```java
DateTime baseDate = new DateTime("2010-01-01");
indexRequest.setJsonEntity("timestamp" + baseDate.plusDays(1).toString());
// ...
indexRequest.setJsonEntity("timestamp" + baseDate.plusDays(2).toString());
// ...
```

Again, this is kind of a nit, but I've found as test scope grows that hardcoded dates which depend on each other throughout the tests are hard to manage, and mutating a base date makes refactoring easier and clearer to our future selves what the test is doing.
Roger, parameterizing but not-randomizing seems fine with me. Will make the change :)
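The agreed-upon parameterized (but not randomized) approach could look like this with `java.time` — a sketch with illustrative names, deriving each test document's payload from a single base timestamp so the hardcoded literals appear only once:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampParams {
    // Build a test document's JSON body from a base date plus a day offset,
    // instead of repeating hardcoded timestamp literals throughout the test.
    static String doc(LocalDateTime base, int daysOffset, int value) {
        String ts = base.plusDays(daysOffset).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
        return "{\"timestamp\":\"" + ts + "\",\"value\":" + value + "}";
    }

    public static void main(String[] args) {
        LocalDateTime base = LocalDateTime.parse("2018-01-01T00:00:01");
        System.out.println(doc(base, 1, 345)); // matches the /target/_doc/2 payload
        System.out.println(doc(base, 2, 456)); // matches the /target/_doc/3 payload
        System.out.println(doc(base, 3, 567)); // matches the /target/_doc/4 payload
    }
}
```

Because the timestamps still come from fixed inputs, the expected rollup IDs remain deterministic at each phase of the rolling upgrade.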
```java
        if (CLUSTER_TYPE == ClusterType.MIXED && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) {
            final Request indexRequest = new Request("POST", "/target/_doc/3");
            indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-03T00:00:01\",\"value\":456}");
```
see above comment on timestamps
```java
        if (CLUSTER_TYPE == ClusterType.UPGRADED) {
            final Request indexRequest = new Request("POST", "/target/_doc/4");
            indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-04T00:00:01\",\"value\":567}");
```
see above comment on timestamps :)
nice work!
Thanks @jimczi @pcsanwald! :)
The flag for this is stored in the `RollupJobStatus` and persisted through the persistent task framework. I would have preferred this flag be on the `RollupJob` itself, but the persistent task framework doesn't allow updates to the Params, only the Status. That's why the `"upgraded_doc_id"` flag leaks into the JSON everywhere. But that will probably be a useful diagnostic/support tool, so I don't think it's too terrible.

Testing: Adds both a rolling upgrade BWC test and a full cluster restart test. Might be overkill, but I wanted to make sure it didn't break anything.
Closes #32372