Introducing dask-based parallel backend for the AnalysisBase.run() #4158

Closed
marinegor opened this issue May 29, 2023 · 40 comments · Fixed by #4162

Comments

@marinegor
Contributor

Motivation

This project focuses on improving the speed and scalability of MDAnalysis. I am planning to implement a parallel backend for the MDAnalysis library. The backend will be implemented using the dask library, allowing users to seamlessly run their analysis either on powerful local machines or on various clusters, such as those managed by SLURM. There was a proof-of-concept fork of the MDAnalysis library, pmda, which implemented this idea for a subset of the analysis methods implemented in MDAnalysis.

In short, the proposal is to refactor the pmda methods into the current version of MDAnalysis.

Proposed solution

A key component of the MDAnalysis library is the AnalysisBase class, from which all classes that let a user run an analysis of a trajectory inherit. It implements a run method that looks roughly like this:

def run(self, start=None, stop=None, step=None, frames=None, ...):	
	self._setup_frames(self._trajectory, start=start, stop=stop, step=step, frames=frames)
	
	self._prepare()
	for i, ts in enumerate(self._sliced_trajectory, ...):
		...
		self._single_frame()
	self._conclude()

It consists of four steps:

  • setting up frames for reading, which may include options to analyze only part of the trajectory along the time coordinate (change the time step, start/stop, etc.)
  • preparing for the analysis, which may include allocating arrays that store intermediate data
  • running the analysis of a single frame
  • concluding the results, e.g. averaging some quantity calculated for each frame separately
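
The four steps can be illustrated with a minimal, self-contained sketch. `ToyAnalysis` and its list-of-numbers "trajectory" are hypothetical stand-ins, not MDAnalysis code; only the method names `_setup_frames`, `_prepare`, `_single_frame`, `_conclude` and `run` follow the proposal.

```python
class ToyAnalysis:
    """Hypothetical stand-in for an AnalysisBase subclass (not MDAnalysis code)."""

    def __init__(self, trajectory):
        self._trajectory = trajectory  # here simply a list of per-frame numbers

    def _setup_frames(self, start=None, stop=None, step=None):
        # step 1: select the frames to analyse
        self._sliced_trajectory = self._trajectory[start:stop:step]

    def _prepare(self):
        # step 2: allocate storage for intermediate data
        self._per_frame = []

    def _single_frame(self, ts):
        # step 3: analyse one frame (here: square its value)
        self._per_frame.append(ts ** 2)

    def _conclude(self):
        # step 4: reduce the per-frame values to the final result
        self.result = sum(self._per_frame) / len(self._per_frame)

    def run(self, start=None, stop=None, step=None):
        self._setup_frames(start, stop, step)
        self._prepare()
        for ts in self._sliced_trajectory:
            self._single_frame(ts)
        self._conclude()
        return self


analysis = ToyAnalysis([1.0, 2.0, 3.0, 4.0]).run(step=2)
print(analysis.result)  # frames 1.0 and 3.0 -> mean of 1.0 and 9.0 -> 5.0
```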

For a setup with multiple worker processes, this protocol will require an additional step of first separating a trajectory into blocks. Each block will be processed with a single separate process, and also results from different blocks will potentially be concluded separately:

def run(self, start=None, stop=None, step=None, frames=None, scheduler: Optional[str]=None):
	if scheduler is None:
	# fallback to the old behavior
		self._setup_frames(self._trajectory, start=start, stop=stop, step=step, frames=frames)
		
		self._prepare()
		for i, ts in enumerate(self._sliced_trajectory, ...):
			...
			self._single_frame()
		self._conclude()
		
	else:
		self._configure_scheduler(scheduler=scheduler)
		self._setup_blocks(start=start, stop=stop, step=step, frames=frames) 
		# split trajectory into blocks according to scheduler settings

		tasks = []
		for block in self._blocks:
			# create separate tasks 
			# that would fall back to the old behavior 
			# and schedule them as dask tasks
			subrun = self.__class__(start=block.start, stop=block.stop, step=block.step, frames=block.frames, scheduler=None)
			dask_task = dask.delayed(subrun.run)()  # call the delayed function so that a task is actually created
			tasks.append(dask_task)
		
		# perform dask computation
		tasks = dask.delayed(tasks)
		res = tasks.compute(**self._scheduler_params)
		self._parallel_conclude()

This requires introducing the following methods in the AnalysisBase class:

class AnalysisBase(object):
    def _configure_scheduler(self, scheduler=None, **params):
        ...

    @property
    def _blocks(self):
        ...

    def _setup_blocks(self, start=None, stop=None, step=None, frames=None):
        # will also update `self._blocks` accordingly
        ...

    def _parallel_conclude(self, *args, **kwargs):
        # signature left open in the proposal
        ...

This is similar to the protocol implemented in pmda. Such a modular design has the following advantages:

  • it reuses the existing prepare and conclude methods of the subclasses, and hence will run on sub-trajectories
  • it requires developers to implement only a non-default parallel_conclude method when results must be combined in a more sophisticated way
  • it allows raising an exception for subclasses that for some reason don’t allow such parallelization (e.g. because they rely on results from previous frames when computing the current one), by re-implementing _configure_scheduler and raising an exception when the scheduler is not None.
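
A minimal sketch of the block-wise protocol described above, under stated assumptions: `split_into_blocks`, `analyse_block` and `parallel_mean` are hypothetical names, the trajectory is a plain list, and the standard-library `ThreadPoolExecutor` stands in for the dask scheduler so that the example runs without extra dependencies.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_blocks(n_frames, n_blocks):
    """Return (start, stop) pairs covering range(n_frames) in contiguous blocks."""
    base, extra = divmod(n_frames, n_blocks)
    blocks, start = [], 0
    for i in range(n_blocks):
        stop = start + base + (1 if i < extra else 0)
        if stop > start:  # skip empty blocks when n_blocks > n_frames
            blocks.append((start, stop))
        start = stop
    return blocks


def analyse_block(trajectory, start, stop):
    """Serial sub-run on one block: returns a partial sum and a frame count."""
    frames = trajectory[start:stop]
    return sum(frames), len(frames)


def parallel_mean(trajectory, n_blocks):
    blocks = split_into_blocks(len(trajectory), n_blocks)
    # ThreadPoolExecutor is a stand-in for the dask scheduler of the proposal
    with ThreadPoolExecutor(max_workers=n_blocks) as pool:
        partials = list(pool.map(lambda b: analyse_block(trajectory, *b), blocks))
    # "parallel conclude": combine the partial results of all blocks
    total = sum(p[0] for p in partials)
    count = sum(p[1] for p in partials)
    return total / count


print(split_into_blocks(10, 3))                    # [(0, 4), (4, 7), (7, 10)]
print(parallel_mean(list(range(10)), n_blocks=3))  # 4.5
```

A mean can be combined from partial sums and counts; quantities that depend on earlier frames cannot be merged this way, which is the case the last advantage above is meant to guard against.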

Alternatives

If you're looking to speed up your MD trajectory analysis, I'd recommend looking into pmda. It is an older fork of MDAnalysis (before 2.0) that explores the same idea that I want to implement here, but for a limited number of AnalysisBase subclasses.

Additional context

This issue is part of a GSoC project, and most of the code-related communication is supposed to happen here.

@IAlibay
Member

IAlibay commented May 29, 2023

An initial curveball here - dask is a rather heavy dependency, could all this be achieved with dask as an optional dependency?

@marinegor
Contributor Author

@IAlibay I think it is possible to configure imports and dependencies this way, yes.
I'm not sure if it's worth making this a default option, but I guess it depends on the project's success. If it's minor, we can always introduce something like

# default option -- without dask
$ pip install --upgrade MDAnalysis

# more advanced option -- with it, perhaps supporting fewer Python versions/platforms/etc
$ pip install --upgrade MDAnalysis[dask]

As for the run() protocol itself, the fallback to the old behavior happens when scheduler=None, which as well might be a default option.
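
One common way to keep a heavy package optional is to check for it only when a parallel scheduler is requested. The sketch below assumes this pattern; `dask_available` and `resolve_scheduler` are hypothetical helper names, and the `MDAnalysis[dask]` extra in the error message is the one suggested in the comment above, not an existing install target.

```python
import importlib.util


def dask_available():
    """Return True if the optional 'dask' package can be imported."""
    return importlib.util.find_spec("dask") is not None


def resolve_scheduler(scheduler=None):
    """Validate the requested scheduler; None keeps the serial behaviour."""
    if scheduler is None:
        return None  # old behaviour, no dask needed
    if not dask_available():
        raise ImportError(
            "scheduler=%r requires dask; install it with "
            "'pip install MDAnalysis[dask]'" % scheduler
        )
    return scheduler


print(resolve_scheduler(None))  # None
```

With this layout a plain installation never imports dask, and users only see the error when they explicitly ask for a parallel scheduler.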

@hmacdope
Member

Very exciting!

@yuxuanzhuang
Contributor

Could you create a table to show e.g.

| Analysis class | available backend | PR | Merged? |
|---------------|-------------------|-----|----------|

@p-j-smith
Member

This is very cool! One suggestion though about:

it allows raising an exception for subclasses that for some reason don’t allow such parallelization (e.g. because they rely on results from previous frames when computing the current one), by re-implementing _configure_scheduler and raising an exception when the scheduler is not None.

Similar to the suggestion in #4259, I think it would be good to keep AnalysisBase serial-only, and have a ParallelizableAnalysisBase that inherits from it and offers other backends. This way users won't try to set n_workers, n_parts, backend, or client in the call to run, and developers don't need to raise an exception if any of these arguments are not None.

@orbeckst linked a pull request on Aug 27, 2023 that will close this issue

@orbeckst
Member

I am not quite sure how we would do ParallelizableAnalysisBase and AnalysisBase without duplicating a lot of code. In the current PR #4162, any AnalysisBase is by default not capable of parallel analysis. Parallelization needs to be explicitly enabled by defining available_backends with more than the default ("local",) (= serial only) backend, e.g., ('local', 'multiprocessing', 'dask', 'dask.distributed'). The reason is that it is impossible to reliably aggregate the data without helping the code with some explicit hints (which is what the new AnalysisBase._get_aggregator() method together with ResultsGroup does). We made the choice to resist the temptation to guess and not automatically upgrade every existing AnalysisBase-based analysis to parallel because it would almost certainly lead to crashing code and potentially incorrect results.

So if all goes according to plan then

  1. code inside MDA will be updated to enable parallel analysis where we determine it safe to do
  2. other existing code sees no changes (and will only run in serial)
  3. other existing code can be easily upgraded to enable parallel analysis when the authors determine that it's safe to do so
  4. new analysis classes should decide from the beginning if they can support parallelization and enable it when possible
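
The opt-in pattern can be sketched as follows. This is a toy model, not the code in PR #4162: `ToyBase`, `ToyRMSD` and `_check_backend` are hypothetical, and a plain dictionary of merge functions stands in for ResultsGroup; only the names `available_backends` and `_get_aggregator` are taken from the discussion.

```python
class ToyBase:
    """Hypothetical base class: serial-only unless a subclass opts in."""

    available_backends = ("local",)

    def _check_backend(self, backend):
        if backend not in self.available_backends:
            raise ValueError(
                f"{type(self).__name__} does not support backend {backend!r}; "
                f"choose one of {self.available_backends}"
            )
        return backend

    def _get_aggregator(self):
        # no guessing: subclasses must state how per-block results are merged
        raise NotImplementedError


class ToyRMSD(ToyBase):
    """Opts in to parallel backends and declares how to combine blocks."""

    available_backends = ("local", "multiprocessing", "dask")

    def _get_aggregator(self):
        # per-block lists of values are concatenated in block order
        return {"rmsd": lambda parts: [x for part in parts for x in part]}


aggregate = ToyRMSD()._get_aggregator()["rmsd"]
print(aggregate([[0.1, 0.2], [0.3]]))  # [0.1, 0.2, 0.3]
```

Existing subclasses that define neither attribute keep the `("local",)` default and therefore continue to run in serial only.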

Please feel free to comment on the PR, too!

@orbeckst
Member

Specific comment on marking AnalysisBase as parallelizable or not in #4162 (comment) on the PR.

@IAlibay
Member

IAlibay commented Aug 28, 2023

My 2 cents

What I would prefer is to:

  1. Avoid class proliferation where possible
  2. Avoid the issue where this still occurs because we have various backends available that work for some things but not others (i.e. a case where you have both 'mpi' and 'dask', but one class can only work with 'dask' and the other 'mpi').

I would prefer a usage pattern where analysis class authors opt in on backends, rather than blanket say "it's parallelisable" or not. This is why my main ask was that the default AnalysisBase should always only have a 'local' / 'serial' backend as its available options (see: https://github.com/MDAnalysis/mdanalysis/pull/4162/files#r1278286718).

Analysis authors then have to make a conscious decision as to whether or not a given backend will work for them.

Caveat

One of the things I've not had a chance to review in #4162 yet is:

a) How this will look in terms of documentation - it might be that this is far too complicated for users to get behind
b) How adding a new backend would look - if I develop a new method and I want to register a new backend because "my particular implementation needs this", how does that work?

@orbeckst
Member

orbeckst commented Aug 28, 2023

Quick comment: conceptually the question if an algorithm is parallelizable is different from which backend can be used to run it.

Given that backends are “just” implementation details I really don’t think that user code should need to track what happens in the background. As a user I want to be able to choose and try out any backend that becomes available. If the underlying algorithm can be parallelized with split-apply-combine then I should be able to select any backend and in the worst case it crashes.

@orbeckst
Member

I agree that docs from both user and developer perspective are still lacking. For the user side, have a look at @marinegor ‘s final blog post https://marinegor.github.io/posts/2023/06/gsoc-summary (scroll down).

@IAlibay
Member

IAlibay commented Aug 28, 2023

Given that backends are “just” implementation details I really don’t think that user code should need to track what happens in the background. As a user I want to be able to choose and try out any backend that becomes available. If the underlying algorithm can be parallelized with split-apply-combine then I should be able to select any backend and in the worst case it crashes.

@orbeckst I'm not sure I follow / agree here - if a developer knows that something is a bad idea, they should be able to dictate the allowed parameters of their method. I.e. ring fencing users as a design choice is ok, not an implementation detail?

@p-j-smith
Member

I am not quite sure how we would do ParallelizableAnalysisBase and AnalysisBase without duplicating a lot of code.

Yeah that's a fair point - it could increase the maintenance burden. But from looking at the existing AnalysisBase and the one in #4162 it seems like there wouldn't be much duplicated code: the __init__, _setup_frames, _prepare, _single_frame, and _conclude methods are identical - the only one that differs is the run method (plus the additional ones for handling parallelization). So I imagine ParallelizableAnalysisBase would look something like this (excluding the methods for handling parallelization):

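from typing import Iterable, Optional

class ParallelizableAnalysisBase(AnalysisBase):

    def __init__(self, trajectory, verbose=False, **kwargs):
        super().__init__(trajectory, verbose, **kwargs)

    def run(
        self,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        step: Optional[int] = None,
        frames: Optional[Iterable] = None,
        verbose: Optional[bool] = None,
        n_workers: Optional[int] = None,
        n_parts: Optional[int] = None,
        backend: Optional[str] = None,
        *,
        client: object = None,
        progressbar_kwargs: Optional[dict] = None,
    ):
        # avoid a shared mutable default argument
        progressbar_kwargs = {} if progressbar_kwargs is None else progressbar_kwargs
        # assumption: an unset backend is treated like "local" (serial)
        if backend is None or backend == "local":
            # serial path: delegate to AnalysisBase.run and forward the caller's options
            return super().run(
                start,
                stop,
                step,
                frames,
                verbose,
                progressbar_kwargs=progressbar_kwargs,
            )

        # Otherwise handle some parallel stuff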

Although I get that you may want to avoid having additional classes for various reasons, as @IAlibay mentioned, which is completely understandable.

So if all goes according to plan then

  • code inside MDA will be updated to enable parallel analysis where we determine it safe to do
  • other existing code sees no changes (and will only run in serial)
  • other existing code can be easily upgraded to enable parallel analysis when the authors determine that it's safe to do so
  • new analysis classes should decide from the beginning if they can support parallelization and enable it when possible

That sounds great! I hadn't realised that by default only local would be a permitted backend - that was the main thing I was concerned about with these changes :)

@p-j-smith
Member

Given that backends are “just” implementation details I really don’t think that user code should need to track what happens in the background. As a user I want to be able to choose and try out any backend that becomes available. If the underlying algorithm can be parallelized with split-apply-combine then I should be able to select any backend and in the worst case it crashes.

@orbeckst I'm not sure I follow / agree here - if a developer knows that something is a bad idea, they should be able to dictate the allowed parameters of their method. I.e. ring fencing users as a design choice is ok, not an implementation detail?

It's quite probable that I am missing something about the differences between using multiprocessing, dask, and dask distributed, but I agree with @orbeckst in that if an algorithm can safely be run in parallel with e.g. multiprocessing then I would assume it should also be safe to run it in parallel with dask or dask distributed too?

@IAlibay
Member

IAlibay commented Aug 28, 2023

if an algorithm can safely be run in parallel with e.g. multiprocessing then I would assume it should also be safe to run it in parallel with dask or dask distributed too?

Sure, but what happens when you throw in mpi as a backend?

@orbeckst
Member

orbeckst commented Aug 28, 2023

Sure, but what happens when you throw in mpi as a backend?

It's on us to provide an implementation of ParallelExecutor._compute_with_mpi() that properly uses MPI to parallelize the run. (To be honest, I am not sure yet how we would do an MPI implementation but it would still just do split-apply-combine over blocks of frames ("compute groups"), i.e., the same algorithm for parallelization that we use for multiprocessing and dask.)

I don't see a way to automate complicated parallelization (e.g., within a trajectory frame) within the generic AnalysisBase framework. But by restricting ourselves to a simple parallel algorithm we can separate the implementation details from the algorithm via @marinegor 's ParallelExecutor. Either our ParallelExecutor._compute_with_FRAMEWORK() works for everything or it works for nothing; hence I don't believe that developers should need to care about it, and ultimately the only question is whether the analysis can be run with split-apply-combine.

@marinegor please voice your opinion, too, especially if I am misunderstanding how the current approach is supposed to work. (@yuxuanzhuang and @RMeli I'd very much value your views, too, of course.)

@IAlibay
Member

IAlibay commented Aug 28, 2023

@orbeckst if I'm understanding this correctly, you're saying that the onus on ensuring that all backends work for all methods / future subclasses, etc.. is squarely on the MDA core library? This seems like we're undertaking a massive maintenance burden here.

@IAlibay
Member

IAlibay commented Aug 28, 2023

Either our ParallelExecutor._compute_with_FRAMEWORK() works for everything or it works for nothing; hence I don't believe that developers should need to care about it, and ultimately the only question is whether the analysis can be run with split-apply-combine.

Maybe I'm just completely misunderstanding what you mean here, but it looks like we have a difference in opinion between the idea of how a method should work and its eventual application. My take is that whilst it should always work, we know that edge cases exist; they could be anything from not handling file writing in a certain way to some arbitrary race condition that only happens in certain backends. If that happens, a developer should have the right to ring fence users into a set of backends that they know will work.

edit: an additional consideration here is if someone creates say an mdakit that is just fundamentally incompatible with a backend, due to whatever complicated dependency hell folks have gotten themselves into.

@orbeckst
Member

if I'm understanding this correctly, you're saying that the onus on ensuring that all backends work for all methods / future subclasses, etc.. is squarely on the MDA core library? This seems like we're undertaking a massive maintenance burden here.

If we enable parallelization for analysis classes in MDAnalysis.analysis then we should be able to test them. In principle, the current testing framework can do that. It just seems a heavy lift for CI so we might not be able to do this for every commit. That's a point for discussion.

We can't guarantee that it will work for code that we didn't write but I think that's the same for every piece of code that we publish.

The current parallel backends are the ones we'd expect people to use — multiprocessing, dask, dask.distributed. What's the alternative to offering these?

@IAlibay
Member

IAlibay commented Aug 28, 2023

We can't guarantee that it will work for code that we didn't write but I think that's the same for every piece of code that we publish.

I think that's where my current thought pattern is circling around - because we can't assume that everything will work with what we provide, the onus on ensuring a downstream analysis class works with all backends is on the developer not on the core library.

Hence my view that by consequence, the decision to expose / restrict a given set of backends is down to the analysis class author?

Edit: it's not immediately clear that we're talking about the same scope - I'm mostly talking about the downstream world.

@orbeckst
Member

Maybe I'm just completely misunderstanding what you mean here, but it looks like we have a difference in opinion between the idea of how a method should work and its eventual application. My take is that whilst it should always work, we know that edge cases exist; they could be anything from not handling file writing in a certain way to some arbitrary race condition that only happens in certain backends. If that happens, a developer should have the right to ring fence users into a set of backends that they know will work.

That's fair. And they can certainly change available_backends in their derived class to only list the ones they like. Or we could have a blacklisted_backends that anyone can use to remove any backends from whatever MDA ships as default available_backends.
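
The filtering idea could look roughly like this. The sketch is hypothetical: `ToyBackendAware`, `FileWritingAnalysis` and `allowed_backends` are invented for illustration, and only the attribute names `available_backends` and `blacklisted_backends` come from the comment above.

```python
DEFAULT_BACKENDS = ("local", "multiprocessing", "dask", "dask.distributed")


class ToyBackendAware:
    """Hypothetical class; attribute names follow the comment above."""

    available_backends = DEFAULT_BACKENDS
    blacklisted_backends = ()

    @classmethod
    def allowed_backends(cls):
        # keep the shipped order, drop anything the author excluded
        return tuple(
            b for b in cls.available_backends if b not in cls.blacklisted_backends
        )


class FileWritingAnalysis(ToyBackendAware):
    # the author knows that one backend is unsafe for this analysis
    blacklisted_backends = ("dask.distributed",)


print(FileWritingAnalysis.allowed_backends())
# ('local', 'multiprocessing', 'dask')
```

A deny-list keeps downstream classes up to date when new default backends are shipped, whereas an explicit allow-list requires the author to add each new backend by hand.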

edit: an additional consideration here is if someone creates say an mdakit that is just fundamentally incompatible with a backend, due to whatever complicated dependency hell folks have gotten themselves into.

Yes, that's complicated. I want to say that it's the MDAKit author's responsibility to test all functionality, including parallel tests. However, that seems unreasonable if we promise "magic parallelization of anything that uses 'AnalysisBase'". We also can't just run their serial tests "in parallel" as this requires specialized tests.

I don't think that dependency hell will be the issue. multiprocessing is standard library and if people choose dask as a backend they should have installed it. At the end of the day, parallel execution is opt-in (via argument processing of run()) so it requires user choices.

From the discussion I take away

  • We need to be clear what we promise — perhaps we should mark parallel support as experimental (it's opt-in, after all).
  • We need to provide good docs for developers for how to test in parallel and what they need to consider (in addition to what requirements the split-apply-combine approach has).
  • We need to figure out how we will sustainably ensure that our own code works in parallel.
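
One way such a parallel test could be structured is to assert that the split-apply-combine result equals the serial result for several partition counts. The sketch below is an assumption about test design, not the test suite of PR #4162; all function names are hypothetical and the blocks are processed one after another so that the check stays deterministic.

```python
def per_frame_values(frames):
    """Toy per-frame analysis: one value per frame."""
    return [f * 0.5 for f in frames]


def run_serial(frames):
    return per_frame_values(frames)


def run_split(frames, n_parts):
    """Split-apply-combine without real parallelism, for deterministic tests."""
    size = max(1, -(-len(frames) // n_parts))  # ceiling division, at least 1
    parts = [frames[i:i + size] for i in range(0, len(frames), size)]
    results = [per_frame_values(part) for part in parts]
    return [value for block in results for value in block]


def check_split_matches_serial(frames, max_parts=4):
    expected = run_serial(frames)
    for n_parts in range(1, max_parts + 1):
        assert run_split(frames, n_parts) == expected, n_parts
    return True


print(check_split_matches_serial(list(range(11))))  # True
```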

@orbeckst
Member

orbeckst commented Aug 28, 2023

@p-j-smith thanks for analysing the proposed code. I can see the appeal of leaving AnalysisBase as is (if that's possible) as it presents much less of a danger to horribly break everything. I also appreciate and share @IAlibay 's concern about class proliferation so I am seeing our discussion here as a thought exercise to see if advantages would outweigh the disadvantages.

Assuming that it is possible to rewrite PR #4162 so that

  1. we keep the current serial AnalysisBase unchanged
  2. derive ParallelAnalysisBase(AnalysisBase)

then I see the following pros and cons for splitting AnalysisBase (serial) and ParallelAnalysisBase (parallel)

cons

  • more classes
  • potentially some code duplication
  • developers may ignore ParallelAnalysisBase (because they don't know about it, because outdated tutorials say so) and use AnalysisBase and code that could be parallelized automatically never benefits from the new feature
  • developers may only develop for standard AnalysisBase and then rely so much on "serial"-only features in AnalysisBase (if they exist?) that it becomes difficult to easily parallelize code that would otherwise be perfectly parallelizable
  • development and maintenance leaves ParallelAnalysisBase behind and code rots — basically all related to potentially poor uptake of ParallelAnalysisBase

pro

  • AnalysisBase (serial) is isolated from large changes in ParallelAnalysisBase, which should increase reliability of existing code
  • If we use a two-class approach for transformations as well (see discussion in Raise an error is parallelizable=True is passed to the NoJump transformation #4259) then we establish a consistent pattern.
  • Clear distinction between classes that can work with split-apply-combine (isinstance(self, ParallelAnalysisBase)) vs. the ones that cannot.

@p-j-smith
Member

Thanks @orbeckst for pretty thoroughly covering the pros and cons of having separate classes for serial and parallel analysis. I think the most important con - and one I hadn't considered - is your point:

developers may ignore ParallelAnalysisBase (because they don't know about it, because outdated tutorials say so) and use AnalysisBase and code that could be parallelized automatically never benefits from the new feature

I don't know if there's an easy solution to this. I've added a proof-of-concept of how a ParallelAnalysisBase might look in #4269. In that PR, I've updated RMSD to use the new parallel base. The changes required were:

  • inherit from ParallelAnalysisBase
  • add a _get_aggregator method

This is pretty similar to the changes required to enable parallelisation in #4162, which instead requires:

  • explicitly define the allowed backends through the available_backends property
  • add a _get_aggregator method

But you're still right that people may not be aware that ParallelAnalysisBase exists. Equally, though, downstream developers may be unaware that the available_backends property has been added.

@p-j-smith
Member

p-j-smith commented Aug 29, 2023

Regarding available backends - I think it would be reasonable to say to users that MDAnalysis has some official backends for its core analysis classes and:

  1. all core (parallel) analysis classes (i.e. those that ship with MDAnalysis) can be used with all official backends (i.e. those implemented by MDAnalysis)
  2. you can use the official backends with your own analysis but it's not guaranteed to work
  3. you can use your own backend with core analysis classes (or your own) but it's not guaranteed to work

In terms of 3., in #4269 I've changed the way backends are defined. Now a class (or None) must be passed to ParallelAnalysis.run, and this class must implement an apply method that has the following signature:

def apply(self, func: Callable, computations: list, n_workers: int) -> list:

This apply method is equivalent to each of the backend-specific _compute methods in #4162 (e.g. _compute_with_multiprocessing) - it will perform the actual analysis, in parallel. I think there are several benefits to this approach, and probably some downsides too:

cons

  • yet more classes added
  • it's more difficult for developers to limit exactly which backends are allowed (unless e.g. there's a list of allowed backends, or list of disallowed backends)
  • ...

pros

  • as alluded to above - downstream developers and end users can easily add their own backends by implementing a class with an apply method with the correct signature. So for example, if someone likes using spark, ray, or mpi, they can implement their own backend and try this out with all existing parallel analysis classes - MDAnalysis ones, downstream ones, and their own analyses. With the approach in [GSoC] Parallelisation of AnalysisBase with multiprocessing and dask #4162, to add their own backend they would need to:
    • subclass ParallelExecutor and update the __init__, available_backends, and apply methods, as well as add their own backend-specific compute method
    • subclass every analysis class they want to use and re-implement the _configure_client method to use their ParallelExecutor subclass
  • for similar reasons, it's (in principle) easy to add support for new official backends in MDAnalysis - add the class to mda.analysis.backends and write some tests for it
  • In [GSoC] Parallelisation of AnalysisBase with multiprocessing and dask #4162 the run method has backend and client arguments. The client argument must be set in order to use the Dask Distributed backend, and must be None otherwise. With the addition of more backends there might be more backend-specific arguments to run, making the API more complicated and difficult to use. By implementing backends as classes this is avoided, as the backend configuration (e.g. setting the client) can be done when instantiating the backend class
  • ...
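
A backend class with the apply signature quoted above might look like the following sketch. `SerialBackend`, `ThreadBackend` and `run_blocks` are hypothetical names chosen for illustration (they are not taken from #4269), and the thread pool is only a convenient standard-library stand-in for spark, ray, mpi or dask.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class SerialBackend:
    """Hypothetical fallback used when no backend object is passed."""

    def apply(self, func: Callable, computations: list, n_workers: int) -> list:
        return [func(task) for task in computations]


class ThreadBackend:
    """Hypothetical user-defined backend built on the standard library."""

    def apply(self, func: Callable, computations: list, n_workers: int) -> list:
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            return list(pool.map(func, computations))  # preserves input order


def run_blocks(blocks, backend=None, n_workers=1):
    """Sum each block with whichever backend object was passed in."""
    backend = backend if backend is not None else SerialBackend()
    return backend.apply(sum, blocks, n_workers)


blocks = [[1, 2], [3, 4], [5]]
print(run_blocks(blocks))                                # [3, 7, 5]
print(run_blocks(blocks, ThreadBackend(), n_workers=2))  # [3, 7, 5]
```

Any backend-specific configuration, such as a client object, would be passed to the backend's constructor, so the signature of `run_blocks` does not grow as backends are added.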

Edit: Added 'pro' about preventing the .run api from becoming more complicated when more backends are added

@orbeckst
Member

Thanks for putting up PR #4269 . It's an interesting approach and there are quite a few things in your code that I liked. I only had time for a cursory glance, though. I hope others also have a look.

@p-j-smith correctly points out that even with the approach in PR #4162 we will not be able to automagically add parallel capability to downstream analysis classes purely via inheritance: as shown for the RMSD class in PR #4162 and nicely summarized in #4158 (comment), we still need modifications (available_backends and _get_aggregator()). My view is that available_backends should be defined in the base class and then potentially filtered (see #4158 (comment)) so defining available_backends may be made optional. However, aggregating the results properly is difficult, and one decision made during the course of the last few months was that we do not want to guess how to aggregate but rather have the user specify this important part of the algorithm. So in conclusion, I agree that regardless what we do, users/developers will have to explicitly upgrade their code to make use of parallelization. Ultimately, that seems the safer approach and might go some way to alleviate @IAlibay 's concerns about breaking of downstream code.

@yuxuanzhuang
Contributor

@p-j-smith Thanks so much for your suggestions! At the moment, I won't be able to offer my thoughts, but I'm definitely looking forward to participating at a later time.

@marinegor
Contributor Author

Hi everyone, sorry for being silent for the last few days -- rain travels took the soul out of me. I'm following the discussion closely, and will respond in more detail later this week.

@marinegor
Contributor Author

marinegor commented Aug 31, 2023

Hi everyone, I feel like the discussion above now has condensed to this list of questions:

principal:

  1. Maintenance: how to propagate new backends onto dependent packages/user code?
  2. Flexibility: how can users (=dependent packages/user code) add their own backends / parallelizable analysis classes?

and design:

  1. Inheritance: should we separate AnalysisBase from ParallelAnalysisBase or not?
  2. Generalization: should we introduce some classification on backends (i.e. mark methods parallelizable or whatever) that would enable automatic addition of new backends that follow the same principle (i.e. "split-apply-combine")?

My point of view on them -- sorry for being too wordy sometimes :)


  1. Maintenance: how to propagate new backends onto dependent packages/user code?

I agree with @orbeckst here: "regardless what we do, users/developers will have to explicitly upgrade their code to make use of parallelization". As the MDAnalysis library, we must maintain the following: a) all core objects are properly serializable; b) ParallelExecutor works the same for all backends (without the AnalysisBase); and c) it properly interacts with AnalysisBase. After that, it's on users/developers to ensure that their objects are properly serializable and that their computations can indeed be performed in a "split-apply-combine" manner, and if so, to add the necessary available_backends property to their class.
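
Whether an object is "properly serializable" can be checked with a pickle round trip, since process-based backends generally need to pickle the objects they send to workers. The helper below is a hypothetical sketch using only the standard library; the same check could be applied to an analysis instance or to the objects it holds.

```python
import pickle
import threading


def is_serializable(obj):
    """Return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True


# plain data can be sent to worker processes
print(is_serializable({"cutoff": 3.5, "frames": [0, 1, 2]}))  # True
# locks (like open file handles or sockets) cannot be pickled
print(is_serializable(threading.Lock()))                      # False
```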

  2. Flexibility: how can users (=dependent packages/user code) add their own backends / parallelizable analysis classes?

Although @p-j-smith proposed an interesting solution with the ParallelAnalysisBase class, I think that in the current approach it's still quite simple to introduce a new backend option (although it indeed requires some changes to the codebase).

For instance, I imagine we could introduce an executor_class argument into _configure_client and move the options dictionary in ParallelExecutor.apply() into its own method:

from typing import Callable

class AnalysisBase:
    ...
    def _configure_client(self, backend: str, n_workers: int, client, *, executor_class=None):
        # a custom executor class, if given, replaces the default ParallelExecutor
        if executor_class is not None and executor_class is not ParallelExecutor:
            return executor_class(backend, n_workers)

        ...

class ParallelExecutor:
    def _options(self):
        # map each compute method to the condition under which it is selected
        options = {
            self._compute_with_client: self.client is not None,
            self._compute_with_dask: self.backend == "dask",
            self._compute_with_multiprocessing: self.backend == "multiprocessing",
            self._compute_with_local: self.backend == "local",
        }
        return options

    def apply(self, func: Callable, computations: list) -> list:
        # run the first compute method whose condition holds
        for map_, condition in self._options().items():
            if condition:
                return map_(func, computations)
        raise ValueError(f"Backend or client is not set properly: {self.backend=}, {self.client=}")

With that, introducing a new backend would work like this, very similarly to what @p-j-smith described:

  • subclass ParallelExecutor and implement a new _compute_with_YOUR_BACKEND method & update available_backends
  • subclass every analysis class they want to use and re-implement the _configure_client method to use their ParallelExecutor subclass using simply:
class CustomExecutor(ParallelExecutor):
    def _compute_with_SOMETHING(self, func: Callable, computations: list):
        ...

    def _options(self):
        options = super()._options()
        options.update({self._compute_with_SOMETHING: self.backend == 'SOMETHING'})
        return options  # without this, apply() would receive None

class SomeCustomClass(AnalysisBase):
    def _configure_client(self, backend: str, n_workers: int, client):
        # forward the client and select the custom executor by keyword
        return super()._configure_client(backend, n_workers, client, executor_class=CustomExecutor)

    @property
    def available_backends(self):
        return (..., 'SOMETHING')

with like 4 lines of boilerplate and the actual code to generate a new backend.

Of course, the example above must be documented and also put into the "Contributing" section of the documentation.
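To make the dispatch idea above concrete, here is a minimal runnable sketch of the same pattern. ToyExecutor, ThreadedExecutor and the "threads" backend name are hypothetical stand-ins, not the classes from the PR; only the _options()/apply() structure follows the snippet above.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class ToyExecutor:
    """Hypothetical stand-in for ParallelExecutor with a single built-in backend."""

    def __init__(self, backend: str, n_workers: int = 1):
        self.backend = backend
        self.n_workers = n_workers

    def _compute_with_local(self, func: Callable, computations: list) -> list:
        return [func(c) for c in computations]

    def _options(self) -> dict:
        # compute method -> condition under which it should be used
        return {self._compute_with_local: self.backend == "local"}

    def apply(self, func: Callable, computations: list) -> list:
        for map_, condition in self._options().items():
            if condition:
                return map_(func, computations)
        raise ValueError(f"Backend is not set properly: {self.backend!r}")


class ThreadedExecutor(ToyExecutor):
    """Adds one backend by extending the options dictionary."""

    def _compute_with_threads(self, func: Callable, computations: list) -> list:
        with ThreadPoolExecutor(max_workers=self.n_workers) as pool:
            return list(pool.map(func, computations))

    def _options(self) -> dict:
        options = super()._options()
        options[self._compute_with_threads] = self.backend == "threads"
        return options


print(ThreadedExecutor("threads", n_workers=2).apply(lambda x: x * x, [1, 2, 3]))
```

The subclass never touches apply(); it only registers a new method and its condition, which is the "few lines of boilerplate" the comment refers to.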

  1. Inheritance: should we separate AnalysisBase from ParallelAnalysisBase or not?

I agree with @IAlibay that we should avoid class proliferation, and design our code around the idea of a class as something that does one thing but does it well. Ultimately, AnalysisBase does the following: it prepares the trajectory, runs the _single_frame method over it, and gathers the results. I don't see any reason to have two classes doing it -- if there are multiple ways of doing it, e.g. with different backends, then we must introduce separate objects that do this thing well, instead of creating instances of AnalysisBase that do one thing slightly differently. Also, @p-j-smith is right in saying that some developers might not even be aware of the second AnalysisBase class, so I'd really prefer keeping it in one.
Side note: since I'm not that experienced in library design, I took my inspiration here.

  1. Generalization: should we introduce some classification on backends (i.e. mark methods parallelizable or whatever) that would enable automatic addition of new backends that follow the same principle (i.e. "split-apply-combine")?

Since in pt. 1 I already followed @orbeckst on being explicit and defining allowed backends for each class (otherwise it's only local), I feel that there is not much sense anymore to define some is_parallelizable properties -- we'll anyway list all necessary backends explicitly.


Miscellaneous:

If we use a two-class approach for transformations as well (see discussion in #4259) then we establish a consistent pattern (by @orbeckst)

Following "composition over inheritance", I'd rather introduce a separate little class that iterates over the trajectory depending on transformation, and then feed it into _compute method. For now, ProgressBar serves this purpose, but it can be any other class, I imagine. And we can extend it indefinitely without introducing more and more classes, which is nice.

If we now introduce a two-class approach (twice already!), we'll end up with 4 classes that do the same thing in 4 different ways, and it's reeeeeally unclear for a developer which one to use in which case. Also, we have to decide from which of them to inherit each of the existing subclasses every time, and modify tests accordingly. Both seem like a huge maintenance burden.

@IAlibay
Member

IAlibay commented Aug 31, 2023

Thanks for summarising the conversation @marinegor - would organising a call to discuss this all further be wise here? I feel like there's a lot here and it might be better suited to synchronous communication.

One extra note on maintenance

One thing I don't see explicitly in the summary, but that I want to make sure is at the forefront of this discussion, is not only "downstream users/packages" but also the maintenance of this current library.

Unfortunately the core library already has a medium to large maintenance burden spread over a very small number of part-time developers. I know that every new feature will involve its own additional maintenance burden, and "more work" shouldn't be a reason to shy away from improvements. However, as part of our discussions we should aim to assign a relative maintenance cost to all proposed solutions, including which backends we expose.

@p-j-smith
Member

p-j-smith commented Aug 31, 2023

Thanks for the detailed reply @marinegor ! I think @IAlibay is right and it would perhaps be easier to have a call to discuss some of these points.

I'll just try to clarify one point I perhaps didn't explain very well. You mentioned that if a user wants to implement a new backend:

By that, introducing a new backend would work like this -- very similarly to @p-j-smith:

  • subclass ParallelExecutor and implement a new _compute_with_YOUR_BACKEND method & update available_backends
  • subclass every analysis class they want to use and re-implement the _configure_client method to use their ParallelExecutor subclass using simply:
class CustomExecutor(ParallelExecutor):
    def _compute_with_SOMETHING(self, func: Callable, computations: list):
        ...

    def _options(self):
        options = super()._options()
        options.update({self._compute_with_SOMETHING: self.backend == 'SOMETHING'})
        return options

class SomeCustomClass(AnalysisBase):
    def _configure_client(self, backend: str, n_workers: int, client):
        return super()._configure_client(backend, n_workers, client, executor_class=CustomExecutor)

    def available_backends(self):
        return (..., 'SOMETHING')

with like 4 lines of boilerplate and the actual code to generate a new backend.

This is how a user could implement their own backend for their own analysis with the approach you've taken in #4162, which at first doesn't seem like too much effort.

However, if a user wants to use their backend with any of the analyses in MDAnalysis.analysis, they would also have to subclass those too, e.g.:

class HBondsWithAnotherBackend(HydrogenBondsAnalysis):
    def _configure_client(self, backend: str, n_workers: int, client):
        return super()._configure_client(backend, n_workers, client, executor_class=CustomExecutor)

    def available_backends(self):
        return (..., 'another_backend')

class RMSDWithAnotherBackend(RMSD):
    def _configure_client(self, backend: str, n_workers: int, client):
        return super()._configure_client(backend, n_workers, client, executor_class=CustomExecutor)

    def available_backends(self):
        return (..., 'another_backend')

etc.

However, with the approach in #4269 it is much simpler to add a new backend. You would define your backend:

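class MyBackend:

    def __init__(self, *args, **kwargs):  # some input parameters if needed
        # do some initialisation if needed
        pass

    def apply(self, func: Callable, computations: list, n_workers: int) -> list:
        # do something; this placeholder is a serial map, a real backend
        # would distribute the computations over n_workers
        results = [func(computation) for computation in computations]
        return results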

and then you can pass this backend to any parallel analysis class - your own analysis as well as MDAnalysis ones - without the need to subclass them
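A small sketch of what "without the need to subclass them" could look like in practice. Everything here is hypothetical (ThreadBackend, ToyAnalysis, FrameSum, FrameMax); only the apply(func, computations, n_workers) signature is taken from the snippet above, and threads are used purely to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class ThreadBackend:
    """Hypothetical custom backend following the apply() signature above."""

    def apply(self, func: Callable, computations: list, n_workers: int) -> list:
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            return list(pool.map(func, computations))


class ToyAnalysis:
    """Stand-in for an analysis base class that receives the backend as an object."""

    def __init__(self, frames: list):
        self.frames = frames

    def _single_frame(self, frame):
        raise NotImplementedError

    def run(self, backend, n_workers: int = 1):
        # the analysis only relies on backend.apply(); it knows nothing else about it
        self.results = backend.apply(self._single_frame, self.frames, n_workers)
        return self


class FrameSum(ToyAnalysis):
    def _single_frame(self, frame):
        return sum(frame)


class FrameMax(ToyAnalysis):
    def _single_frame(self, frame):
        return max(frame)


frames = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
backend = ThreadBackend()
# the same backend instance serves two unrelated analyses, no subclassing needed
print(FrameSum(frames).run(backend, n_workers=2).results)
print(FrameMax(frames).run(backend, n_workers=2).results)
```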

The benefit of this is that MDAnalysis could decide to implement e.g. only the MultiProcessing backend and then have tutorials showing users how they can easily add their own backend (using e.g. Dask, Dask Distributed). This might go some way to alleviating the maintenance burden on MDAnalysis developers that @IAlibay quite rightly brought up

@p-j-smith
Member

would organising a call to discuss this all further be wise here? I feel like there's a lot here and it might be better suited to synchronous communication.

I think that's a great idea

@marinegor
Contributor Author

@p-j-smith ah, I see now, thanks!

Let me rephrase that to ensure I understand it correctly: you're basically suggesting to move from combination "backend/client + ParallelExecutor class" to "Backend class", and pass this class as an argument instead of backend: str, right?

If yes, I would imagine we could have it the following way (without introducing a new class):

  • backend: str remains as an argument for AnalysisBase.run()
  • alternatively one could pass executor: BackendExecutor with following signature:
class BackendExecutor:
    def __init__(self, n_workers: int):
        self.n_workers = n_workers
   
    def apply(self, func: Callable, computations: list) -> list:
        raise NotImplementedError

class MyExecutor(BackendExecutor):
    def apply(self, func: Callable, computations: list) -> list:
        ... # do some real work here

then current ParallelExecutor gets renamed to BuiltinExecutor, and carries all built-in backends, while developers can add new backends simply via passing MyExecutor instance to SubclassOfAnalysisBase.run().

@marinegor
Contributor Author

Regarding synchronous discussion -- I suggest we move it to #developers room on MDAnalysis discord. I just left a message there!

@marinegor
Contributor Author

Hi everyone, I implemented what I meant here: marinegor#1

It introduces a BackendBase class and a few built-in subclasses of it for executing with dask, dask.distributed and multiprocessing backends. If a user doesn't want to use those, anything that has an apply method can be passed to the AnalysisBase.run() method as the backend=SomethingThatHasApply argument.
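One possible way to express "anything that has an apply method" is a structural check with typing.Protocol. This is only a sketch of the idea and not how the linked branch does it; SupportsApply, SerialBackend and NotABackend are hypothetical names.

```python
from typing import Callable, Protocol, runtime_checkable


@runtime_checkable
class SupportsApply(Protocol):
    """Structural type: any object exposing an apply() method qualifies."""

    def apply(self, func: Callable, computations: list) -> list: ...


class SerialBackend:
    """Does not inherit from anything, yet satisfies the protocol."""

    def apply(self, func: Callable, computations: list) -> list:
        return [func(c) for c in computations]


class NotABackend:
    pass


# isinstance() on a runtime-checkable protocol only checks that the
# attribute exists, not that its signature matches
print(isinstance(SerialBackend(), SupportsApply))
print(isinstance(NotABackend(), SupportsApply))
```

Because the runtime check is shallow, a backend with a wrongly shaped apply() would still pass it, which is one reason to keep explicit opt-in for custom backends.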

@orbeckst
Member

Here's the summary of our discussion today (@marinegor @RMeli @p-j-smith ); sorry you couldn't be there @yuxuanzhuang but please have a look at the outcomes below:

Overall goal

Provide a simple approach to parallelize code following the AnalysisBase structure with the split-apply-combine algorithm.

This will not happen automagically for any AnalysisBase-derived class because developers need to ensure that their specific code still produces correct results in parallel (we cannot guarantee that) but required changes to code are minimal.
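For readers unfamiliar with the pattern, here is a toy split-apply-combine sketch in pure Python. All names (split_frames, frame_value, partial_sum, combine) are hypothetical and the per-frame quantity is made up; the point is that the combine step has to be designed by the analysis author, since e.g. averaging per-chunk means would give the wrong answer for chunks of unequal size.

```python
def split_frames(n_frames: int, n_parts: int) -> list:
    """Split range(n_frames) into n_parts contiguous, near-equal chunks."""
    base, extra = divmod(n_frames, n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        stop = start + base + (1 if i < extra else 0)
        chunks.append(range(start, stop))
        start = stop
    return chunks


def frame_value(i: int) -> float:
    """Made-up per-frame observable standing in for _single_frame()."""
    return float(i * i)


def partial_sum(chunk) -> tuple:
    """Apply step: return (sum, count) so that chunks can be combined exactly."""
    values = [frame_value(i) for i in chunk]
    return sum(values), len(values)


def combine(partials: list) -> float:
    """Combine step: total sum over total count, not a mean of means."""
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


n_frames = 10
chunks = split_frames(n_frames, 3)                 # chunk sizes 4, 3, 3
partials = [partial_sum(chunk) for chunk in chunks]  # could run on separate workers
serial = sum(frame_value(i) for i in range(n_frames)) / n_frames
print(combine(partials) == serial)
```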

General guiding principles

  • be correct
  • be conservative in making new features available (put burden of checking if parallel backends work on code developers and have them explicitly update code)
  • start with requiring explicit user choices and add conveniences/simple defaults later (it's ok to mark the code initially as "experimental" and improve user interface later)
  • consider the maintenance burden on the core library and CI

Specific decisions

  1. One AnalysisBase class, but use class attribute parallel to indicate if the code can work with split-apply-combine:
    • parallel == True means that we (or the class author) promise that the class's algorithm (in _single_frame() and _conclude()) works with the split-apply-combine approach
    • list the backends that work with this code in attribute available_backends
  2. Use backend classes.
    • Allow run(..., backend=str|class) to select backend.
    • check SubClass.available_backends for what you can pass as backend=; if not listed raise an error (but see below how users can experiment with custom backends)
    • If parallel == False, only allow the serial backend, raise error for anything else.
    • If parallel == True, any backend in available_backends can be chosen.
    • Allow users to pass any backend (including custom ones that define a YourBackend.apply() method as the computation engine) but users have to be explicit about it and pass unsafe=True to run(). (We should issue a warning.)
  3. Only support the use case of parallel analysis on a single machine (multi-core workstation or laptop) in the core library.
    • backends: serial, multiprocessing, and dask
    • move HPC enabled parallelization (currently dask.distributed) into an MDAKit of some kind, effectively replacing PMDA (either literally gutting PMDA or naming it "PMDA2" or HPCMDA or xMDA or ....)
      • reduces maintenance burden on core library and CI
      • allows more testing in the wild
      • Separate development cycle, possibly easier for devs to contribute
      • Separate testing
      • possible JOSS/SciPyProc publication??
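The backend-selection rules in decision 2 could be sketched roughly as follows. This is not the implementation from the PR: select_backend and the two toy classes are hypothetical, and the choice that unsafe=True does not override parallel == False is an assumption about how the rules combine.

```python
import warnings


class SerialOnlyAnalysis:
    """Toy class whose author has not promised split-apply-combine safety."""
    parallel = False
    available_backends = ("serial",)


class ParallelAnalysis:
    """Toy class whose author declares it safe for the listed backends."""
    parallel = True
    available_backends = ("serial", "multiprocessing", "dask")


def select_backend(analysis_cls, backend, unsafe: bool = False):
    """Validate a backend name or object against the rules listed above."""
    is_name = isinstance(backend, str)
    if not is_name and not callable(getattr(backend, "apply", None)):
        raise TypeError("backend must be a name or an object with an apply() method")
    if not analysis_cls.parallel:
        # assumption: non-parallel classes only ever run serially
        if backend != "serial":
            raise ValueError(f"{analysis_cls.__name__} only supports the serial backend")
        return backend
    if is_name and backend in analysis_cls.available_backends:
        return backend
    if not unsafe:
        raise ValueError(
            f"{backend!r} is not in available_backends; pass unsafe=True to use it anyway"
        )
    warnings.warn("using an unlisted backend; results are not guaranteed", UserWarning, stacklevel=2)
    return backend


print(select_backend(ParallelAnalysis, "dask"))
```

Keeping the check in one function means the error messages and the warning stay consistent across all analysis classes.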

@shafayetrahat

Is this issue solved? Is dask integrated into MDAnalysis? Also, is dask-cudf integrated into MDAnalysis? Does MDAnalysis have GPU support?

@hmacdope
Member

hmacdope commented Dec 4, 2023

Is this issue solved? Is dask integrated into MDAnalysis? Also, is dask-cudf integrated into MDAnalysis? Does MDAnalysis have GPU support?

This issue is currently being worked on in a late stage PR #4162.

In general we do not use pandas so pandas-cudf was not an optimisation target of ours.

MDAnalysis does not have GPU support.

@RMeli
Member

RMeli commented Dec 4, 2023

@shafayetrahat, as @hmacdope mentioned this is WIP in #4162. The PR needs some cleanup and review, but it's in a usable state if you want to give it a try. We would very much welcome early feedback! Unfortunately only a handful of analysis methods are currently parallelized in the PR (RMSD/RMSF).

For more information you can have a look at @marinegor's blog posts, describing his work during GSoC. In particular, the GSoC Summary.

There is also a gist with some preliminary analysis.

@shafayetrahat

shafayetrahat commented Dec 4, 2023 via email

@shafayetrahat

shafayetrahat commented Dec 4, 2023 via email

@RMeli
Member

RMeli commented Dec 4, 2023

Though my analysis is not rmsf/rmsd surely I want to look at the code to check how it's done.

The PR provides support to add parallelization to your own analysis.

The code is available in PR #4162, which you can check out locally.

Do you guys have future plans for GPU support?

There is no such plan at this time.

talagayev added a commit to talagayev/mdanalysis that referenced this issue Sep 9, 2024
moved the parallelization below Issue MDAnalysis#4158