Skip to content

Commit

Permalink
Add experimental structured advanced copy workflow
Browse files Browse the repository at this point in the history
Closes #396
  • Loading branch information
Manisha15 committed Feb 17, 2022
1 parent 729408b commit 385d91d
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES/396.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added experimental advanced copy API with support for structured copying.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ include COMMITMENT
include COPYRIGHT
include functest_requirements.txt
include LICENSE
include pulp_deb/app/schema/*
include pulp_deb/tests/functional/sign_deb_release.sh
include pyproject.toml
include requirements.txt
Expand Down
7 changes: 7 additions & 0 deletions pulp_deb/app/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import json
import os

location = os.path.dirname(os.path.realpath(__file__))

with open(os.path.join(location, "copy_config.json")) as copy_config_json:
COPY_CONFIG_SCHEMA = json.load(copy_config_json)
21 changes: 21 additions & 0 deletions pulp_deb/app/schema/copy_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "CopyConfig",
"description": "Config for copying content between repos",
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"additionProperties": false,
"required": [ "source_repo_version", "dest_repo" ],
"properties": {
"source_repo_version": { "type": "string" },
"dest_repo": { "type": "string" },
"dest_base_version": { "type": "integer" },
"content": {
"type": "array",
"items": { "type": "string" }
}
}
}
}
2 changes: 1 addition & 1 deletion pulp_deb/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@

from .remote_serializers import AptRemoteSerializer

from .repository_serializers import AptRepositorySerializer
from .repository_serializers import AptRepositorySerializer, CopySerializer
59 changes: 58 additions & 1 deletion pulp_deb/app/serializers/repository_serializers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from pulpcore.plugin.serializers import RepositorySerializer
from gettext import gettext as _
from pulpcore.plugin.serializers import RepositorySerializer, validate_unknown_fields

from pulp_deb.app.models import AptRepository

from jsonschema import Draft7Validator
from rest_framework import serializers
from pulp_deb.app.schema import COPY_CONFIG_SCHEMA


class AptRepositorySerializer(RepositorySerializer):
"""
Expand All @@ -11,3 +16,55 @@ class AptRepositorySerializer(RepositorySerializer):
class Meta:
fields = RepositorySerializer.Meta.fields
model = AptRepository


class CopySerializer(serializers.Serializer):
"""
A serializer for Content Copy API.
"""

config = serializers.JSONField(
help_text=_("A JSON document describing sources, destinations, and content to be copied")
)

structured = serializers.BooleanField(
help_text=_(
"Also copy any distributions, components, and releases as needed for any packages "
"being copied. This will allow for structured publications of the target repository."
"Default is set to True"
),
default=True,
)

dependency_solving = serializers.BooleanField(
help_text=_(
"Also copy dependencies of any packages being copied. NOT YET"
'IMPLEMENTED! You must keep this at "False"!'
),
default=False,
)

def validate(self, data):
"""
Validate that the Serializer contains valid data.
Set the DebRepository based on the RepositoryVersion if only the latter is provided.
Set the RepositoryVersion based on the DebRepository if only the latter is provided.
Convert the human-friendly names of the content types into what Pulp needs to query on.
"""
super().validate(data)

if hasattr(self, "initial_data"):
validate_unknown_fields(self.initial_data, self.fields)

if "config" in data:
validator = Draft7Validator(COPY_CONFIG_SCHEMA)

err = []
for error in sorted(validator.iter_errors(data["config"]), key=str):
err.append(error.message)
if err:
raise serializers.ValidationError(
_("Provided copy criteria is invalid:'{}'".format(err))
)

return data
1 change: 1 addition & 0 deletions pulp_deb/app/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# flake8: noqa
from .publishing import publish, publish_verbatim
from .synchronizing import synchronize
from .copy import copy_content
119 changes: 119 additions & 0 deletions pulp_deb/app/tasks/copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from django.db import transaction
from django.db.models import Q

from pulpcore.plugin.models import Content, RepositoryVersion

from pulp_deb.app.models import AptRepository, Package, ReleaseArchitecture, PackageReleaseComponent

import logging
from gettext import gettext as _

log = logging.getLogger(__name__)


def find_structured_publish_attributes(content, src_repo_version):
"""
Finds the content for structured publish from packages to be copied and returns it all together.
Args:
content (iterable): Content for structured publish
src_repo_version (pulpcore.models.RepositoryVersion): Source repo version
Returns: Queryset of Content objects that extends intial set of content for structured publish
"""
# Content in the source repository version
package_release_component_ids = src_repo_version.content.filter(
pulp_type=PackageReleaseComponent.get_pulp_type()
).only("pk")
architecture_ids = src_repo_version.content.filter(
pulp_type=ReleaseArchitecture.get_pulp_type()
).only("pk")
package_release_components = PackageReleaseComponent.objects.filter(
pk__in=package_release_component_ids
)

children = set()

# Packages to be copied
packages = Package.objects.filter(pk__in=content)
children.update(packages.values_list("pk", flat=True))

if len(content) != len(packages):
log.warning(_("Additional data with packages is provided. Removing from the content list."))

# List of all architectures
architectures = ReleaseArchitecture.objects.filter(pk__in=architecture_ids).values_list(
"pk", flat=True
)
children.update(architectures)

# Package release components, release components, release to be copied based on packages
for pckg in package_release_components.iterator():
if pckg.package in packages:
children.update([pckg.pk, pckg.release_component.pk, pckg.release_component.release.pk])

return Content.objects.filter(pk__in=children)


@transaction.atomic
def copy_content(config, structured, dependency_solving):
"""
Copy content from one repo to another.
Args:
source_repo_version_pk: repository version primary key to copy units from
dest_repo_pk: repository primary key to copy units into
criteria: a dict that maps type to a list of criteria to filter content by. Note that this
criteria MUST be validated before being passed to this task.
content_pks: a list of content pks to copy from source to destination
"""

def process_entry(entry):
source_repo_version = RepositoryVersion.objects.get(pk=entry["source_repo_version"])
dest_repo = AptRepository.objects.get(pk=entry["dest_repo"])

dest_version_provided = bool(entry.get("dest_base_version"))
if dest_version_provided:
dest_repo_version = RepositoryVersion.objects.get(pk=entry["dest_base_version"])
else:
dest_repo_version = dest_repo.latest_version()

if entry.get("content") is not None:
content_filter = Q(pk__in=entry.get("content"))
else:
content_filter = Q()

log.info(_("Copying: {copy} created").format(copy=content_filter))

return (
source_repo_version,
dest_repo_version,
dest_repo,
content_filter,
dest_version_provided,
)

if not dependency_solving:
# No Dependency Solving Branch
# ============================
for entry in config:
(
source_repo_version,
dest_repo_version,
dest_repo,
content_filter,
dest_version_provided,
) = process_entry(entry)

content_to_copy = source_repo_version.content.filter(content_filter)
if structured:
content_to_copy = find_structured_publish_attributes(
content_to_copy, source_repo_version
)

base_version = dest_repo_version if dest_version_provided else None

with dest_repo.new_version(base_version=base_version) as new_version:
new_version.add_content(content_to_copy)
else:
raise NotImplementedError("Advanced copy with dependency solving is not yet implemented.")
5 changes: 5 additions & 0 deletions pulp_deb/app/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.urls import path

from .viewsets import CopyViewSet

urlpatterns = [path("pulp/api/v3/deb/copy/", CopyViewSet.as_view({"post": "create"}))]
2 changes: 1 addition & 1 deletion pulp_deb/app/viewsets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

from .remote import AptRemoteViewSet

from .repository import AptRepositoryVersionViewSet, AptRepositoryViewSet
from .repository import AptRepositoryVersionViewSet, AptRepositoryViewSet, CopyViewSet
84 changes: 84 additions & 0 deletions pulp_deb/app/viewsets/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@

from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action
from rest_framework import viewsets
from rest_framework.serializers import ValidationError as DRFValidationError

from pulpcore.plugin.actions import ModifyRepositoryActionMixin
from pulpcore.plugin.serializers import (
AsyncOperationResponseSerializer,
RepositorySyncURLSerializer,
)
from pulpcore.plugin.models import RepositoryVersion
from pulpcore.plugin.tasking import dispatch
from pulpcore.plugin.viewsets import (
OperationPostponedResponse,
RepositoryVersionViewSet,
RepositoryViewSet,
NamedModelViewSet,
)

from pulp_deb.app import models, serializers, tasks
Expand Down Expand Up @@ -76,3 +80,83 @@ class AptRepositoryVersionViewSet(RepositoryVersionViewSet):
"""

parent_viewset = AptRepositoryViewSet


class CopyViewSet(viewsets.ViewSet):
"""
ViewSet for the content copy API endpoint.
"""

serializer_class = serializers.CopySerializer

@extend_schema(
description="Trigger an asynchronous task to copy APT content"
"from one repository into another, creating a new"
"repository version.",
summary="Copy content",
operation_id="copy_content",
request=serializers.CopySerializer,
responses={202: AsyncOperationResponseSerializer},
)
def create(self, request):
"""Copy content."""
serializer = serializers.CopySerializer(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)

config = serializer.validated_data["config"]
structured = serializer.validated_data["structured"]
dependency_solving = serializer.validated_data["dependency_solving"]

config, shared_repos, exclusive_repos = self._process_config(config)

async_result = dispatch(
tasks.copy_content,
shared_resources=shared_repos,
exclusive_resources=exclusive_repos,
args=[config, structured, dependency_solving],
kwargs={},
)
return OperationPostponedResponse(async_result, request)

def _process_config(self, config):
"""
Change the hrefs into pks within config.
This method also implicitly validates that the hrefs map to objects and it returns a list of
repos so that the task can lock on them.
"""
result = []
# exclusive use of the destination repos is needed since new repository versions are being
# created, but source repos can be accessed in a read-only fashion in parallel, so long
# as there are no simultaneous modifications.
shared_repos = []
exclusive_repos = []

for entry in config:
r = dict()
source_version = NamedModelViewSet().get_resource(
entry["source_repo_version"], RepositoryVersion
)
dest_repo = NamedModelViewSet().get_resource(entry["dest_repo"], models.AptRepository)
r["source_repo_version"] = source_version.pk
r["dest_repo"] = dest_repo.pk
shared_repos.append(source_version.repository)
exclusive_repos.append(dest_repo)

if "dest_base_version" in entry:
try:
r["dest_base_version"] = dest_repo.versions.get(
number=entry["dest_base_version"]
).pk
except RepositoryVersion.DoesNotExist:
message = _(
"Version {version} does not exist for repository " "'{repo}'."
).format(version=entry["dest_base_version"], repo=dest_repo.name)
raise DRFValidationError(detail=message)

if entry.get("content") is not None:
r["content"] = []
for c in entry["content"]:
r["content"].append(NamedModelViewSet().extract_pk(c))
result.append(r)

return result, shared_repos, exclusive_repos

0 comments on commit 385d91d

Please sign in to comment.