CHECK the Source and Destination before SYNC to denote config_error #12423

Merged
evantahler merged 41 commits into master from evan/check-before-sync on May 17, 2022

Conversation

evantahler (Contributor) commented Apr 27, 2022

What

This change runs the CHECK command of both the Source and Destination connectors as the first step of the SYNC workflow. When a CHECK fails, the worker can exit the sync quickly and tell the user that the connection's configuration needs updating, saving time and credits. This PR assigns the "config_error" failure reason to these failures so they can be excluded from our connector metrics.

[Image: Screen Shot 2022-05-16 at 3 26 46 PM]

How

The main changes are to SyncWorkflowImpl.java and ConnectionManagerWorkflowImpl.java to:

  • Run the CHECK activities for the source and destination
  • Craft a StandardSyncOutput response for CHECK failures (CheckFailureSyncOutput) that has the same properties as a failed sync
  • Various renames and utility classes
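
The steps above can be sketched in plain Java. This is a minimal illustration and not the code from this PR: `Connector`, `SyncResult`, and `CheckBeforeSync` are hypothetical stand-ins for the real Temporal activities and for `StandardSyncOutput`, and only the `config_error` label comes from the description above.

```java
import java.util.Optional;

/** Hypothetical stand-in for a connector whose configuration can be checked. */
interface Connector {
  /** Returns an error message if CHECK fails, or empty if the configuration is valid. */
  Optional<String> check();
}

/** Hypothetical, simplified result; the PR itself builds a StandardSyncOutput. */
final class SyncResult {
  final boolean succeeded;
  final String failureType;   // "config_error" when a CHECK failed, otherwise null
  final String failureOrigin; // "source" or "destination", otherwise null
  final String message;       // connector-provided detail, otherwise null

  private SyncResult(boolean succeeded, String failureType, String failureOrigin, String message) {
    this.succeeded = succeeded;
    this.failureType = failureType;
    this.failureOrigin = failureOrigin;
    this.message = message;
  }

  static SyncResult success() {
    return new SyncResult(true, null, null, null);
  }

  static SyncResult configError(String origin, String message) {
    return new SyncResult(false, "config_error", origin, message);
  }
}

final class CheckBeforeSync {
  /** Runs both CHECKs first and only starts replication when both pass. */
  static SyncResult run(Connector source, Connector destination, Runnable replication) {
    Optional<String> sourceError = source.check();
    if (sourceError.isPresent()) {
      return SyncResult.configError("source", sourceError.get());
    }
    Optional<String> destinationError = destination.check();
    if (destinationError.isPresent()) {
      return SyncResult.configError("destination", destinationError.get());
    }
    replication.run();
    return SyncResult.success();
  }
}
```

In this sketch a failed CHECK returns a result shaped like a failed sync, so callers can handle both the same way, and replication never starts.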

User Impact

Every sync will start slightly slower from now on, because both CHECKs run first. Future work could run these CHECKs only periodically. A configuration that fails CHECK would also have failed the sync, but the failure now surfaces sooner.
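
As a rough illustration of the "only periodically" idea, a time-based gate could look like the sketch below. This is not part of the PR: `CheckSchedule`, its methods, and the one-hour window used in the usage note are all assumptions made for the example.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical policy: skip CHECK when one has succeeded recently enough. */
final class CheckSchedule {
  private final Duration maxAge;
  private Instant lastSuccessfulCheck; // null until a CHECK has succeeded

  CheckSchedule(Duration maxAge) {
    this.maxAge = maxAge;
  }

  /** True when no CHECK has succeeded yet, or the last success is at least maxAge old. */
  boolean shouldCheck(Instant now) {
    return lastSuccessfulCheck == null
        || Duration.between(lastSuccessfulCheck, now).compareTo(maxAge) >= 0;
  }

  /** Records the time of a successful CHECK. */
  void recordSuccess(Instant at) {
    lastSuccessfulCheck = at;
  }
}
```

With `new CheckSchedule(Duration.ofHours(1))`, a sync starting ten minutes after a successful CHECK would skip it, and one starting an hour later would run it again.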

Notes

To run this PR, you will need to use the new scheduler: NEW_SCHEDULER=true VERSION=dev docker-compose -f docker-compose.yaml -f docker-compose.debug.yaml up. The new scheduler will make it to OSS Airbyte soon, via #10021

Closes #12281
Closes #12282

TODO:

  • Get it working
  • Prevent subsequent attempts
  • Test it
  • Check on any special considerations when running on K8s 🤷
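
The "Prevent subsequent attempts" item can be pictured as an attempt loop that stops as soon as a failure is classified as a configuration error, since retrying cannot fix a bad configuration. The sketch below is an assumption-laden illustration: `FailureType`, `AttemptLoop`, and the use of `null` to signal success are hypothetical and do not mirror the real ConnectionManagerWorkflowImpl.

```java
import java.util.function.IntFunction;

/** Hypothetical failure categories; only config_error is named in this PR. */
enum FailureType { CONFIG_ERROR, SYSTEM_ERROR }

final class AttemptLoop {
  /**
   * Runs up to maxAttempts attempts and returns how many were actually made.
   * attempt.apply(n) returns null on success, or the FailureType of attempt n.
   */
  static int run(int maxAttempts, IntFunction<FailureType> attempt) {
    int attemptsMade = 0;
    while (attemptsMade < maxAttempts) {
      FailureType failure = attempt.apply(attemptsMade);
      attemptsMade++;
      if (failure == null) {
        break; // success: nothing left to retry
      }
      if (failure == FailureType.CONFIG_ERROR) {
        break; // a retry cannot fix the configuration, so stop here
      }
      // any other failure falls through and is retried
    }
    return attemptsMade;
  }
}
```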

@github-actions github-actions bot added the area/platform and area/worker labels Apr 27, 2022
@evantahler evantahler changed the title Research into how the Worker works for #12281 Research into how the Worker works for #12281 (CHECK the Source an Destination before the SYNC) Apr 27, 2022
@evantahler evantahler changed the title Research into how the Worker works for #12281 (CHECK the Source an Destination before the SYNC) CHECK the Source an Destination before SYNC to denote config_error Apr 28, 2022
@sherifnada sherifnada self-requested a review April 28, 2022 22:51
evantahler (Contributor Author) left a comment

[OUTDATED]

Getting closer, and learning more about how Temporal works!

At the moment, I'm struggling to stop the workflow early and mark the workflow itself as a success (i.e. nothing threw and nothing should be retried) while still saving a FailureReason.

The code in this commit (as of ac29d3d) does stop executing after a failed check, but Temporal then tries to retry the workflow and hits an error:

airbyte-worker      | 2022-04-29 16:42:47 WARN i.t.i.r.ReplayWorkflowTaskHandler(failureToWFTResult):245 - Workflow task processing failure. startedEventId=43, WorkflowId=f4fb1134-e4c4-4bf9-b18f-070713d48e6e, RunId=ce24c42a-b55f-4d6b-b36e-e0b26504ceb0. If seen continuously the workflow might be stuck.
airbyte-worker      | io.temporal.worker.NonDeterministicException: Failure handling event 17 of type 'EVENT_TYPE_ACTIVITY_TASK_SCHEDULED' during replay. Event 17 of type EVENT_TYPE_ACTIVITY_TASK_SCHEDULED does not match command type COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION. {PreviousStartedEventId=15, workflowTaskStartedEventId=43, Currently Processing StartedEventId=15}
airbyte-worker      | 	at io.temporal.internal.statemachines.WorkflowStateMachines.handleCommandEvent(WorkflowStateMachines.java:329) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:228) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:199) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:175) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.replay.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:145) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTaskWithQuery(ReplayWorkflowTaskHandler.java:122) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.replay.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:97) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:241) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:199) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93) ~[temporal-sdk-1.8.1.jar:?]
airbyte-worker      | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
airbyte-worker      | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
airbyte-worker      | 	at java.lang.Thread.run(Thread.java:833) [?:?]
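
For readers unfamiliar with this error: Temporal rebuilds workflow state by re-running the workflow code against the recorded event history, and each command the code issues has to match the next recorded event. In the log above, the history recorded an activity being scheduled at event 17, while the replayed code tried to complete the workflow. The toy model below shows the general mechanism in plain Java. It does not use the Temporal SDK, and `ReplayModel`, its methods, and the command strings are hypothetical. In the Temporal Java SDK the analogous mechanism is `Workflow.getVersion`, which records the chosen version in history so that replays take the same branch.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplayModel {
  /** Models a history recorded before the code change (no version marker). */
  static final int DEFAULT_VERSION = -1;

  /** Commands the workflow body issues; version 1 adds a CHECK step before replication. */
  static List<String> commands(int version) {
    List<String> out = new ArrayList<>();
    if (version >= 1) {
      out.add("SCHEDULE_ACTIVITY check");
    }
    out.add("SCHEDULE_ACTIVITY replicate");
    out.add("COMPLETE_WORKFLOW");
    return out;
  }

  /** Throws if the commands issued during replay diverge from the recorded history. */
  static void replay(List<String> recorded, List<String> issued) {
    int n = Math.max(recorded.size(), issued.size());
    for (int i = 0; i < n; i++) {
      String r = i < recorded.size() ? recorded.get(i) : "<none>";
      String c = i < issued.size() ? issued.get(i) : "<none>";
      if (!r.equals(c)) {
        throw new IllegalStateException(
            "Event " + i + " '" + r + "' does not match command '" + c + "'");
      }
    }
  }
}
```

Replaying an old history against `commands(1)` fails at the first event, whereas replaying it against `commands(DEFAULT_VERSION)`, which is what a version gate would select for that history, matches event for event.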

evantahler (Contributor Author) commented

@lmossman The tests randomly time out, so I'm re-running, but things are green locally - ready for re-review!

evantahler (Contributor Author) commented May 17, 2022

The UI change to show both the internal and external failure messages is in #12896

final StandardCheckConnectionInput sourceConfiguration = new StandardCheckConnectionInput().withConnectionConfiguration(sourceConfig);
final CheckConnectionInput checkSourceInput = new CheckConnectionInput(jobRunConfig, sourceLauncherConfig, sourceConfiguration);

if (workflowState.isResetConnection() || checkFailure.isFailed()) {
evantahler (Contributor Author) commented

yep - just future-proofing.

lmossman (Contributor) left a comment

LGTM

@evantahler evantahler merged commit aea7205 into master May 17, 2022
@evantahler evantahler deleted the evan/check-before-sync branch May 17, 2022 16:08
suhomud pushed a commit that referenced this pull request May 23, 2022
…or` (#12423)

* WIP scaffolding for WorkerRun

* register check activity in temporal for the sync job type (#12425)

* WIP with a lot of comments and trials

* cleanup, sync still tries to repeat

* cleanup CheckConnectionWorkflowImpl

* Syncs stop (but retry)

* Consider workflow version

* Use logger

* remove logging

* Lint updates

* attempting to stop jobs with failureType=config_error

* do not retry with another attempt

* stop workflow when CONFIG_ERROR

* Lint

* unit tests

* Lint

* missing space

* cleaner retry logic

* lint

* Safety around null standardSyncOutput

* try skipping withRetryable(false)

* revert reportFailure/reportSuccess logic changes

* revert the revert

* Move logic to ConnectionManager

* more reverts in SyncWorkflow

* revert test imports

* Test destination failure

* version-gate checkConnections

* Lint

* revert method name back to `run` and consider special case for RESET docker image

* log when skipped

* So that's how you compare strings in Java...

* Address linter comments

* simplify checkConnections response type with SyncCheckConnectionFailure POJO

* Simplify args

* more actionable error message

* use `isResetConnection` for workflow logic

* Move jobId and attemptId handling into pojo

* Fix and update tests

* Lint

* Remove null checks

Co-authored-by: Sherif A. Nada <[email protected]>
Labels
area/platform issues related to the platform area/worker Related to worker team/extensibility
5 participants