The primary object in Lithops is the executor. The standard way to get everything set up is to import lithops
, and create an instance of one of the available modes of executions.
Lithops is shipped with 3 modes of execution: Localhost, Serverless and Standalone. In this sense, each mode of execution has its own executor class:
lithops.LocalhostExecutor()
: Executor that uses local processes to run jobs in the local machine.lithops.ServerlessExecutor()
: Executor to run jobs in one of the available serverless compute backends.lithops.StandaloneExecutor()
: Executor to run jobs in one of the available standalone compute backends.
Additionally, Lithops includes a top-level function executor, which encompasses all three previous executors:
lithops.FunctionExecutor()
: Generic executor that will use the configuration to determine its mode of execution, i.e., based on the configuration it will be localhost, serverless or standalone.
By default, the executor load the configuration from the config file. Alternatively, you can pass the configuration with a python dictionary. In any case, note that all the parameters set in the executor will overwrite those set in the configuration.
The available calls within an executor are:
API Call | Type | Description |
---|---|---|
call_async() | Async. | Method used to spawn one function activation |
map() | Async. | Method used to spawn multiple function activations |
map_reduce() | Async. | Method used to spawn multiple function activations with one (or multiple) reducers |
wait() | Sync. | Wait for the function activations to complete. It blocks the local execution until all the function activations finished their execution (configurable) |
get_result() | Sync. | Method used to retrieve the results of all function activations. The results are returned within an ordered list, where each element of the list is the result of one activation |
plot() | Sync. | Method used to create execution plots |
job_summary() | Sync. | Method used to create a summary file of the executed jobs. It includes times and money |
clean() | Async. | Method used to clean the temporary data generated by Lithops |
LocalhostExecutor(**kwargs)
Initialize and return Localhost executor object.
Parameter | Default | Description |
---|---|---|
config | None | Settings passed in here will override those in lithops_config |
runtime | None | Name of the docker image to run the functions |
workers | cpu_count | Max number of parallel workers |
storage | localhost | Storage backend to store temp data |
monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq |
log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled |
Usage:
import lithops
fexec = lithops.LocalhostExecutor()
ServerlessExecutor(**kwargs)
Initialize and return a Serverless executor object.
Parameter | Default | Description |
---|---|---|
config | None | Settings passed in here will override those in lithops_config |
backend | ibm_cf | Serverless compute backend to run the functions |
runtime | None | Name of the docker image to run the functions |
runtime_memory | 256 | Memory (in MB) to use to run the functions |
storage | ibm_cos | Storage backend to store temp data |
workers | depends of the backend | Max number of parallel workers |
monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq |
remote_invoker | False | Spawn a function that will perform the actual job invocation (True/False) |
log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled |
Usage:
import lithops
fexec = lithops.ServerlessExecutor()
StandaloneExecutor(**kwargs)
Initialize and return an Standalone executor object.
Parameter | Default | Description |
---|---|---|
config | None | Settings passed in here will override those in lithops_config |
backend | ibm_vpc | Standalone compute backend to run the functions |
runtime | python3 | Name of the runtime to run the functions. It can be a docker image or python3 |
workers | cpu_count | Max number of parallel workers |
storage | ibm_cos | Storage backend to store temp data |
monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq |
log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled |
Usage:
import lithops
fexec = lithops.StandaloneExecutor()
FunctionExecutor(**kwargs)
Initialize and return a generic function executor.
Parameter | Default | Description |
---|---|---|
mode | serverless | Execution mode. One of: localhost, serverless or standalone |
config | None | Settings passed in here will override those in lithops_config |
backend | None | Compute backend to run the functions |
runtime | None | Name of the runtime to run the functions. |
runtime_memory | None | Memory (in MB) to use to run the functions |
workers | None | Max number of parallel workers |
storage | ibm_cos | Storage backend to store temp data |
monitoring | storage | Monitoring system implementation. One of: storage, rabbitmq |
remote_invoker | False | Spawn a function that will perform the actual job invocation (True/False) |
log_level | INFO | Log level printing (INFO, DEBUG, ...). Set it to None to hide all logs. If this is param is set, all logging params in config are disabled |
Usage:
import lithops
fexec = lithops.FunctionExecutor()
Spawn only one function activation.
call_async(func, data, **kwargs)
Parameter | Default | Description |
---|---|---|
func | The function to map over the data | |
data | A single value of data | |
extra_env | None | Additional environment variables for CF environment |
runtime_memory | 256 | Memory (in MB) to use to run the functions |
timeout | 600 | Max time per function activation (seconds) |
include_modules | [] | Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. No one dependency is pickled if it is explicitly set to None |
exclude_modules | [] | Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules |
-
Returns: One future for each job (Futures are also internally stored by Lithops).
-
Usage:
future = fexec.call_async(foo, data)
-
Code example: call_async.py
Spawn multiple function activations based on the items of an input list.
map(map_function, map_iterdata, **kwargs)
Parameter | Default | Description |
---|---|---|
map_function | The function to map over the data | |
map_iterdata | An iterable of input data (e.g python list) | |
chunksize | 1 | Split map_iteradata in chunks of this size. Lithops spawns 1 worker per resulting chunk |
worker_processes | 1 | Number of concurrent/parallel processes in each worker |
extra_args | None | Additional arguments to pass to each map_function activation |
extra_env | None | Additional environment variables for CF environment |
runtime_memory | 256 | Memory (in MB) to use to run the functions |
timeout | 600 | Max time per function activation (seconds) |
include_modules | [] | Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. No one dependency is pickled if it is explicitly set to None |
exclude_modules | [] | Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules |
obj_chunk_size | None | Used for data_processing. Chunk size to split each object in bytes. Must be >= 1MiB. 'None' for processing the whole file in one function activation |
obj_chunk_number | None | Used for data_processing. Number of chunks to split each object. 'None' for processing the whole file in one function activation. chunk_n has prevalence over chunk_size if both parameters are set |
obj_newline | '\n' | New line character for keeping line integrity of partitions. 'None' for disabling line integrity logic and get partitions of the exact same size in the functions |
-
Returns: A list with size len(map_iterdata) of futures for each job (Futures are also internally stored by Lithops).
-
Usage:
iterdata = [1, 2, 3, 4] futures = fexec.map(foo, iterdata)
-
Code example: map.py
Spawn multiple map_function activations, based on the items of an input list, eventually spawning one (or multiple) reduce_function activations over the results of the map phase.
map_reduce(map_function, map_iterdata, reduce_function, **kwargs)
Parameter | Default | Description |
---|---|---|
map_function | The function to map over the data | |
map_iterdata | An iterable of input data (e.g python list) | |
chunksize | 1 | Split map_iteradata in chunks of this size. Lithops spawns 1 worker per resulting chunk |
worker_processes | 1 | Number of concurrent/parallel processes in each worker |
extra_args | None | Additional arguments to pass to each map_function activation |
reduce_function | The function to map over the results of map_function | |
spawn_reducer | 20 | Percentage of done map functions before spawning the reduce function. By default the reducer is spawned when 20% of the map activations are done. |
extra_env | None | Additional environment variables for CF environment |
map_runtime_memory | 256 | Memory (in MB) to use to run the map_function |
reduce_runtime_memory | 256 | Memory (in MB) to use to run the reduce_function |
timeout | 600 | Max time per function activation (seconds) |
include_modules | [] | Explicitly pickle these dependencies. All required dependencies are pickled if default empty list. No one dependency is pickled if it is explicitly set to None |
exclude_modules | [] | Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules |
obj_chunk_size | None | Used for data_processing. Chunk size to split each object in bytes. Must be >= 1MiB. 'None' for processing the whole file in one function activation |
obj_chunk_number | None | Used for data_processing. Number of chunks to split each object. 'None' for processing the whole file in one function activation. chunk_n has prevalence over chunk_size if both parameters are set |
obj_newline | '\n' | New line character for keeping line integrity of partitions. 'None' for disabling line integrity logic and get partitions of the exact same size in the functions |
obj_reduce_by_key | False | Used for data_processing. Set one reducer per object after running the partitioner (reduce-by-key) |
-
Returns: A list with size len(map_iterdata) of futures for each job (Futures are also internally stored by Lithops).
-
Usage:
iterdata = [1, 2, 3, 4] futures = fexec.map_reduce(foo, iterdata, bar)
-
Code example: map_reduce.py
Waits for the function activations to finish.
wait(**kwargs)
Parameter | Default | Description |
---|---|---|
fs | None | List of futures to wait. If None, Lithops uses the internally stored futures |
throw_except | True | Re-raise exception if call raised |
return_when | ALL_COMPLETED | One of 'ALL_COMPLETED', 'ANY_COMPLETED', 'ALWAYS' |
download_results | False | Whether or not download the results while monitoring activations |
timeout | None | Timeout of waiting for results (in seconds) |
THREADPOOL_SIZE | 128 | Number of threads to use waiting for results |
WAIT_DUR_SEC | 1 | Time interval between each check (seconds) if no rabbitmq_monitor activated |
show_progressbar | True | whether or not to show the progress bar |
-
Returns:
(fs_done, fs_notdone)
wherefs_done
is a list of futures that have completed andfs_notdone
is a list of futures that have not completed. -
Usage:
iterdata = [1, 2, 3, 4] futures = fexec.map(foo, iterdata) fexec.wait()
-
Code example: wait.py
Gets the results from all the function activations. It internally makes use of the Executor.wait()
method.
get_result(**kwargs)
Parameter | Default | Description |
---|---|---|
fs | None | List of futures to get the results. If None, Lithops uses the internally stored futures |
throw_except | True | Re-raise exception if call raised |
timeout | None | Timeout of waiting for results (in seconds) |
THREADPOOL_SIZE | 128 | Number of threads to use waiting for results |
WAIT_DUR_SEC | 1 | Time interval between each check (seconds) if no rabbitmq_monitor activated |
show_progressbar | True | whether or not to show the progress bar |
-
Returns: The results are returned within an ordered list, where each element of the list is the result of one activation.
-
Usage:
iterdata = [1, 2, 3, 4] futures = fexec.map(foo, iterdata) results = fexec.get_result()
-
Code example: call_async.py, map.py, map_reduce.py
Creates 2 detailed execution plots: A timeline plot and a histogram plot.
plot(**kwargs)
Parameter | Default | Description |
---|---|---|
fs | None | List of futures to plot. If None, Lithops uses the internally stored futures |
dst | None | Path to destination file, either absolute or relative. If set, you must specify the path + the file prefix (see example below), then lithops will generate the prefix_histogram.png and prefix_timeline.png files. If None, Lithops will create a new folder called plots in the current directory and use the current timestamp as file prefix |
-
Returns: Nothing. It stores 2 different plots in the selected
dst
path. -
Usage:
iterdata = [1, 2, 3, 4] fexec.map(foo, iterdata) results = fexec.get_result() # or fexec.wait() # The next command will generate test_timeline.png and test_histogram.png in ~/lithops_plots fexec.plot(dst='~/lithops_plots/test')
-
Example:
Cleans the temporary data generated by Lithops in IBM COS. This process runs asynchronously to the main execution since Lithops starts another process to do the task. If data_cleaner=True
(default), this method is executed automatically after calling get_result()
.
clean(**kwargs)
Parameter | Default | Description |
---|---|---|
fs | None | List of futures to clean temp data. If None, Lithops uses the internally stored futures |
cs | None | List of cloudobjects to clean |
clean_cloudobjects | True | Clean or not the cloudobjects generated in the executor |
spawn_cleaner | True | Spawn cleaner process. If false it stores the data to be cleaned in a tmp dir |
-
Returns: Nothing.
-
Usage:
iterdata = [1, 2, 3, 4] futures = fexec.map(foo, iterdata) results = fexec.get_result() fexec.clean()
-
Code example: map.py
Function chaining is a pattern where multiple functions are called on the same executor consecutively. Using the same lithops.FunctionExecutor
object reference, multiple functions can be invoked. It increases the readability of the code and means less redundancy. This means we chain multiple functions together with the same element reference. It’s not necessary to attach the lithops.FunctionExecutor
reference multiple times for each function call.
This patter is specially useful when the output of one invocation is the input of another invocation. In this case, Lithops does not download the intermediate results to the local client, instead, the intermediate results are directly read from the next function.
It currently works with the Futures API, and you can chain the map()
, map_reuce()
, wait()
and get_result()
methods. Note that the returning value of one function must match the signature of the next function when chaining multiple map()
calls. View the next examples:
Getting the result from a single map()
call:
import lithops
def my_func1(x):
return x*2
iterdata = [1, 2, 3]
fexec = lithops.FunctionExecutor()
res = fexec.map(my_func1, iterdata).get_result()
print(res)
Chain multiple map() calls and get the final result:
import lithops
def my_func1(x):
return x*2, 5
def my_func2(x, y):
return x+y
iterdata = [1, 2, 3]
fexec = lithops.FunctionExecutor()
res = fexec.map(my_func1, iterdata).map(my_func2).get_result()
print(res)
There is no limit in the number of map() calls that can be chained:
def my_func1(x):
return x+2, 5
def my_func2(x, y):
return x+y, 5, 2
def my_func3(x, y, z):
return x+y+z
iterdata = [1, 2, 3]
fexec = lithops.FunctionExecutor()
res = fexec.map(my_func1, iterdata).map(my_func2).map(my_func3).get_result()
print(res)
Alternatively, you can pass the futures
generated in a map()
or map_reduce()
call to the iterdata
parameter with the same effect. Not that in this case you will only get the results of the last map()
execution. Results of intermediate map()
s are never downloaded:
def my_func1(x):
return x+2, 5
def my_func2(x, y):
return x+y, 5, 2
def my_func3(x, y, z):
return x+y+z
iterdata = [1, 2, 3]
fexec = lithops.FunctionExecutor()
futures1 = fexec.map(my_func1, iterdata)
futures2 = fexec.map(my_func2, futures1)
futures3 = fexec.map(my_func3, futures2)
final_result = fexec.get_result()
print(final_result)