Global op concurrency limits across all runs (without needing Celery) #12470

Closed
gibsondan opened this issue Feb 22, 2023 · 10 comments
Labels
area: concurrency Related to controlling concurrent execution type: feature-request

Comments

@gibsondan
Member

What's the use case?

Setting a global limit across all runs on the number of ops with a given tag that can run at any given time, for example to protect access to a shared resource. Right now you can apply run-level concurrency in Dagster, and op-level concurrency within a single run, but you can't apply a global limit for an op (or asset) across all runs.

Ideas of implementation

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@yeachan153

Hi @gibsondan, this would indeed be nice to have as a feature. Do you have an idea of how you'd implement this? I thought of a couple of approaches and wanted to get your feedback. The first one was implemented, and it seems to work as expected.

Approach 1:
Create a new table in the DB with the pool name, the number of occupied slots for that pool, and the total number of slots for that pool as columns. The executor increments the occupied slot count after checking that the current figure is below the total number of allowed slots for that pool; if so, the op is allowed to start. During this check-and-increment we hold a lock that prevents reading and updating the number of occupied slots based on stale values (I think the FOR UPDATE lock in Postgres would do this). Once the task is no longer running (i.e. terminated, succeeded, or failed), decrement the number of occupied slots.
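Roughly, the check-and-increment could look like the sketch below, using SELECT ... FOR UPDATE so two executors can't claim a slot based on a stale count. The concurrency_pools table, column names, and psycopg2 usage are just placeholders to illustrate the idea, not actual Dagster code:

import psycopg2


def try_claim_slot(conn, pool: str) -> bool:
    """Atomically claim a slot in the pool; return True if the op may start."""
    with conn:  # commit on success, roll back on error
        with conn.cursor() as cur:
            # Lock the pool row so concurrent executors can't read a stale count.
            cur.execute(
                "SELECT occupied_slots, total_slots FROM concurrency_pools"
                " WHERE pool = %s FOR UPDATE",
                (pool,),
            )
            row = cur.fetchone()
            if row is None:
                return True  # no pool configured, so no limit applies
            occupied, total = row
            if occupied >= total:
                return False  # pool is full; the step has to wait
            cur.execute(
                "UPDATE concurrency_pools SET occupied_slots = occupied_slots + 1"
                " WHERE pool = %s",
                (pool,),
            )
            return True


def release_slot(conn, pool: str) -> None:
    """Decrement the pool when the step terminates (success, failure, or cancel)."""
    with conn:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE concurrency_pools SET"
                " occupied_slots = GREATEST(occupied_slots - 1, 0)"
                " WHERE pool = %s",
                (pool,),
            )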

We tried this implementation and it seemed to work. We added the incrementing functionality in one function, and decremented the number of occupied slots conditionally in the event handler:

def handle_event(self, dagster_event: DagsterEvent) -> None:

It would have been nice to pass the pool config as an OP-level config; however, that would have been a more intrusive change, so for now we extended the executor configuration to allow setting which pool to use for which OP.

Pros:

  • Seems like quite a simple, unintrusive change
  • We just increment/decrement values atomically in one column (occupied slots) per pool, so it doesn't add too many I/O operations on the DB.

Cons:

  • Slightly worried about what would happen if there's a connectivity issue (not only due to networking; maybe the DB itself is down, for instance) and we fail to decrement the number of occupied slots.
  • One session per run, which means quite a few more connections to the DB

Approach 2:
We create a new table but only store the name of the pools and the total number of slots for each pool. Instead of incrementing or decrementing, we check the number of tasks that are running and have a pool attached to them. If the total number of tasks running with that pool < total number of slots available in that pool, then we can start the OP. This seemed a bit more of an invasive change. I also cannot currently find any information about which OPs are running other than from the event logs table, and I assume we'd need to add some information to show which pool is attached to which OP.
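In pseudocode, the check is closer to counting in-flight steps per pool instead of maintaining a counter (again with placeholder table names; some bookkeeping would be needed to record which pool each running step belongs to):

def can_start(conn, pool: str) -> bool:
    with conn.cursor() as cur:
        # Count the steps currently running that are tagged with this pool.
        cur.execute("SELECT count(*) FROM running_steps WHERE pool = %s", (pool,))
        (running,) = cur.fetchone()
        # Compare against the configured slot count for the pool.
        cur.execute("SELECT total_slots FROM concurrency_pools WHERE pool = %s", (pool,))
        row = cur.fetchone()
        return row is None or running < row[0]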

Pros:

  • No issues with failing to decrement due to connectivity issues

Cons:

  • Definitely seems like more DB I/O operations compared to the first solution
  • One session per run, which means quite a few more connections to the DB

@gibsondan
Member Author

Thanks so much for the detailed thoughts @yeachan153 - it'd be interesting to see your implementation of #1 in a PR if that's an option (even if it's not fully baked yet).

We've also been exploring solutions that address one of the cons you listed in both of your proposals (that there's still a session for each run and each run is still running in its own isolated task for the duration of the run, even if it now might be spending much more time just waiting for an op to be available to run). Addressing that would probably require more significant changes to Dagster's execution model - instead of always having each run be in its own isolated process we would need to have some more centralized process within our daemon figuring out which steps are eligible to be launched and then launching those steps.

@yeachan153

yeachan153 commented Mar 30, 2023

Sure, it's definitely more of a functional/hacky PR rather than a mergeable PR - #13250. For some context, we're running things on Kubernetes using the multi-process executor, so it might not generalise for other executors.

We create a Pools table in an init container by calling a helper function (haven't really spent time trying to integrate this properly within Dagster), then increment/decrement when OPs start/finish. I'm not too familiar with the source code, so LMK if I'm incrementing/decrementing in the wrong places.

instead of always having each run be in its own isolated process we would need to have some more centralized process within our daemon figuring out which steps are eligible to be launched and then launching those steps.

I definitely like this aspect of Dagster. It makes things quite a bit more scalable and keeps things simple in the daemon, which is a definite positive IMO.

@chriscomeau79

chriscomeau79 commented May 23, 2023

I was thinking of trying dask distributed Lock/Semaphore for this myself, but inside the asset ops - maybe something like that could work?

Like a simpler version of the dagster-dask executor that only uses a dask scheduler for centralized concurrency control, and otherwise delegates op execution to the multiprocess executor (or another)

I'm not sure if this is actually viable, might run into similar problems with failing to release locks/leases.
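Concretely, I was picturing something like the sketch below: a long-lived dask scheduler shared by all runs, with each op acquiring a lease before touching the shared resource. The scheduler address env var and semaphore name are just placeholders, and this isn't an official Dagster integration:

import os

from dagster import op
from dask.distributed import Client, Semaphore


@op
def guarded_op():
    # Connect to a long-lived dask scheduler shared by every run.
    client = Client(os.environ["DASK_SCHEDULER_ADDRESS"])
    try:
        # At most one lease for this name across all processes and runs.
        sem = Semaphore(max_leases=1, name="shared-resource")
        with sem:  # blocks until a lease is free; released on exit
            ...  # do the work that touches the shared resource
    finally:
        client.close()

If I understand the dask docs correctly, semaphore leases also expire after a timeout when the holder dies, which would at least partly address the failing-to-release concern, though that timeout would need tuning against long-running ops.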

@gustavo-delfosim

@sryza
Contributor

sryza commented Jul 24, 2023

@gustavo-delfosim - yes (though still experimental).

@sryza sryza closed this as completed Jul 24, 2023
@gustavo-delfosim

Hi @sryza!
My team and I are trying the feature today.
We tried the following MWE, inspired by the tests:
io/dagster/blob/5a4a2685b13d21c1fa49d44675ef0756e97670f3/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_active.py#L184

from dagster import Definitions, op, job
import time


GLOBAL_CONCURRENCY_TAG = "dagster/concurrency_key"

T = 20

@op(tags={GLOBAL_CONCURRENCY_TAG: "foo"})
def foo_op():
    time.sleep(T)


@op(tags={GLOBAL_CONCURRENCY_TAG: "foo"})
def bar_op():
    time.sleep(T)


@job
def foo_job():
    foo_op()
    bar_op()


defs = Definitions(jobs=[foo_job])

I just changed the functions to sleep for T=20 seconds.

I set the foo concurrency limit in the UI to 1 and ran the job through the UI, expecting it to take ~40 s, since the two ops should run sequentially due to the concurrency limit. However, the job runs in ~20 s.

Is this a bug in this feature? Or am I missing something to set it up?

@sryza
Contributor

sryza commented Aug 3, 2023

@prha - mind taking a look at this one?

@prha
Member

prha commented Aug 3, 2023

Hi @gustavo-delfosim What version of dagster are you running, and what is your storage implementation?

@gustavo-delfosim

Hi @gustavo-delfosim What version of dagster are you running, and what is your storage implementation?

We were using SQLite as the storage, which is why it did not work. After switching to Postgres, as indicated in the docs, it worked as expected.
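For anyone else hitting this: as far as I can tell the concurrency slots are tracked in the event log storage, so the instance needs Postgres-backed storage configured in dagster.yaml, roughly like the following (the env var names are just an example):

storage:
  postgres:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432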

@garethbrickman garethbrickman added the area: concurrency Related to controlling concurrent execution label Apr 26, 2024