cancel python model job when dbt exit #690
Conversation
Hi @benc-db Could you please help review this PR? The code is not ready, but I want you to look at it first to see if the approach is fine. It relates to issue #684. I think we can utilize the Databricks workspace: we create a job_run_ids dir, and each python model creates one file named after its run_id in the job_run_ids dir. When dbt is canceled, we read from this job_run_ids dir, cancel every run_id in it, then delete the files.
And I have tested it: the job can be canceled, and fail-fast works OK.
    for run_id in run_ids:
        self._cancel_run_id(run_id_dir, run_id)

    return super().cancel_open()
Below you raise an exception on a non-200 response, but that will interrupt cancelling the other operations. Better to log a warning on non-200, I think.
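A minimal sketch of what this suggestion could look like. This is illustrative, not the PR's actual code: `session`, `cancel_run`, and `cancel_all` are hypothetical names, and the endpoint path assumes the Databricks Jobs API's `runs/cancel` operation.

```python
import logging

logger = logging.getLogger(__name__)


def cancel_run(session, host: str, run_id: str) -> None:
    """Best-effort cancel: warn on a non-200 response instead of raising,
    so one failure does not interrupt cancelling the remaining runs."""
    response = session.post(
        f"https://{host}/api/2.1/jobs/runs/cancel",
        json={"run_id": run_id},
    )
    if response.status_code != 200:
        logger.warning(
            "Failed to cancel python model job run %s: HTTP %s",
            run_id,
            response.status_code,
        )


def cancel_all(session, host: str, run_ids) -> None:
    # Every run_id is attempted even if an earlier cancel failed.
    for run_id in run_ids:
        cancel_run(session, host, run_id)
```

With a raise instead of the warning, the first failed cancel would leave every later run still executing.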
def _cancel_run_id(self, run_id_dir: str, run_id: str) -> None:
Since neither of these methods relies on anything in self, I think I would prefer them as static functions in python_submissions.py, so they are closer to the code that they are cleaning up.
If we can locate the target folder, I think I would prefer writing there, so that we don't rely on an API operation to store and retrieve. Also, I think we need to lock around writing the file to ensure clean operation when kicking off multiple python models concurrently.
@mikealfare we're trying to figure out how to cancel python jobs as part of cleanup, similar to what is done for SQL queries when the user ctrl-Cs. Is there a better way to communicate run_ids from the python job helper to the connection manager? We were wondering if maybe there was some global state that would help the python job helper figure out the target directory?
@jtcohen6 as well |
Hi @benc-db Many thanks for your help. I didn't find a way to get the target path in python_submissions.py; I'll try to find one today.
I'm reaching out to dbt Labs folks to see if there is a better way. In particular, @mikealfare has worked on the dbt-spark adapter, so if we figure it out, it might be good for that library too. |
Hi @benc-db I'm thinking: can we use a class static variable to share run_ids?
Good point. I'm generally so anti-global state that I didn't even think of it :P. We still need to protect it from concurrency issues, but global state is going to be our best bet until we get support from dbt-core, and better to store in memory than cloud files.
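The in-memory, class-level registry being discussed could be sketched like this. The class and method names are illustrative stand-ins for whatever state ends up on BaseDatabricksHelper, with a lock for the concurrency concern raised above.

```python
import threading


class PythonRunTracker:
    """Process-wide registry of in-flight python model job run ids.

    Class-level state, so the connection manager can read it without
    holding a helper instance; the lock guards concurrent submissions.
    """

    _lock = threading.Lock()
    _run_ids = set()

    @classmethod
    def insert_run_id(cls, run_id: str) -> None:
        with cls._lock:
            cls._run_ids.add(run_id)

    @classmethod
    def remove_run_id(cls, run_id: str) -> None:
        with cls._lock:
            cls._run_ids.discard(run_id)

    @classmethod
    def run_ids(cls) -> set:
        # Return a copy so callers can iterate while jobs finish/remove.
        with cls._lock:
            return set(cls._run_ids)
```

Models add their run_id on submission and remove it on completion; cancel_open snapshots whatever is still in flight.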
Yes... I don't want to use it either... but it seems like if we don't want to do things in dbt-core, it's the only way we can share state between the two classes. Let me write some code this way!
Hi @benc-db, I revised the code to use a global variable to store all the run ids, then cancel them in the ConnectionManager. Please help review this, thank you very much!
@@ -475,6 +476,17 @@ class DatabricksConnectionManager(SparkConnectionManager):
     TYPE: str = "databricks"
     credentials_provider: Optional[TCredentialProvider] = None

     def cancel_open(self) -> List[str]:
         from dbt.adapters.databricks.python_submissions import BaseDatabricksHelper
Import at the top, please. We only import in place like this if the thing we're importing is too heavy to do at startup.
done!
from dbt.adapters.databricks.python_submissions import BaseDatabricksHelper

for run_id in BaseDatabricksHelper.run_ids:
    logger.info(f"cancel run id {run_id}")
I think this can be debug level, and we should mention that it's a python model job.
done!
@@ -15,9 +15,11 @@

 class token_auth(CredentialsProvider):
     _token: str
     _host: str
Why store this on the token? It's already on the DatabricksCredentials.
It seems like I can't get DatabricksCredentials in DatabricksConnectionManager.
I can use self.credentials_provider in DatabricksConnectionManager, but there is no host in credentials_provider, so I put a host in the token_auth class.
Could you give me a direction on how to get DatabricksCredentials in DatabricksConnectionManager?
The BaseDatabricksHelper has a copy of DatabricksCredentials.
hmm, but that's an instance...let me think.
I'm going to pull down a copy of this PR and see if I can figure it out.
Yes... I think I can't get an instance in DatabricksConnectionManager...
Thank you very much!
Oh, did you already fix this?
No, I couldn't find a way... so I still use the approach of putting host in token_auth, so that the host can be retrieved from DatabricksConnectionManager.credentials_provider.
Mostly good, just some minor comments to clean up.
It looks like we might be trying to do something similar here for
def cancel_open(self) -> List[str]:
    for run_id in BaseDatabricksHelper.run_ids:
        logger.debug(f"Cancel python model job: {run_id}")
        BaseDatabricksHelper.cancel_run_id(
            run_id,
            self.credentials_provider.as_dict()["token"],
            self.credentials_provider.as_dict()["host"],
        )
Ah, this is where you need to retrieve it, and here you don't have an instance... we could use the singleton pattern maybe?
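The singleton idea being floated here could look roughly like this. The class name and attributes are hypothetical: the python job helper would store the credentials it was configured with, and the connection manager would read them back at cancel time without needing a helper instance of its own.

```python
class CredentialsHolder:
    """Hypothetical singleton for sharing host/token between the python
    job helper (which has the credentials) and the connection manager
    (which needs them at cancel time but has no helper instance)."""

    _instance = None

    def __new__(cls):
        # Every construction returns the same shared instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.host = None
            cls._instance.token = None
        return cls._instance
```

The helper would set `CredentialsHolder().host`/`.token` on submission; `cancel_open` would construct the class again and read the same values back.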
give me an hour to take a crack at refactoring this; I have an idea :)
A singleton maybe is a way; let me try some code.
yeah! thank you very much!
This is annoying, but it's actually teaching me a lot about our python model support, lol. It's taking me longer than I expected because I'm trying to get it to work with all credentials and all execution formats (i.e. commands vs notebooks).
No rush! Thanks for your support. Please let me know when you're done, and I can modify the code in this PR~
Hi @benc-db I'm reaching out to see if there is anything I can help with!
I'll have a new PR up shortly... got stalled because weather knocked out my internet yesterday. After I put up my PR, if you could download it and validate that it works for your scenario, that would be great.
@gaoshihang closing in favor of #693. Please take a look and verify it works for your use case. |
Hi @benc-db, thank you very much! Will do that later and let you know!
Resolves #684
Description
Checklist
- I have updated the CHANGELOG.md and added information about my change to the "dbt-databricks next" section.