From 30fd1de0859ed8317d8ef717c64601f671d3308b Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Tue, 3 May 2022 23:11:08 +0800 Subject: [PATCH] Refactor process naming logic. (#12533) Follow up to #12503. Combine the naming logic to avoid duplication as they are mostly similar. Since Kubernetes has stricter conventions, we use the stricter convention throughout. Move the naming function to the ProcessFactory interface. --- .../workers/process/DockerProcessFactory.java | 26 +--------- .../workers/process/KubeProcessFactory.java | 49 ++----------------- .../workers/process/ProcessFactory.java | 42 ++++++++++++++++ ...ctoryTest.java => ProcessFactoryTest.java} | 19 ++++--- 4 files changed, 60 insertions(+), 76 deletions(-) rename airbyte-workers/src/test/java/io/airbyte/workers/process/{KubeProcessFactoryTest.java => ProcessFactoryTest.java} (65%) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index 61c33bd6db38..9dae81f6bf0f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -23,15 +23,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DockerProcessFactory implements ProcessFactory { private static final Logger LOGGER = LoggerFactory.getLogger(DockerProcessFactory.class); - private static final String VERSION_DELIMITER = ":"; - private static final String DOCKER_DELIMITER = "/"; + private static final int DOCKER_NAME_LEN_LIMIT = 128; private static final Path DATA_MOUNT_DESTINATION = Path.of("/data"); private static final Path LOCAL_MOUNT_DESTINATION = Path.of("/local"); @@ -117,7 +115,7 @@ public Process create(final String jobId, 
rebasePath(jobRoot).toString(), // rebases the job root on the job data mount "--log-driver", "none"); - final String containerName = createContainerName(imageName, jobId, attempt); + final String containerName = ProcessFactory.createProcessName(imageName, jobId, attempt, DOCKER_NAME_LEN_LIMIT); cmd.add("--name"); cmd.add(containerName); @@ -169,26 +167,6 @@ public Process create(final String jobId, } } - private static String createContainerName(final String fullImagePath, final String jobId, final int attempt) { - final var noVersion = fullImagePath.split(VERSION_DELIMITER)[0]; - - final var nameParts = noVersion.split(DOCKER_DELIMITER); - var imageName = nameParts[nameParts.length - 1]; - - final var randSuffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(); - final String suffix = "sync" + "-" + jobId + "-" + attempt + "-" + randSuffix; - - var podName = imageName + "-" + suffix; - final var podNameLenLimit = 128; - if (podName.length() > podNameLenLimit) { - final var extra = podName.length() - podNameLenLimit; - imageName = imageName.substring(extra); - podName = imageName + "-" + suffix; - } - - return podName; - } - private Path rebasePath(final Path jobRoot) { final Path relativePath = workspaceRoot.relativize(jobRoot); return DATA_MOUNT_DESTINATION.resolve(relativePath); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 39e46f6b6afd..c2487da892a2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -15,14 +15,14 @@ import java.nio.file.Path; import java.util.HashMap; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class 
KubeProcessFactory implements ProcessFactory { + @VisibleForTesting + public static final int KUBE_NAME_LEN_LIMIT = 63; + private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class); public static final String JOB_TYPE = "job_type"; @@ -31,15 +31,12 @@ public class KubeProcessFactory implements ProcessFactory { public static final String CHECK_JOB = "check"; public static final String DISCOVER_JOB = "discover"; - public static final String SYNC_RUNNER = "sync-runner"; - public static final String SYNC_STEP = "sync_step"; public static final String READ_STEP = "read"; public static final String WRITE_STEP = "write"; public static final String NORMALISE_STEP = "normalise"; public static final String CUSTOM_STEP = "custom"; - private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+"); private static final String JOB_LABEL_KEY = "job_id"; private static final String ATTEMPT_LABEL_KEY = "attempt_id"; private static final String WORKER_POD_LABEL_KEY = "airbyte"; @@ -109,7 +106,7 @@ public Process create(final String jobId, throws WorkerException { try { // used to differentiate source and destination processes with the same id and attempt - final String podName = createPodName(imageName, jobId, attempt); + final String podName = ProcessFactory.createProcessName(imageName, jobId, attempt, KUBE_NAME_LEN_LIMIT); LOGGER.info("Attempting to start pod = {} for {}", podName, imageName); final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take(); @@ -165,42 +162,4 @@ public static Map getLabels(final String jobId, final int attemp return allLabels; } - /** - * Docker image names are by convention separated by slashes. The last portion is the image's name. - * This is followed by a colon and a version number. e.g. airbyte/scheduler:v1 or - * gcr.io/my-project/image-name:v2. - * - * Kubernetes has a maximum pod name length of 63 characters, and names must start with an - * alphabetic character. 
- * - * With these two facts, attempt to construct a unique Pod name with the image name present for - * easier operations. - */ - @VisibleForTesting - protected static String createPodName(final String fullImagePath, final String jobId, final int attempt) { - final var versionDelimiter = ":"; - final var noVersion = fullImagePath.split(versionDelimiter)[0]; - - final var dockerDelimiter = "/"; - final var nameParts = noVersion.split(dockerDelimiter); - var imageName = nameParts[nameParts.length - 1]; - - final var randSuffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(); - final String suffix = "sync" + "-" + jobId + "-" + attempt + "-" + randSuffix; - - var podName = imageName + "-" + suffix; - final var podNameLenLimit = 63; - if (podName.length() > podNameLenLimit) { - final var extra = podName.length() - podNameLenLimit; - imageName = imageName.substring(extra); - podName = imageName + "-" + suffix; - } - final Matcher m = ALPHABETIC.matcher(podName); - // Since we add sync-UUID as a suffix a couple of lines up, there will always be a substring - // starting with an alphabetic character. - // If the image name is a no-op, this function should always return `sync-UUID` at the minimum. 
- m.find(); - return podName.substring(m.start()); - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java index edd1e59508c8..724763fcb9ff 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java @@ -8,9 +8,16 @@ import io.airbyte.workers.WorkerException; import java.nio.file.Path; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.lang3.RandomStringUtils; public interface ProcessFactory { + String VERSION_DELIMITER = ":"; + String DOCKER_DELIMITER = "/"; + Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+"); + /** * Creates a ProcessBuilder to run a program in a new Process. * @@ -44,4 +51,39 @@ Process create(String jobId, final String... args) throws WorkerException; + /** + * Docker image names are by convention separated by slashes. The last portion is the image's name. + * This is followed by a colon and a version number. e.g. airbyte/scheduler:v1 or + * gcr.io/my-project/image-name:v2. + * + * With these two facts, attempt to construct a unique process name with the image name present that + * can be used by the factories implementing this interface for easier operations. 
+ */ + static String createProcessName(final String fullImagePath, final String jobId, final int attempt, final int lenLimit) { + final var noVersion = fullImagePath.split(VERSION_DELIMITER)[0]; + + final var nameParts = noVersion.split(DOCKER_DELIMITER); + var imageName = nameParts[nameParts.length - 1]; + + final var randSuffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(); + final String suffix = "sync" + "-" + jobId + "-" + attempt + "-" + randSuffix; + + var processName = imageName + "-" + suffix; + if (processName.length() > lenLimit) { + final var extra = processName.length() - lenLimit; + imageName = imageName.substring(extra); + processName = imageName + "-" + suffix; + } + + // Kubernetes pod names must start with an alphabetic character while Docker names accept + // alphanumeric. + // Use the stricter convention for simplicity. + final Matcher m = ALPHABETIC.matcher(processName); + // Since we add sync-UUID as a suffix a couple of lines up, there will always be a substring + // starting with an alphabetic character. + // If the image name is a no-op, this function should always return `sync-UUID` at the minimum. 
+ m.find(); + return processName.substring(m.start()); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/KubeProcessFactoryTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/ProcessFactoryTest.java similarity index 65% rename from airbyte-workers/src/test/java/io/airbyte/workers/process/KubeProcessFactoryTest.java rename to airbyte-workers/src/test/java/io/airbyte/workers/process/ProcessFactoryTest.java index 32690a9e2795..7701c58fb7d1 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/KubeProcessFactoryTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/ProcessFactoryTest.java @@ -7,18 +7,21 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class KubeProcessFactoryTest { +public class ProcessFactoryTest { @Test void getPodNameNormal() { - final var name = KubeProcessFactory.createPodName("airbyte/tester:1", "1", 10); + final var name = ProcessFactory.createProcessName("airbyte/tester:1", "1", 10, + KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); Assertions.assertEquals("tester-sync-1-10-", withoutRandSuffix); } @Test void getPodNameTruncated() { - final var name = KubeProcessFactory.createPodName("airbyte/very-very-very-long-name-longer-than-63-chars:2", "1", 10); + final var name = + ProcessFactory.createProcessName("airbyte/very-very-very-long-name-longer-than-63-chars:2", + "1", 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); Assertions.assertEquals("very-very-very-long-name-longer-than-63-chars-sync-1-10-", withoutRandSuffix); } @@ -26,7 +29,8 @@ void getPodNameTruncated() { @Test void testHandlingDashAsFirstCharacter() { final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330"; - final var name = KubeProcessFactory.createPodName("airbyte/source-google-adwordsv2:latest", uuid, 10); + final var name = 
ProcessFactory.createProcessName("airbyte/source-google-adwordsv2:latest", uuid, + 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); Assertions.assertEquals("le-adwordsv2-sync-7339ba3b-cb53-4210-9591-c70d4a372330-10-", withoutRandSuffix); @@ -35,7 +39,8 @@ void testHandlingDashAsFirstCharacter() { @Test void testOnlyDashes() { final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330"; - final var name = KubeProcessFactory.createPodName("--------", uuid, 10); + final var name = ProcessFactory.createProcessName("--------", uuid, 10, + KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); Assertions.assertEquals("sync-7339ba3b-cb53-4210-9591-c70d4a372330-10-", withoutRandSuffix); @@ -44,9 +49,9 @@ void testOnlyDashes() { @Test void testOnlyNumeric() { final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330"; - final var name = KubeProcessFactory.createPodName("0000000000", uuid, 10); + final var name = ProcessFactory.createProcessName("0000000000", uuid, 10, + KubeProcessFactory.KUBE_NAME_LEN_LIMIT); - System.out.println(name); final var withoutRandSuffix = name.substring(0, name.length() - 5); Assertions.assertEquals("sync-7339ba3b-cb53-4210-9591-c70d4a372330-10-", withoutRandSuffix); }