Skip to content

Commit

Permalink
Merge pull request #37 from actinia-org/persistent_processing
Browse files Browse the repository at this point in the history
Add persistent processing
  • Loading branch information
anikaweinmann authored Dec 21, 2023
2 parents 6aa6973 + 427c4d9 commit 91a740e
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 28 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@ Types of changes


## [Unreleased]
### Added
- persistent processing


## [0.3.2] - 2023-12-21
### Added
- set up from PyPI


## [0.3.1] - 2023-12-08
### Fixed
- fix mapset deletion if mapset does not exist
- fix location creation if location already exists
- fix mapset creation if mapset exists


## [0.3.0] - 2023-03-01
### Fixed
Expand Down
5 changes: 2 additions & 3 deletions docs/docs/02_installation.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# Installation

You can install the the actinia Python library via:
You can install the actinia Python library via:
```
VERSION="0.3.0"
pip3 install "actinia-python-client @ https://github.com/actinia-org/actinia-python-client/releases/download/${VERSION}/actinia_python_client-${VERSION}-py3-none-any.whl"
pip3 install actinia-python-client
```
For the newest version, see [releases](https://github.com/actinia-org/actinia-python-client/releases).
41 changes: 41 additions & 0 deletions docs/docs/08_processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ locations = actinia_mundialis.get_locations()
```

## Ephemeral Processing

Start an ephemeral processing job
```
pc = {
Expand All @@ -38,3 +39,43 @@ job.poll_until_finished()
print(job.status)
print(job.message)
```


## Persistent Processing

Start a persistent processing job
```
pc = {
"list": [
{
"id": "r_mapcalc",
"module": "r.mapcalc",
"inputs": [
{
"param": "expression",
"value": "baum=5"
}
]
}
],
"version": "1"
}
# create user mapset (persistent processing can only be done in a user mapset)
mapset_name = "test_mapset"
locations["nc_spm_08"].create_mapset(mapset_name)
# create job
job = locations["nc_spm_08"].mapsets[mapset_name].create_processing_job(pc, "test")
job.poll_until_finished()
print(job.status)
print(job.message)
# print rasters in "test_mapset"
rasters = locations["nc_spm_08"].mapsets[mapset_name].get_raster_layers()
print(rasters.keys())
# delete user mapset
locations["nc_spm_08"].delete_mapset(mapset_name)
print(locations["nc_spm_08"].mapsets.keys())
```
26 changes: 8 additions & 18 deletions src/actinia/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import requests
import os
import sys
from datetime import datetime

from actinia.resources.logger import log
from actinia.region import Region
from actinia.mapset import Mapset
from actinia.job import Job
from actinia.utils import set_job_names


class Location:
Expand All @@ -46,7 +46,7 @@ def __init__(self, name, actinia, auth):
self.region = None
self.__actinia = actinia
self.__auth = auth
self.mapsets = None
self.mapsets = dict()

def __request_info(self):
"""
Expand Down Expand Up @@ -95,15 +95,15 @@ def get_mapsets(self):
"""
Return mapsets
"""
if self.mapsets is None:
if self.mapsets is None or len(self.mapsets) == 0:
self.__request_mapsets()
return self.mapsets

def create_mapset(self, name):
"""
Creates a mapset within the location.
"""
if self.mapsets is None:
if self.mapsets is None or len(self.mapsets) == 0:
self.__request_mapsets()
mapset = Mapset.create_mapset_request(
name, self.name, self.__actinia, self.__auth
Expand All @@ -118,7 +118,7 @@ def delete_mapset(self, name):
"""
Deletes a mapset and returns an updated mapset list for the location.
"""
if self.mapsets is None:
if self.mapsets is None or len(self.mapsets) == 0:
self.__request_mapsets()
Mapset.delete_mapset_request(
name, self.name, self.__actinia, self.__auth
Expand All @@ -143,16 +143,6 @@ def __validate_process_chain(self, pc, type):
)
return resp

def __set_job_names(self, name, default_name="unknown_job"):
    """Return a ``(orig_name, name)`` pair for a job.

    If *name* is ``None``, *default_name* becomes the original name and a
    generated ``job_<timestamp>`` name is returned; otherwise the given
    name is kept as the original and a timestamp suffix is appended to
    make the returned name unique.

    NOTE(review): the strftime format ``'%Y%d%m_%H%M%S'`` puts the day
    before the month — presumably a typo for ``'%Y%m%d'``; confirm before
    relying on the suffix being date-sortable.
    """
    now = datetime.now()
    if name is None:
        orig_name = default_name
        name = f"job_{now.strftime('%Y%d%m_%H%M%S')}"
    else:
        orig_name = name
        name += f"_{now.strftime('%Y%d%m_%H%M%S')}"
    return orig_name, name

def validate_process_chain_sync(self, pc):
"""Validate a process chain (sync)."""
resp = self.__validate_process_chain(pc, "sync")
Expand All @@ -167,7 +157,7 @@ def validate_process_chain_sync(self, pc):
def validate_process_chain_async(self, pc, name=None):
"""Validate a process chain (async)."""
actiniaResp = self.__validate_process_chain(pc, "async")
orig_name, name = self.__set_job_names(name, "unknown_validation_job")
orig_name, name = set_job_names(name, "unknown_validation_job")
if actiniaResp.status_code != 200:
raise Exception(
f"Error {actiniaResp.status_code}: {actiniaResp.text}"
Expand All @@ -177,7 +167,7 @@ def validate_process_chain_async(self, pc, name=None):
self.__actinia.jobs[name] = job
return job

# * /locations/{location_name}/processing_async_export
# TODO: * /locations/{location_name}/processing_async_export
# - POST (ephemeral database)
# * (/locations/{location_name}/processing_export
# - POST (ephemeral database))
Expand All @@ -186,7 +176,7 @@ def create_processing_export_job(self, pc, name=None):
Creates a processing_export job with a given PC.
"""
# set name
orig_name, name = self.__set_job_names(name)
orig_name, name = set_job_names(name)
# set endpoint in url
url = (
f"{self.__actinia.url}/locations/{self.name}/"
Expand Down
50 changes: 44 additions & 6 deletions src/actinia/mapset.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@
__maintainer__ = "Anika Weinmann"

import json
import os
import requests
from actinia.region import Region
from enum import Enum, unique

from actinia.region import Region
from actinia.resources.logger import log
from actinia.raster import Raster
from actinia.vector import Vector
from actinia.utils import request_and_check
from actinia.job import Job
from actinia.utils import set_job_names


@unique
Expand Down Expand Up @@ -457,6 +460,46 @@ def delete_vector(self, layer_name):
del self.vector_layers[layer_name]
log.info(f"Vector <{layer_name}> successfully deleted")

# TODO: * (/locations/{location_name}/mapsets/{mapset_name}/processing
#         - POST (persistent, asynchronous))
#       * /locations/{location_name}/mapsets/{mapset_name}/processing_async
#         - POST (persistent, asynchronous)
def create_processing_job(self, pc, name=None):
    """Create a persistent processing job from a given process chain.

    The process chain is POSTed to the ``processing_async`` endpoint of
    this user mapset, so all results are written persistently into the
    mapset.

    Parameters
    ----------
    pc : str | dict
        The process chain: a dict, a JSON string, or a path to a JSON
        file containing the process chain.
    name : str, optional
        Base name for the job; a timestamp suffix is appended to make it
        unique. If ``None``, a generated name is used.

    Returns
    -------
    Job
        The created asynchronous processing job.

    Raises
    ------
    Exception
        If *pc* is neither a string nor a dict.
    requests.exceptions.ConnectionError
        If the actinia server cannot be reached.
    """
    # set job names (orig_name labels the Job, name keys the registry)
    orig_name, name = set_job_names(name)
    # persistent processing endpoint of this user mapset
    url = (
        f"{self.__actinia.url}/locations/{self.__location_name}/"
        f"mapsets/{self.name}/processing_async"
    )
    postkwargs = {
        "headers": self.__actinia.headers,
        "auth": self.__auth,
    }
    # accept a dict, a JSON string, or a path to a JSON file
    if isinstance(pc, str):
        if os.path.isfile(pc):
            with open(pc, "r") as pc_file:
                postkwargs["data"] = pc_file.read()
        else:
            postkwargs["data"] = pc
    elif isinstance(pc, dict):
        postkwargs["data"] = json.dumps(pc)
    else:
        raise Exception("Given process chain has no valid type.")

    # a ConnectionError propagates to the caller unchanged (the previous
    # `except ... raise e` was a no-op and has been removed)
    actiniaResp = requests.post(url, **postkwargs)
    # create a job from the actinia response
    resp = json.loads(actiniaResp.text)
    job = Job(orig_name, self.__actinia, self.__auth, resp)
    self.__actinia.jobs[name] = job
    return job


# TODO:
# * (/locations/{location_name}/mapsets/{mapset_name}/lock - GET, DELETE, POST)
Expand All @@ -465,8 +508,3 @@ def delete_vector(self, layer_name):
# - DELETE, PUT
# * /locations/{location_name}/mapsets/{mapset_name}/strds - GET
# * "/locations/{location_name}/mapsets/{mapset_name}/vector_layers"

# * (/locations/{location_name}/mapsets/{mapset_name}/processing
# - POST (persistent, asyncron))
# * /locations/{location_name}/mapsets/{mapset_name}/processing_async
# - POST (persistent, asyncron)
12 changes: 12 additions & 0 deletions src/actinia/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import json
import requests
from datetime import datetime


def request_and_check(url, auth, status_code=200):
Expand All @@ -48,3 +49,14 @@ def request_and_check(url, auth, status_code=200):
if resp.status_code != status_code:
raise Exception(f"Error {resp.status_code}: {resp.text}")
return json.loads(resp.text)


def set_job_names(name, default_name="unknown_job"):
    """Return a ``(orig_name, name)`` pair for a job.

    Parameters
    ----------
    name : str | None
        Desired job name. If ``None``, *default_name* becomes the
        original name and a generated ``job_<timestamp>`` name is
        returned; otherwise the given name is kept as the original and a
        timestamp suffix is appended.
    default_name : str
        Fallback original name used when *name* is ``None``.

    Returns
    -------
    tuple[str, str]
        ``(orig_name, name)`` where *name* carries a unique, sortable
        ``%Y%m%d_%H%M%S`` timestamp suffix.
    """
    now = datetime.now()
    # NOTE: format fixed from '%Y%d%m' (year-DAY-month, an evident typo)
    # to the conventional, date-sortable '%Y%m%d'.
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    if name is None:
        orig_name = default_name
        name = f"job_{timestamp}"
    else:
        orig_name = name
        name += f"_{timestamp}"
    return orig_name, name
2 changes: 1 addition & 1 deletion tests/test_process_chain_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
}


class TestActiniaLocation(object):
class TestActiniaPCValidation(object):
@classmethod
def setup_class(cls):
cls.testactinia = Actinia(ACTINIA_BASEURL, ACTINIA_VERSION)
Expand Down
110 changes: 110 additions & 0 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#######
# actinia-python-client is a python client for actinia - an open source REST
# API for scalable, distributed, high performance processing of geographical
# data that uses GRASS GIS for computational tasks.
#
# Copyright (c) 2023 mundialis GmbH & Co. KG
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#######

__license__ = "GPLv3"
__author__ = "Anika Weinmann"
__copyright__ = "Copyright 2023, mundialis GmbH & Co. KG"
__maintainer__ = "Anika Weinmann"


from actinia import Actinia
from actinia.job import Job

from .actinia_config import (
ACTINIA_BASEURL,
ACTINIA_VERSION,
ACTINIA_AUTH,
LOCATION_NAME,
)

# name of the temporary user mapset created for the processing tests
NEW_MAPSET_NAME = "new_test_mapset"
# process chain used by both tests: compute an "elevation" raster with
# r.mapcalc, then export it as a GTiff via the exporter module
PC = {
    "list": [
        {
            "id": "r_mapcalc",
            "module": "r.mapcalc",
            "inputs": [{"param": "expression", "value": "elevation=42"}],
        },
        {
            "module": "exporter",
            "id": "elevation_export",
            "outputs": [
                {
                    "param": "map",
                    "value": "elevation",
                    "export": {"format": "GTiff", "type": "raster"},
                }
            ],
        },
    ],
    "version": "1",
}


class TestActiniaProcessing(object):
    """Integration tests for ephemeral and persistent processing."""

    @classmethod
    def setup_class(cls):
        """Connect to actinia and create a fresh user mapset."""
        cls.testactinia = Actinia(ACTINIA_BASEURL, ACTINIA_VERSION)
        assert isinstance(cls.testactinia, Actinia)
        cls.testactinia.set_authentication(ACTINIA_AUTH[0], ACTINIA_AUTH[1])
        cls.testactinia.get_locations()
        location = cls.testactinia.locations[LOCATION_NAME]
        location.create_mapset(NEW_MAPSET_NAME)

    @classmethod
    def teardown_class(cls):
        """Delete the user mapset again if it still exists."""
        location = cls.testactinia.locations[LOCATION_NAME]
        if NEW_MAPSET_NAME in location.mapsets:
            location.delete_mapset(NEW_MAPSET_NAME)

    def test_async_ephemeral_processing(self):
        """Test async ephemeral processing."""
        location = self.testactinia.locations[LOCATION_NAME]
        job = location.create_processing_export_job(PC)
        assert isinstance(job, Job), "No job returned!"
        # poll job
        job.poll_until_finished()
        assert job.status == "finished", "Job status not 'finished'!"
        # the exporter step must have produced an elevation GTiff
        assert job.urls["resources"][0].endswith("elevation.tif")

    def test_async_persistent_processing(self):
        """Test async persistent processing."""
        mapset = self.testactinia.locations[LOCATION_NAME].mapsets[
            NEW_MAPSET_NAME
        ]
        job = mapset.create_processing_job(PC)
        assert isinstance(job, Job), "No job returned!"
        # poll job
        job.poll_until_finished()
        assert job.status == "finished", "Job status not 'finished'!"
        # r.mapcalc must have written the raster into the user mapset
        rasters = mapset.get_raster_layers()
        assert "elevation" in rasters

0 comments on commit 91a740e

Please sign in to comment.