
Introduce sync job checkpoints and implement simple checkpointing in GitHub connector #2879

Closed
wants to merge 30 commits

Conversation

artem-shelkovnikov
Member

@artem-shelkovnikov artem-shelkovnikov commented Oct 11, 2024

Closes https://github.com/elastic/search-team/issues/8289

This PR is WIP: it's here to demonstrate the approach so it can be discussed.

This PR adds support for sync checkpoints: the connector reports that it has handled some atomic piece of work, and if the sync restarts, this piece of work can be skipped. The GitHub connector is updated to use this approach on a repo-per-repo basis: if a repo was synced and the sync gets suspended, the sync will skip the already-synced repo after it restarts.

Protocol

The protocol update is:

  • Introduce a "checkpoint" field of type object into the .elastic-connectors-sync-jobs index that stores free-form checkpoints.
  • (not done) Update the connectors API to add checkpoint as an argument for the update_job_metadata method OR add a separate method.

I've purposely named it differently from cursor to make the two easier to distinguish: cursors are for transferring state between syncs, checkpoints are for keeping state within a sync.

Checkpoints are stored in the job. When a connector is created, a new checkpoint field is populated with the content of the field from the index and is accessible from the connector via the _checkpoint field, similar to how cursors work. So during get_docs it's possible to read _checkpoint (a sketch of the handover is below).

❓ Might be worth calling it sync_state or job_state rather than checkpoint.
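To make the handover concrete, here's a minimal sketch of how the framework side could pass the stored checkpoint to a data source before get_docs runs. The runner class and attribute names are assumptions for illustration, not the PR's exact code:

```python
# Illustrative sketch only - class and attribute names are assumptions.
class SyncJobRunnerSketch:
    def __init__(self, data_source, sync_job):
        self.data_source = data_source
        self.sync_job = sync_job

    async def execute(self):
        # Hand the job's persisted checkpoint to the connector, similar to how
        # sync cursors are handed over today. An empty dict means a fresh job.
        self.data_source._checkpoint = self.sync_job.checkpoint or {}
        async for doc, lazy_download in self.data_source.get_docs():
            ...  # documents are queued for ingestion by the sink
```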

GitHub connector

The logic update is very primitive: during get_docs, every time a repository has been processed, save its state to _checkpoint. Before processing a repository, check whether it was already processed, and if so, skip to the next one. A simplified sketch of this flow is below.
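A simplified sketch of that repo-per-repo flow (illustrative only; the in-memory repo mapping stands in for real GitHub API calls, and the class is not the actual connector code):

```python
from datetime import datetime, timezone

class GitHubDataSourceSketch:
    """Illustrative only: skip repositories already recorded in the checkpoint."""

    def __init__(self, repos_with_docs):
        # repos_with_docs: {"repo_name": [doc, ...]} - stands in for real GitHub API calls.
        self._repos_with_docs = repos_with_docs
        # Populated by the framework from the sync job document before get_docs runs.
        self._checkpoint = {}

    async def get_docs(self, filtering=None):
        for repo_name, docs in self._repos_with_docs.items():
            if repo_name in self._checkpoint:
                continue  # this repo was already fully synced before the suspension
            for doc in docs:
                yield doc, None  # (document, lazy download) - same shape connectors yield today
            # Record the repo only after all of its documents have been yielded;
            # the timestamp is purely for debugging/tracing.
            self._checkpoint[repo_name] = datetime.now(timezone.utc).isoformat()
```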

Sink / Extractor arrangement

The fun starts here. Once a document is yielded from the connector, it's added to a queue rather than sent directly to Elasticsearch. So to save a checkpoint we need to verify that the extracted data has already been saved - otherwise we would save the checkpoint before the data is saved, which could cause inconsistency issues if the connector is suspended before the batch is saved.

To avoid this I've introduced the following logic:

  1. Checkpoints are updated every time job metadata is updated, not immediately. This has no direct impact on consistency; it's just something to keep in mind.
  2. Once the job stats tracking mechanism sees that a checkpoint has been updated, it calls the SyncOrchestrator and says: "forcefully ingest the current batch without waiting for the batch to be fully generated".
  3. Once the batch is ingested, the checkpoint is saved.

Some more technical details/things to consider:

  • While data is being ingested, the connector keeps extracting data from the 3rd-party system. This means that while a batch is being sent, another checkpoint might be queued up for saving. We need to save the first one, then correctly flush the data and save the second one. The current mechanism enforces this, but please double-check my logic here.
  • The way the above is implemented is that SyncOrchestrator adds a new abstraction - a ForceFlushSignal - into the queue. Once the queue meets this signal it stops reading and immediately sends the data to Elasticsearch. In the meantime, the owner of the ForceFlushSignal awaits for it to be processed (see ForceFlushSignal.trigger). While this is happening, the logic that does job metadata/checkpoint processing is not executed! This prevents concurrent flushes. A rough sketch follows this list.
  • The above means that in certain cases it will slow the connector down due to some inefficiency in the process, but I believe the slowdown will be insignificant.
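A rough sketch of the signal-in-queue idea described above (heavily simplified; the asyncio details and method names are my assumptions, not necessarily the PR's exact implementation):

```python
import asyncio

class ForceFlushSignal:
    """Sentinel put on the ingestion queue to request an immediate flush."""

    def __init__(self):
        self._flushed = asyncio.Event()

    async def trigger(self, queue):
        # The checkpoint-saving side enqueues the signal and waits until the
        # sink has flushed everything that was queued before it.
        await queue.put(self)
        await self._flushed.wait()

    def ack(self):
        self._flushed.set()


async def sink_loop(queue, flush_batch):
    """Simplified consumer: flush when a signal is seen (or, in reality, when batch limits are hit)."""
    batch = []
    while True:
        item = await queue.get()
        if item is None:  # end-of-stream sentinel
            await flush_batch(batch)
            return
        if isinstance(item, ForceFlushSignal):
            await flush_batch(batch)  # send the current batch to Elasticsearch now
            batch = []
            item.ack()  # unblock the checkpoint saver, which can now persist the checkpoint
        else:
            batch.append(item)
```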

Checklists

Pre-Review Checklist

  • this PR does NOT contain credentials of any kind, such as API keys or username/passwords (double check config.yml.example)
  • this PR has a meaningful title
  • this PR links to all relevant github issues that it fixes or partially addresses
  • if there is no GH issue, please create it. Each PR should have a link to an issue
  • this PR has a thorough description
  • Covered the changes with automated tests
  • Tested the changes locally
  • Added a label for each target release version (example: v7.13.2, v7.14.0, v8.0.0)
  • Considered corresponding documentation changes
  • Contributed any configuration settings changes to the configuration reference
  • if you added or changed Rich Configurable Fields for a Native Connector, you made a corresponding PR in Kibana

Changes Requiring Extra Attention

  • Security-related changes (encryption, TLS, SSRF, etc)
  • New external service dependencies added.

Release Note

The framework now supports sync state: if a connector implements it and a sync for this connector gets suspended, then when the sync is resumed it will continue from the last checkpoint saved by the connector.

The GitHub connector implements this logic: it stores checkpoints per repository.

@artem-shelkovnikov artem-shelkovnikov requested a review from a team as a code owner October 11, 2024 16:22
@artem-shelkovnikov artem-shelkovnikov marked this pull request as draft October 11, 2024 16:22
@artem-shelkovnikov artem-shelkovnikov changed the title WIP Introduce sync job checkpoints and implement simple checkpointing in GitHub connector Oct 16, 2024
Member

@seanstory seanstory left a comment

Few nits/questions, but overall I think this is solid. Really extensible too: the next service type only needs to add minimal logic to enable checkpointing.

@@ -290,6 +290,10 @@ def sync_cursor(self):
def terminated(self):
return self.status in (JobStatus.ERROR, JobStatus.COMPLETED, JobStatus.CANCELED)

@property
def checkpoint(self):
return self.get("checkpoint")
Member

And remind me why you don't want to use cursor?

Member Author

@artem-shelkovnikov artem-shelkovnikov Oct 25, 2024

Mostly to have them different cognitively: cursor is to pass state between sync jobs, checkpoint is to pass state within the job. I'm thinking of changing this variable to sync_state - how do you feel about it?

Contributor

I would personally prefer checkpoint over sync_state. I would worry about sync state being conflated with the health of a sync.
I think pointer could be a good word as well.

Member

I also like the original checkpoint, if we're adding a new field.

cursor is to pass state between sync jobs. checkpoint is to pass state within the job

I don't think I agree with this distinction, though. Checkpoint is really for in case the job fails or is interrupted, right? In which case, it is for passing state between two different sync jobs (or, the same job, but two different runs of it). IMO, this feature is blurring the lines between full and incremental syncs, which I think is a good thing. I'm trying to come up with a situation where you wouldn't want this checkpoint capability to overwrite your cursor, and I'm struggling to come up with one. Which is why I'm thinking they should use the same field. Can you describe a counter-example?

Member Author

I tried to look at it from the following perspective:

  1. An immutable "cursor" that never gets updated on the job record after it's initialised is a great audit/tracing mechanism: you can look back in time to see what point the job started from.
  2. The cursor gets updated only after a successful full sync, meaning that if an incremental job is throttled/cancelled mid-way, the next one will try from the same checkpoint. This is IMO pretty good, because data gets stale over time. If we choose to combine the cursor with the checkpoint, it can potentially bring bigger data inconsistencies: e.g. the connector synced 50% of the data from the target system, hung for a day, restarted, and synced the remaining 50%, whatever that means. The first 50% of the data will be a day older than the rest. Not sure if it's a large problem, because I don't know the use-cases of our target audience.

So all in all, having two fields is a bit more problematic in the short run - I'll also need to write a migration to remove the checkpoint field from the mapping, so that there would be no field explosion/mapping conflicts. I don't know if all of the above makes it worth having both fields - theoretically the outcome is very similar.

I also like the blurred line between full/incremental sync - in the UI they can be the same, but the connector will have a single method that changes behaviour based on the cursor provided.

Member

@jedrazb jedrazb Nov 5, 2024

This gets pretty confusing, as there is a connector sync_cursor and a sync job cursor that references the connector cursor.

I'm trying to come up with a situation where you wouldn't want this checkpoint capability to overwrite your cursor, and I'm struggling to come up with one. Which is why I'm thinking they should use the same field. Can you describe a counter-example?

Resonating with what Artem mentioned, the sync_cursor acts as an immutable entry point state indicating where sync jobs should start (e.g., the last sync timestamp in google_drive). On the other hand, checkpoints track an arbitrarily defined unit of work within the same sync operation (e.g., synced drives in gdrive). These checkpoints do not necessarily need to align with the sync_cursor abstraction.

This distinction becomes significant when running incremental syncs (note: I'm aware that Artem’s changes only impact full syncs). Still, I think it's worth discussing a potential scenario with incremental syncs, as we are aware that incremental syncs are a necessity for some large connector users.

For instance, in google_drive incremental sync, we cannot overwrite the sync_cursor (which is a timestamp used with a delta API to fetch changes since the last sync) with a periodic checkpoint because:

  • The timestamp cannot be used as a checkpoint since we cannot yet confirm that all changes up to that time have been fully synced (e.g., if the last sync was 24 hours ago and there are 10,000 new documents to process).
  • Therefore, for google_drive we might need to track specific units of work such as folders, shared drives, etc., for checkpointing purposes (similar to repos for the github connector). This is not the same as the timestamp sync cursor.
  • If an incremental sync is interrupted (e.g., due to an agentless pod restart), both the last successful sync_cursor and the checkpoint are essential to correctly resume the incremental job.

So I'm supportive of adding a new field called checkpoint, as this will give us more flexibility in making incremental syncs more robust.
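To illustrate that last point, a hypothetical resume of a google_drive incremental sync might need both pieces of state side by side. This is purely a sketch with made-up field names, not actual google_drive connector code:

```python
def drives_to_resume(all_drives, sync_cursor, checkpoint):
    """Hypothetical helper: decide what still needs syncing after an interruption."""
    since = sync_cursor["last_sync_timestamp"]  # still used with the delta API
    remaining = [d for d in all_drives if d not in checkpoint["synced_drives"]]
    return since, remaining

# Both the immutable entry point (sync_cursor) and the in-job progress
# (checkpoint) are needed to resume the interrupted incremental job correctly.
sync_cursor = {"last_sync_timestamp": "2024-10-01T00:00:00Z"}
checkpoint = {"synced_drives": ["drive-a", "drive-b"]}
print(drives_to_resume(["drive-a", "drive-b", "drive-c"], sync_cursor, checkpoint))
# -> ('2024-10-01T00:00:00Z', ['drive-c'])
```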

Member

@jedrazb jedrazb left a comment

Good job! Couple of nits and questions.

(not done) Update connectors API to add checkpoint as an argument for update_job_metadata method OR add a separate method.

This also needs to include updating the client specification. Adding a separate method might be better for o11y if we ever want to track total checkpoint data transfer, as checkpoints can easily grow (especially in agentless), but adding an additional body param to update_job_metadata seems natural to me.

@artem-shelkovnikov artem-shelkovnikov marked this pull request as ready for review October 25, 2024 12:59
self._checkpoint = {}

when = datetime.now(timezone.utc)
self._checkpoint[repo_name] = when
Contributor

@navarone-feekery navarone-feekery Oct 28, 2024

Is the value stored here just referential for logs etc.? Just wondering what (if any) usage plans the timestamp has.

Member Author

Mostly it's for debugging/tracing, there's no real need to have a timestamp here.

Contributor

I'm okay with the timestamp; I just thought that if it's used for comparison at some point it might run into timezone issues, but maybe I'm overthinking things anyway.

@navarone-feekery
Contributor

This looks really good! I really love the comments; they make it easy to understand the complex parts.

Comment on lines 404 to 405
worker_hostname: string; -> The hostname of the worker to run the job,
checkpoint: object; -> Latest known state of the sync. If sync is suspended and then resumed, connector will continue from this state
Member

lingering

Comment on lines 494 to 495
"worker_hostname" : { "type" : "keyword" },
"checkpoint" : { "type" : "object" },
Member

lingering

Comment on lines 401 to 402
# Similar thing, but for checkpointing to restart sync from the approximate place it crashed/suspended
self._checkpoint = None
Member

lingering

@@ -1147,6 +1147,7 @@ def __init__(self, configuration):

self._client = None
self.site_group_cache = {}
self._temporary_cursor = None
Member

I don't think I understand what's changed here.
This doesn't implement checkpointing for SPO, AFAICT. It's just storing the cursor info we used to have in self._sync_cursor in self._temporary_cursor instead, and then overwriting self._sync_cursor with self._temporary_cursor at the end. Why keep it in two variables? As long as the intermediate self._sync_cursor isn't persisted to ES, it's effectively "temporary" because it's just in-memory for this instance of the DataSource, right?

Member Author

That is the thing: to make checkpoints work, the connector needs to be able to write its state to some variable. Since we use the same connector.sync_cursor, the connector will store checkpoints in the self._sync_cursor variable (see the github implementation).

Then the framework checks this variable from time to time, and if it has changed, it's saved into the job state (not the connector state).

Now that I think of it, this change is not necessary because the connector state is updated at the end of a successful sync, but all of it is just highly confusing.

This is the main reason I wanted checkpoints separated - our current way of arranging things (different job types, incremental cursors) just amplifies the complexity of checkpointing a lot. To have something solid we'd need a good revamp of this logic; right now it feels like it works but is very fragile.

result = self.sync_orchestrator.ingestion_stats()
ingestion_stats = {
INDEXED_DOCUMENT_COUNT: result.get(INDEXED_DOCUMENT_COUNT, 0),
INDEXED_DOCUMENT_VOLUME: result.get(INDEXED_DOCUMENT_VOLUME, 0),
DELETED_DOCUMENT_COUNT: result.get(DELETED_DOCUMENT_COUNT, 0),
}
await self.sync_job.update_metadata(ingestion_stats=ingestion_stats)
await self.sync_job.update_metadata(
ingestion_stats=ingestion_stats, cursor=cursor
Member

Do we also need to check here that the cursor was updated but non-null?

Member Author

update_metadata internally checks that:

                if cursor:
                    doc["connector"] = {"sync_cursor": cursor}

Idk if it would cause confusion too - maybe there should be a way for the connector to indicate that the cursor needs a reset, e.g. if it became corrupted?

@artem-shelkovnikov artem-shelkovnikov requested a review from a team November 14, 2024 13:15
Co-authored-by: Jedr Blaszyk <[email protected]>
@seanstory
Member

Should we close this? We can always re-open or cherry-pick later when we come back to this.

@artem-shelkovnikov
Member Author

Good call, closing
