A Pipeline / Chain of Responsibility design pattern implementation based on Laravel's Pipeline implementation.
A Pipeline
is an object that accepts input data and sends it through a series of handlers (or "pipes") — which are functions, closures, and/or classes — to get a result at the end.
flowchart LR
input[Input data]
subgraph Pipeline
pipe1[Pipe 1]
pipe2[Pipe 2]
pipe3[Pipe 3]
pipe1 --> pipe2
pipe2 --> pipe3
end
output[Output data]
input --> pipe1
pipe3 --> output
- Installation
- Getting started
- Methods
pipe()
(aliases:add_pipe()
)send()
set_container()
(aliases:setContainer()
)then()
(aliases:run()
)then_return()
(aliases:run_and_return()
,thenReturn()
)through()
(aliases:pipes()
)via()
Install PyPipeline via pip:
pip install pipelinechain
Luckily, there isn't a whole lot to Pipelines from a code perspective, so implementing them is pretty easy. We'll walk through some of the basic concepts.
Important note: The examples provide a string as input data. That is just for simplicity's sake! You can actually pass whatever you want - a scalar, an object, an array, whatever.
Let's say you have a string that you want to pass through a series of steps in order to manipulate it. You can create a pipeline to do that like so:
flowchart LR
input[String]
subgraph Pipeline
pipe1[title]
pipe2[trim]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
# Create a new pipeline instance.
pipeline = Pipeline()
# Send a string through the pipeline.
result = pipeline.send(' hello world ').through([
str.title,
str.strip
]).then_return()
print(result) # Output: 'Hello world'
You don't need to build the pipeline all at once, you can spread it out over a number of lines.
flowchart LR
input[String]
subgraph Pipeline
pipe1[title]
pipe2[trim]
pipe3[reverse_string]
pipe1 --> pipe2
pipe2 --> pipe3
end
output[Output data]
input --> pipe1
pipe3 --> output
from pipelinechain import Pipeline
# Create a new pipeline instance.
pipeline = Pipeline()
# Set the pipes you want to use.
pipeline.through([ str.title, str.strip ])
# Add another transformation.
def replace(s):
return s.replace('Hello', 'Goodbye')
# Add it to the pipeline.
pipeline.pipe(replace)
# Send data through the pipeline.
result = pipeline.send(' hello world ').then_return()
print(result) # Output: 'Goodbye World'
If you have a more complicated function that you wish to use as a pipe, you can pass in a callable instead of a string. Your closure will need to accept two parameters, the first being the input data and the second being the next item in the pipeline.
flowchart LR
input[String]
subgraph Pipeline
pipe1[custom_pipe]
pipe2[captialize]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
pipeline = Pipeline()
def custom_pipe(passable, next_pipe):
passable = passable.replace('hello', 'goodbye')
return next_pipe(passable)
pipeline.through([custom_pipe, str.title])
result = pipeline.send('hello world').then_return()
print(result) # Output: 'Goodbye world'
You can even create your own classes to use as pipes in the pipeline. For a class to be usable in the pipeline, it needs a method that accepts two parameters, the first being the input data and the second being the next item in the pipeline.
By default, the Pipeline expects that the method is called handle
. If you want to use that method name, you can
optionally implement the StellarWP\Pipeline\Contracts\Pipe
interface to enforce that method convention.
First class:
class TitlePipe:
def handle(self, passable, next_pipe):
return next_pipe(passable.title())
Second class:
class StripPipe:
def handle(self, passable, next_pipe):
return next_pipe(passable.strip())
flowchart LR
input[String]
subgraph Pipeline
pipe1[TitlePipe::handle]
pipe2[StripPipe::handle]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
pipeline = Pipeline().through([TitlePipe(), StripPipe()])
result = pipeline.send(' hello world ').then_return()
print(result) # Output: 'Hello world'
If you want to use classes but want to use a different method than the expected default (handle
), you can declare
the alternate method name using the via()
method.
First class:
class TitlePipe:
def execute(self, passable, next_pipe):
return next_pipe(passable.title())
Second class:
class StripPipe:
def execute(self, passable, next_pipe):
return next_pipe(passable.strip())
flowchart LR
input[String]
subgraph Pipeline
pipe1[ReversePipe::execute]
pipe2[StripPipe::execute]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> output
from pipelinechain import Pipeline
pipeline = Pipeline().via('execute').through([StripPipe(), ReversePipe()])
result = pipeline.send(' hello ').then_return()
print(result) # Output: 'Hello'
Sometimes in the middle of a pipeline, you want to stop processing the rest of the pipes and return a value. Luckily, you
can do this with a return
statement!
from pipelinechain import Pipeline
def check_content(passable, next_pipe):
if 'stop' in passable:
return 'Early termination'
return next_pipe(passable)
pipeline = Pipeline().through([check_content, str.upper])
result = pipeline.send('please stop').then_return()
print(result) # Output: 'Early termination'
Sometimes you may want to do more than returning the result when the pipeline completes. You can do that by
using the then()
(or its alias, run()
) method instead of then_return()
.
flowchart LR
input[String]
pipe3[Closure]
subgraph Pipeline
pipe1[strip]
pipe2[upper]
pipe1 --> pipe2
end
output[Output data]
input --> pipe1
pipe2 --> pipe3
pipe3 --> output
from pipelinechain import Pipeline
pipeline = Pipeline().through([str.strip, str.upper])
result = pipeline.send(' hello world ').then(lambda x: len(x))
print(result) # Output: 11
This method is used to add a pipe to the pipeline.
def pipe(pipes: Union[List, Any]): Pipeline
Aliases: add_pipe()
pipeline.pipe( str.strip )
# or
pipeline.add_pipe( str.strip )
# or
pipeline.pipe( [ str.title, str.strip ] )
This method is used to set the object being passed through the pipeline.
def send( passable: Any ): Pipeline
# Send a scalar.
pipeline.send( 'Some string' )
# Send an object.
pipeline.send( my_object )
This method is used to run the pipeline and return the result.
def then( destination: Optional[Callable] = None ): Any
Aliases: run()
pipeline.then()
# Use the alias.
pipeline.run()
# Provide a function to run before returning the result.
pipeline.then( str.strip )
# Provide a closure to run before returning the result.
pipeline.then( lambda passable: passable.strip() )
# Provide an object as a pipe to run before returning the result.
pipeline.then( StripPipe() )
This method is used to run the pipeline and return the result.
def then_return(): Any
Aliases: run_and_return()
, thenReturn()
pipeline.then_return()
# Use an alias.
pipeline.thenReturn()
# Use the other alias.
pipeline.run_and_return()
This method is used to set the handlers (or "pipes") that are used to process the data.
def through( pipes: Union[List, Any] ): Pipeline
Aliases: pipes()
# You can provide any number of pipes.
pipeline.through([ str.title, str.strip ])
# Using the alias.
pipeline.pipes([ str.title, str.strip ])
# Pass closures as pipes.
pipeline.through([ str.title, lambda passable, next: next_pipe(passable.strip)])
# Pass objects as pipes.
pipeline.through([ TitlePipe(), StripPipe() ])
This method is used to set the method to call on all the pipes in the pipeline.
def via( method: str ): Pipeline
# Set the method used in all classes in the pipeline to process the data as `execute()`.
pipeline.via( 'execute' )
# Set the method used in all classes in the pipeline to process the data as `borkborkbork()`.
pipeline.via( 'borkborkbork' )