From ea61f4011b7969288a4031b3bdb5f5d968694baf Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Fri, 28 Jan 2022 16:45:42 +0100 Subject: [PATCH 01/15] add support for annotations, add default resources --- .../main/java/io/airbyte/config/Configs.java | 5 +++ .../java/io/airbyte/config/EnvConfigs.java | 33 ++++++++++++++++++- .../io/airbyte/workers/WorkerConfigs.java | 6 ++++ .../workers/process/KubePodProcess.java | 11 +++++-- .../workers/process/KubeProcessFactory.java | 1 + 5 files changed, 52 insertions(+), 4 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 297744faaf13..ffe880b5a9b8 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -247,6 +247,11 @@ public interface Configs { */ Map getJobKubeNodeSelectors(); + /** + * Define one or more Job pod annotations. Each kv-pair is separated by a `,`. + */ + Map getJobKubeAnnotations(); + /** * Define the Job pod connector image pull policy. */ diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 065c50963f90..59ba1b053e3b 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -58,6 +58,7 @@ public class EnvConfigs implements Configs { public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY"; public static final String JOB_KUBE_TOLERATIONS = "JOB_KUBE_TOLERATIONS"; public static final String JOB_KUBE_NODE_SELECTORS = "JOB_KUBE_NODE_SELECTORS"; + public static final String JOB_KUBE_ANNOTATIONS = "JOB_KUBE_ANNOTATIONS"; public static final String JOB_KUBE_SOCAT_IMAGE = "JOB_KUBE_SOCAT_IMAGE"; public static final String JOB_KUBE_BUSYBOX_IMAGE = "JOB_KUBE_BUSYBOX_IMAGE"; public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE"; @@ -431,8 +432,38 @@ private TolerationPOJO parseToleration(final String tolerationStr) { */ @Override public Map getJobKubeNodeSelectors() { + return splitKVPairs(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, "")); + } + + /** + * Returns a map of annotations from its own environment variable. The value of the env is a string + * that represents one or more annotations. Each kv-pair is separated by a `,` + *

+ * For example:- The following represents two annotations + *

+ * airbyte=server,type=preemptive + * + * @return map containing kv pairs of annotations + */ + @Override + public Map getJobKubeAnnotations() { + return splitKVPairs(getEnvOrDefault(JOB_KUBE_ANNOTATIONS, "")); + } + + /** + * Splits key value pairs from the input string into a Map. + * Each kv-pair is separated by a ','. The key and the value is separated by '='. + *

+ * For example: + *

+ * key1=value1,key2=value2 + * + * @param input string + * @return map containing kv pairs + */ + public Map splitKVPairs(String input) { return Splitter.on(",") - .splitToStream(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, "")) + .splitToStream(input) .filter(s -> !Strings.isNullOrEmpty(s) && s.contains("=")) .map(s -> s.split("=")) .collect(Collectors.toMap(s -> s[0], s -> s[1])); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java index 4dcd1d383d9c..43d76494fdd4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java @@ -16,6 +16,7 @@ public class WorkerConfigs { private final ResourceRequirements resourceRequirements; private final List workerKubeTolerations; private final Map workerKubeNodeSelectors; + private final Map workerKubeAnnotations; private final String jobImagePullSecret; private final String jobImagePullPolicy; private final String jobSocatImage; @@ -32,6 +33,7 @@ public WorkerConfigs(final Configs configs) { .withMemoryLimit(configs.getJobMainContainerMemoryLimit()); this.workerKubeTolerations = configs.getJobKubeTolerations(); this.workerKubeNodeSelectors = configs.getJobKubeNodeSelectors(); + this.workerKubeAnnotations = configs.getJobKubeAnnotations(); this.jobImagePullSecret = configs.getJobKubeMainContainerImagePullSecret(); this.jobImagePullPolicy = configs.getJobKubeMainContainerImagePullPolicy(); this.jobSocatImage = configs.getJobKubeSocatImage(); @@ -56,6 +58,10 @@ public Map getworkerKubeNodeSelectors() { return workerKubeNodeSelectors; } + public Map getWorkerKubeAnnotations() { + return workerKubeAnnotations; + } + public String getJobImagePullSecret() { return jobImagePullSecret; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 0274f42126e5..3d5d0a1cb9c2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -98,8 +98,11 @@ public class KubePodProcess extends Process { public static final Duration DEFAULT_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); private static final String DEFAULT_MEMORY_REQUEST = "25Mi"; private static final String DEFAULT_MEMORY_LIMIT = "50Mi"; + private static final String DEFAULT_CPU_REQUEST = "0.1"; + private static final String DEFAULT_CPU_LIMIT = "0.2"; private static final ResourceRequirements DEFAULT_SIDECAR_RESOURCES = new ResourceRequirements() - .withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_REQUEST); + .withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_REQUEST) + .withCpuLimit(DEFAULT_CPU_LIMIT).withCpuRequest(DEFAULT_CPU_REQUEST); private static final String PIPES_DIR = "/pipes"; private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin"; @@ -162,6 +165,7 @@ private static Container getInit(final boolean usesStdin, .withImage(busyboxImage) .withWorkingDir(CONFIG_DIR) .withCommand("sh", "-c", initEntrypointStr) + .withResources(getResourceRequirementsBuilder(DEFAULT_SIDECAR_RESOURCES).build()) .withVolumeMounts(mainVolumeMounts) .build(); } @@ -336,6 +340,7 @@ public KubePodProcess(final boolean isOrchestrator, final List tolerations, final Map nodeSelectors, final Map labels, + final Map annotations, final String socatImage, final String busyboxImage, final String curlImage, @@ -451,6 +456,7 @@ public KubePodProcess(final boolean isOrchestrator, .withNewMetadata() .withName(podName) .withLabels(labels) + .withAnnotations(annotations) .endMetadata() .withNewSpec(); @@ -468,9 +474,8 @@ public KubePodProcess(final boolean isOrchestrator, .endSpec() .build(); - LOGGER.info("Creating pod..."); + LOGGER.info("Creating pod {}...", pod.getMetadata().getName()); this.podDefinition = fabricClient.pods().inNamespace(namespace).createOrReplace(pod); - waitForInitPodToRun(fabricClient, podDefinition); LOGGER.info("Copying files..."); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 5323d9e2e4e7..cb4c36b47663 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -144,6 +144,7 @@ public Process create(final String jobId, workerConfigs.getWorkerKubeTolerations(), workerConfigs.getworkerKubeNodeSelectors(), allLabels, + workerConfigs.getWorkerKubeAnnotations(), workerConfigs.getJobSocatImage(), workerConfigs.getJobBusyboxImage(), workerConfigs.getJobCurlImage(), From 89246a036d1d3e66edb00e06f5228971bfd23f1c Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Fri, 28 Jan 2022 17:00:19 +0100 Subject: [PATCH 02/15] cosmetics --- .../src/main/java/io/airbyte/workers/process/KubePodProcess.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 3d5d0a1cb9c2..ab6b84528158 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -476,6 +476,7 @@ public KubePodProcess(final boolean isOrchestrator, LOGGER.info("Creating pod {}...", pod.getMetadata().getName()); this.podDefinition = fabricClient.pods().inNamespace(namespace).createOrReplace(pod); + waitForInitPodToRun(fabricClient, podDefinition); LOGGER.info("Copying files..."); From 2ac74f39bb3e2f690cbf96f6219a79cca15748f3 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Tue, 1 Feb 2022 20:04:25 +0100 Subject: [PATCH 03/15] add /tmp emptyDir for the main container --- .../airbyte/workers/process/KubePodProcess.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 1126d2cd014b..d53bc03f8c42 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -110,6 +110,7 @@ public class KubePodProcess extends Process implements KubePod { private static final String STDERR_PIPE_FILE = PIPES_DIR + "/stderr"; public static final String CONFIG_DIR = "/config"; private static final String TERMINATION_DIR = "/termination"; + private static final String TMP_DIR = "/tmp"; private static final String TERMINATION_FILE_MAIN = TERMINATION_DIR + "/main"; private static final String TERMINATION_FILE_CHECK = TERMINATION_DIR + "/check"; public static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING"; @@ -403,13 +404,24 @@ public KubePodProcess(final boolean isOrchestrator, .withMountPath(TERMINATION_DIR) .build(); + final Volume tmpVolume = new VolumeBuilder() + .withName("tmp") + .withNewEmptyDir() + .endEmptyDir() + .build(); + + final VolumeMount tmpVolumeMount = new VolumeMountBuilder() + .withName("tmp") + .withMountPath(TMP_DIR) + .build(); + final Container init = getInit(usesStdin, List.of(pipeVolumeMount, configVolumeMount), busyboxImage); final Container main = getMain( image, imagePullPolicy, usesStdin, entrypointOverride, - List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount), + List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount, tmpVolumeMount), resourceRequirements, internalToExternalPorts, envMap, @@ -478,7 +490,7 @@ public KubePodProcess(final boolean isOrchestrator, .withRestartPolicy("Never") .withInitContainers(init) .withContainers(containers) - .withVolumes(pipeVolume, configVolume, terminationVolume) + .withVolumes(pipeVolume, configVolume, terminationVolume, tmpVolume) .endSpec() .build(); From de0970c45fd41ec55e26136577f79a86ae0fb8ef Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Fri, 4 Feb 2022 09:51:27 +0100 Subject: [PATCH 04/15] add pod annotations variable to kube manifests --- kube/overlays/dev-integration-test/.env | 3 ++- kube/overlays/dev/.env | 3 ++- kube/overlays/stable-with-resource-limits/.env | 3 ++- kube/overlays/stable/.env | 3 ++- kube/resources/worker.yaml | 5 +++++ 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index e85cf73e04d3..d9a82788cb7b 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -57,8 +57,9 @@ JOB_MAIN_CONTAINER_CPU_LIMIT= JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= -# Worker pod tolerations and node selectors +# Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= +JOB_KUBE_ANNOTATIONS= JOB_KUBE_NODE_SELECTORS= # Job image pull policy diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index 5e4680e590cf..865118eb2dbf 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -59,8 +59,9 @@ JOB_MAIN_CONTAINER_CPU_LIMIT= JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= -# Worker pod tolerations and node selectors +# Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= +JOB_KUBE_ANNOTATIONS= JOB_KUBE_NODE_SELECTORS= # Job image pull policy diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 7c8ae77bac1e..f1c111a8835c 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -59,8 +59,9 @@ JOB_MAIN_CONTAINER_CPU_LIMIT= JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= -# Worker pod tolerations and node selectors +# Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= +JOB_KUBE_ANNOTATIONS= JOB_KUBE_NODE_SELECTORS= # Job image pull policy diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 7c8ae77bac1e..f1c111a8835c 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -59,8 +59,9 @@ JOB_MAIN_CONTAINER_CPU_LIMIT= JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= -# Worker pod tolerations and node selectors +# Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= +JOB_KUBE_ANNOTATIONS= JOB_KUBE_NODE_SELECTORS= # Job image pull policy diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index 2c8c5eb92e90..3e6258d2224c 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -174,6 +174,11 @@ spec: configMapKeyRef: name: airbyte-env key: JOB_KUBE_TOLERATIONS + - name: JOB_KUBE_ANNOTATIONS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_ANNOTATIONS - name: JOB_KUBE_NODE_SELECTORS valueFrom: configMapKeyRef: From 967ffd51a029c0022514b54e104a014316e2d2b1 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Fri, 25 Feb 2022 11:23:33 +0100 Subject: [PATCH 05/15] update kubernetes specific docs --- docs/operator-guides/configuring-airbyte.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/operator-guides/configuring-airbyte.md b/docs/operator-guides/configuring-airbyte.md index 8f59dd8bdb34..01fdb195e42c 100644 --- a/docs/operator-guides/configuring-airbyte.md +++ b/docs/operator-guides/configuring-airbyte.md @@ -97,12 +97,13 @@ The following variables are relevant to both Docker and Kubernetes. #### Jobs 1. `JOB_KUBE_TOLERATIONS` - Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration contains k=v pairs mentioning some/all of key, effect, operator and value and separated by `,`. 2. `JOB_KUBE_NODE_SELECTORS` - Define one or more Job pod node selectors. Each kv-pair is separated by a `,`. -3. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY` - Define the Job pod connector image pull policy. -4. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET` - Define the Job pod connector image pull secret. Useful when hosting private images. -5. `JOB_KUBE_SOCAT_IMAGE` - Define the Job pod socat image. -6. `JOB_KUBE_BUSYBOX_IMAGE` - Define the Job pod busybox image. -7. `JOB_KUBE_CURL_IMAGE` - Define the Job pod curl image pull. -8. `JOB_KUBE_NAMESPACE` - Define the Kubernetes namespace Job pods are created in. +3. `JOB_KUBE_ANNOTATIONS` - Define one or more Job pod annotations. Each kv-pair is separated by a `,`. +4. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY` - Define the Job pod connector image pull policy. +5. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET` - Define the Job pod connector image pull secret. Useful when hosting private images. +6. `JOB_KUBE_SOCAT_IMAGE` - Define the Job pod socat image. +7. `JOB_KUBE_BUSYBOX_IMAGE` - Define the Job pod busybox image. +8. `JOB_KUBE_CURL_IMAGE` - Define the Job pod curl image pull. +9. `JOB_KUBE_NAMESPACE` - Define the Kubernetes namespace Job pods are created in. #### Worker 1. `TEMPORAL_WORKER_PORTS` - Define the local ports the Airbyte Worker pod uses to connect to the various Job pods. Port 9001 - 9040 are exposed by default in the Kustomize deployments. From 0800f259f05ea27a11c130808794f2328cd22652 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Fri, 25 Feb 2022 16:20:26 +0100 Subject: [PATCH 06/15] cosmetics --- .../models/src/main/java/io/airbyte/config/EnvConfigs.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 8b8eabd4d495..b7d0acd20ae0 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -553,8 +553,8 @@ public Optional> getDiscoverJobKubeAnnotations() { } /** - * Splits key value pairs from the input string into a map. - * Each kv-pair is separated by a ','. The key and the value are separated by '='. + * Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. The + * key and the value are separated by '='. *

* For example:- The following represents two map entries *

From 58ea1f91aecbd881e985f7e52c83c13be2f95ad1 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Fri, 25 Feb 2022 16:20:55 +0100 Subject: [PATCH 07/15] add env vars to helm chart --- charts/airbyte/templates/worker/deployment.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/charts/airbyte/templates/worker/deployment.yaml b/charts/airbyte/templates/worker/deployment.yaml index 8bcf7dc06610..7f2344f40f91 100644 --- a/charts/airbyte/templates/worker/deployment.yaml +++ b/charts/airbyte/templates/worker/deployment.yaml @@ -120,6 +120,16 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: JOB_KUBE_ANNOTATIONS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_ANNOTATIONS + - name: JOB_KUBE_NODE_SELECTORS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_NODE_SELECTORS - name: SUBMITTER_NUM_THREADS valueFrom: configMapKeyRef: From 89949392e5e8506dab33251cb45f387dbba942d2 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Mon, 28 Feb 2022 18:18:06 +0100 Subject: [PATCH 08/15] Revert "add /tmp emptyDir for the main container" This reverts commit 2ac74f39bb3e2f690cbf96f6219a79cca15748f3. --- .../airbyte/workers/process/KubePodProcess.java | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 5e5c4a1275fd..ebc1327e9232 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -114,7 +114,6 @@ public class KubePodProcess extends Process implements KubePod { private static final String STDERR_PIPE_FILE = PIPES_DIR + "/stderr"; public static final String CONFIG_DIR = "/config"; private static final String TERMINATION_DIR = "/termination"; - private static final String TMP_DIR = "/tmp"; private static final String TERMINATION_FILE_MAIN = TERMINATION_DIR + "/main"; private static final String TERMINATION_FILE_CHECK = TERMINATION_DIR + "/check"; public static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING"; @@ -428,24 +427,13 @@ public KubePodProcess(final boolean isOrchestrator, .withMountPath(TERMINATION_DIR) .build(); - final Volume tmpVolume = new VolumeBuilder() - .withName("tmp") - .withNewEmptyDir() - .endEmptyDir() - .build(); - - final VolumeMount tmpVolumeMount = new VolumeMountBuilder() - .withName("tmp") - .withMountPath(TMP_DIR) - .build(); - final Container init = getInit(usesStdin, List.of(pipeVolumeMount, configVolumeMount), busyboxImage); final Container main = getMain( image, imagePullPolicy, usesStdin, entrypointOverride, - List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount, tmpVolumeMount), + List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount), resourceRequirements, internalToExternalPorts, envMap, @@ -514,7 +502,7 @@ public KubePodProcess(final boolean isOrchestrator, .withRestartPolicy("Never") .withInitContainers(init) .withContainers(containers) - .withVolumes(pipeVolume, configVolume, terminationVolume, tmpVolume) + .withVolumes(pipeVolume, configVolume, terminationVolume) .endSpec() .build(); From 6cff4b926b8365b50fccb80c3a6ba058a06ccf99 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Mon, 28 Feb 2022 18:20:46 +0100 Subject: [PATCH 09/15] Remove sidecar resources modifications This will be handled in a separate PR --- .../java/io/airbyte/workers/process/KubePodProcess.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index ebc1327e9232..4ba6aab861a0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -102,11 +102,8 @@ public class KubePodProcess extends Process implements KubePod { private static final String INIT_CONTAINER_NAME = "init"; private static final String DEFAULT_MEMORY_REQUEST = "25Mi"; private static final String DEFAULT_MEMORY_LIMIT = "50Mi"; - private static final String DEFAULT_CPU_REQUEST = "0.1"; - private static final String DEFAULT_CPU_LIMIT = "0.2"; private static final ResourceRequirements DEFAULT_SIDECAR_RESOURCES = new ResourceRequirements() - .withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_REQUEST) - .withCpuLimit(DEFAULT_CPU_LIMIT).withCpuRequest(DEFAULT_CPU_REQUEST); + .withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_REQUEST); private static final String PIPES_DIR = "/pipes"; private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin"; @@ -179,7 +176,6 @@ private static Container getInit(final boolean usesStdin, .withImage(busyboxImage) .withWorkingDir(CONFIG_DIR) .withCommand("sh", "-c", initCommand) - .withResources(getResourceRequirementsBuilder(DEFAULT_SIDECAR_RESOURCES).build()) .withVolumeMounts(mainVolumeMounts) .build(); } From d95394eb3f8e4e9aebe55a9566698ab9ae88d082 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Wed, 2 Mar 2022 14:34:33 +0100 Subject: [PATCH 10/15] add unit test --- .../java/io/airbyte/config/EnvConfigs.java | 9 ++++-- .../io/airbyte/config/EnvConfigsTest.java | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index b7d0acd20ae0..6099db7af4ac 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -564,13 +564,16 @@ public Optional> getDiscoverJobKubeAnnotations() { * @return map containing kv pairs */ public Optional> splitKVPairsFromEnvString(String input) { - final Map kv = Splitter.on(",") + if (input == null) { + input = ""; + } + final Map map = Splitter.on(",") .splitToStream(input) .filter(s -> !Strings.isNullOrEmpty(s) && s.contains("=")) .map(s -> s.split("=")) - .collect(Collectors.toMap(s -> s[0], s -> s[1])); + .collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim())); - return kv.isEmpty() ? Optional.empty() : Optional.of(kv); + return map.isEmpty() ? Optional.empty() : Optional.of(map); } @Override diff --git a/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java b/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java index d3a7fcf4f279..92a9072c391d 100644 --- a/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java +++ b/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java @@ -11,6 +11,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -185,6 +187,33 @@ void testworkerKubeTolerations() { new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals"))); } + @Test + void testSplitKVPairsFromEnvString() { + String input = "key1=value1,key2=value2"; + Optional> map = config.splitKVPairsFromEnvString(input); + assertTrue(map.isPresent()); + assertEquals(2, map.get().size()); + assertEquals(map.get(), Map.of("key1", "value1", "key2", "value2")); + + input = "key=k,,;$%&^#"; + map = config.splitKVPairsFromEnvString(input); + assertTrue(map.isPresent()); + assertEquals(map.get(), Map.of("key", "k")); + + input = null; + map = config.splitKVPairsFromEnvString(input); + assertFalse(map.isPresent()); + + input = " key1= value1, key2 = value2"; + map = config.splitKVPairsFromEnvString(input); + assertTrue(map.isPresent()); + assertEquals(map.get(), Map.of("key1", "value1", "key2", "value2")); + + input = "key1:value1,key2:value2"; + map = config.splitKVPairsFromEnvString(input); + assertFalse(map.isPresent()); + } + @Test void testJobKubeNodeSelectors() { envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, null); From e76cd14ff4ee1d8568fe4520673e65382839ec98 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Wed, 2 Mar 2022 14:43:27 +0100 Subject: [PATCH 11/15] add kv pair example --- docs/operator-guides/configuring-airbyte.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/operator-guides/configuring-airbyte.md b/docs/operator-guides/configuring-airbyte.md index 01fdb195e42c..132dfd00b0ac 100644 --- a/docs/operator-guides/configuring-airbyte.md +++ b/docs/operator-guides/configuring-airbyte.md @@ -96,8 +96,8 @@ The following variables are relevant to both Docker and Kubernetes. ### Kubernetes-Only #### Jobs 1. `JOB_KUBE_TOLERATIONS` - Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration contains k=v pairs mentioning some/all of key, effect, operator and value and separated by `,`. -2. `JOB_KUBE_NODE_SELECTORS` - Define one or more Job pod node selectors. Each kv-pair is separated by a `,`. -3. `JOB_KUBE_ANNOTATIONS` - Define one or more Job pod annotations. Each kv-pair is separated by a `,`. +2. `JOB_KUBE_NODE_SELECTORS` - Define one or more Job pod node selectors. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` +3. `JOB_KUBE_ANNOTATIONS` - Define one or more Job pod annotations. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` 4. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY` - Define the Job pod connector image pull policy. 5. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET` - Define the Job pod connector image pull secret. Useful when hosting private images. 6. `JOB_KUBE_SOCAT_IMAGE` - Define the Job pod socat image. From 0e5a1ab1ec6594c7dfa65066cb6ac19833320707 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Mon, 28 Mar 2022 20:29:27 +0200 Subject: [PATCH 12/15] remove optional maps --- .../main/java/io/airbyte/config/Configs.java | 17 +++-- .../java/io/airbyte/config/EnvConfigs.java | 21 +++---- .../io/airbyte/config/EnvConfigsTest.java | 62 +++++++++---------- .../io/airbyte/workers/WorkerConfigs.java | 21 +++---- .../workers/process/KubePodProcess.java | 9 ++- .../io/airbyte/workers/WorkerConfigsTest.java | 29 +++++---- 6 files changed, 76 insertions(+), 83 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index f1a2fc684f0e..b47020d69d59 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -11,7 +11,6 @@ import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; /** @@ -271,42 +270,42 @@ public interface Configs { /** * Define one or more Job pod node selectors. Each kv-pair is separated by a `,`. */ - Optional> getJobKubeNodeSelectors(); + Map getJobKubeNodeSelectors(); /** * Define node selectors for Spec job pods specifically. Each kv-pair is separated by a `,`. */ - Optional> getSpecJobKubeNodeSelectors(); + Map getSpecJobKubeNodeSelectors(); /** * Define node selectors for Check job pods specifically. Each kv-pair is separated by a `,`. */ - Optional> getCheckJobKubeNodeSelectors(); + Map getCheckJobKubeNodeSelectors(); /** * Define node selectors for Discover job pods specifically. Each kv-pair is separated by a `,`. */ - Optional> getDiscoverJobKubeNodeSelectors(); + Map getDiscoverJobKubeNodeSelectors(); /** * Define one or more Job pod annotations. Each kv-pair is separated by a `,`. */ - Optional> getJobKubeAnnotations(); + Map getJobKubeAnnotations(); /** * Define annotations for Spec job pods specifically. Each kv-pair is separated by a `,`. */ - Optional> getSpecJobKubeAnnotations(); + Map getSpecJobKubeAnnotations(); /** * Define annotations for Check job pods specifically. Each kv-pair is separated by a `,`. */ - Optional> getCheckJobKubeAnnotations(); + Map getCheckJobKubeAnnotations(); /** * Define annotations for Discover job pods specifically. Each kv-pair is separated by a `,`. */ - Optional> getDiscoverJobKubeAnnotations(); + Map getDiscoverJobKubeAnnotations(); /** * Define the Job pod connector image pull policy. diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 6099db7af4ac..c708332b5a3a 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -473,7 +473,7 @@ private TolerationPOJO parseToleration(final String tolerationStr) { * @return map containing kv pairs of node selectors, or empty optional if none present. */ @Override - public Optional> getJobKubeNodeSelectors() { + public Map getJobKubeNodeSelectors() { return splitKVPairsFromEnvString(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, "")); } @@ -483,7 +483,7 @@ public Optional> getJobKubeNodeSelectors() { * @return map containing kv pairs of node selectors, or empty optional if none present. */ @Override - public Optional> getSpecJobKubeNodeSelectors() { + public Map getSpecJobKubeNodeSelectors() { return splitKVPairsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_NODE_SELECTORS, "")); } @@ -493,7 +493,7 @@ public Optional> getSpecJobKubeNodeSelectors() { * @return map containing kv pairs of node selectors, or empty optional if none present. */ @Override - public Optional> getCheckJobKubeNodeSelectors() { + public Map getCheckJobKubeNodeSelectors() { return splitKVPairsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_NODE_SELECTORS, "")); } @@ -503,7 +503,7 @@ public Optional> getCheckJobKubeNodeSelectors() { * @return map containing kv pairs of node selectors, or empty optional if none present. */ @Override - public Optional> getDiscoverJobKubeNodeSelectors() { + public Map getDiscoverJobKubeNodeSelectors() { return splitKVPairsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_NODE_SELECTORS, "")); } @@ -518,7 +518,7 @@ public Optional> getDiscoverJobKubeNodeSelectors() { * @return map containing kv pairs of annotations */ @Override - public Optional> getJobKubeAnnotations() { + public Map getJobKubeAnnotations() { return splitKVPairsFromEnvString(getEnvOrDefault(JOB_KUBE_ANNOTATIONS, "")); } @@ -528,7 +528,7 @@ public Optional> getJobKubeAnnotations() { * @return map containing kv pairs of node selectors, or empty optional if none present. */ @Override - public Optional> getSpecJobKubeAnnotations() { + public Map getSpecJobKubeAnnotations() { return splitKVPairsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_ANNOTATIONS, "")); } @@ -538,7 +538,7 @@ public Optional> getSpecJobKubeAnnotations() { * @return map containing kv pairs of node selectors, or empty optional if none present. */ @Override - public Optional> getCheckJobKubeAnnotations() { + public Map getCheckJobKubeAnnotations() { return splitKVPairsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_ANNOTATIONS, "")); } @@ -548,7 +548,7 @@ public Optional> getCheckJobKubeAnnotations() { * @return map containing kv pairs of node selectors, or empty optional if none present. */ @Override - public Optional> getDiscoverJobKubeAnnotations() { + public Map getDiscoverJobKubeAnnotations() { return splitKVPairsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_ANNOTATIONS, "")); } @@ -563,7 +563,7 @@ public Optional> getDiscoverJobKubeAnnotations() { * @param input string * @return map containing kv pairs */ - public Optional> splitKVPairsFromEnvString(String input) { + public Map splitKVPairsFromEnvString(String input) { if (input == null) { input = ""; } @@ -572,8 +572,7 @@ public Optional> splitKVPairsFromEnvString(String input) { .filter(s -> !Strings.isNullOrEmpty(s) && s.contains("=")) .map(s -> s.split("=")) .collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim())); - - return map.isEmpty() ? Optional.empty() : Optional.of(map); + return map.isEmpty() ? null : map; } @Override diff --git a/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java b/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java index 92a9072c391d..8578fd261db8 100644 --- a/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java +++ b/airbyte-config/models/src/test/java/io/airbyte/config/EnvConfigsTest.java @@ -11,8 +11,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -190,100 +188,100 @@ void testworkerKubeTolerations() { @Test void testSplitKVPairsFromEnvString() { String input = "key1=value1,key2=value2"; - Optional> map = config.splitKVPairsFromEnvString(input); - assertTrue(map.isPresent()); - assertEquals(2, map.get().size()); - assertEquals(map.get(), Map.of("key1", "value1", "key2", "value2")); + Map map = config.splitKVPairsFromEnvString(input); + assertNotNull(map); + assertEquals(2, map.size()); + assertEquals(map, Map.of("key1", "value1", "key2", "value2")); input = "key=k,,;$%&^#"; map = config.splitKVPairsFromEnvString(input); - assertTrue(map.isPresent()); - assertEquals(map.get(), Map.of("key", "k")); + assertNotNull(map); + assertEquals(map, Map.of("key", "k")); input = null; map = config.splitKVPairsFromEnvString(input); - assertFalse(map.isPresent()); + assertNull(map); input = " key1= value1, key2 = value2"; map = config.splitKVPairsFromEnvString(input); - assertTrue(map.isPresent()); - assertEquals(map.get(), Map.of("key1", "value1", "key2", "value2")); + assertNotNull(map); + assertEquals(map, Map.of("key1", "value1", "key2", "value2")); input = "key1:value1,key2:value2"; map = config.splitKVPairsFromEnvString(input); - assertFalse(map.isPresent()); + assertNull(map); } @Test void testJobKubeNodeSelectors() { envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, null); - assertFalse(config.getJobKubeNodeSelectors().isPresent()); + assertNull(config.getJobKubeNodeSelectors()); envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, ",,,"); - assertFalse(config.getJobKubeNodeSelectors().isPresent()); + assertNull(config.getJobKubeNodeSelectors()); envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#"); - assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("key", "k")); + assertEquals(config.getJobKubeNodeSelectors(), Map.of("key", "k")); envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "one=two"); - assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("one", "two")); + assertEquals(config.getJobKubeNodeSelectors(), Map.of("one", "two")); envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing"); - assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing")); + assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing")); } @Test void testSpecKubeNodeSelectors() { envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, null); - assertFalse(config.getSpecJobKubeNodeSelectors().isPresent()); + assertNull(config.getSpecJobKubeNodeSelectors()); envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, ",,,"); - assertFalse(config.getSpecJobKubeNodeSelectors().isPresent()); + assertNull(config.getSpecJobKubeNodeSelectors()); envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#"); - assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("key", "k")); + assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("key", "k")); envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "one=two"); - assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("one", "two")); + assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("one", "two")); envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing"); - assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing")); + assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing")); } @Test void testCheckKubeNodeSelectors() { envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, null); - assertFalse(config.getCheckJobKubeNodeSelectors().isPresent()); + assertNull(config.getCheckJobKubeNodeSelectors()); envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, ",,,"); - assertFalse(config.getCheckJobKubeNodeSelectors().isPresent()); + assertNull(config.getCheckJobKubeNodeSelectors()); envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#"); - assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("key", "k")); + assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("key", "k")); envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "one=two"); - assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("one", "two")); + assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("one", "two")); envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing"); - assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing")); + assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing")); } @Test void testDiscoverKubeNodeSelectors() { envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, null); - assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent()); + assertNull(config.getDiscoverJobKubeNodeSelectors()); envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, ",,,"); - assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent()); + assertNull(config.getDiscoverJobKubeNodeSelectors()); envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#"); - assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("key", "k")); + assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("key", "k")); envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "one=two"); - assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("one", "two")); + assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("one", "two")); envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing"); - assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing")); + assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing")); } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java index c79e945332db..386df13625c3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java @@ -10,7 +10,6 @@ import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Optional; import lombok.AllArgsConstructor; @AllArgsConstructor @@ -19,8 +18,8 @@ public class WorkerConfigs { private final Configs.WorkerEnvironment workerEnvironment; private final ResourceRequirements resourceRequirements; private final List workerKubeTolerations; - private final Optional> workerKubeNodeSelectors; - private final Optional> workerKubeAnnotations; + private final Map workerKubeNodeSelectors; + private final Map workerKubeAnnotations; private final String jobImagePullSecret; private final String jobImagePullPolicy; private final String jobSocatImage; @@ -57,11 +56,11 @@ public WorkerConfigs(final Configs configs) { * Builds a WorkerConfigs with some configs that are specific to the Spec job type. */ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) { - final Optional> nodeSelectors = configs.getSpecJobKubeNodeSelectors().isPresent() + final Map nodeSelectors = configs.getSpecJobKubeNodeSelectors() != null ? configs.getSpecJobKubeNodeSelectors() : configs.getJobKubeNodeSelectors(); - final Optional> annotations = configs.getSpecJobKubeAnnotations().isPresent() + final Map annotations = configs.getSpecJobKubeAnnotations() != null ? configs.getSpecJobKubeAnnotations() : configs.getJobKubeAnnotations(); @@ -88,11 +87,11 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) { * Builds a WorkerConfigs with some configs that are specific to the Check job type. */ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) { - final Optional> nodeSelectors = configs.getCheckJobKubeNodeSelectors().isPresent() + final Map nodeSelectors = configs.getCheckJobKubeNodeSelectors() != null ? configs.getCheckJobKubeNodeSelectors() : configs.getJobKubeNodeSelectors(); - final Optional> annotations = configs.getCheckJobKubeAnnotations().isPresent() + final Map annotations = configs.getCheckJobKubeAnnotations() != null ? configs.getCheckJobKubeAnnotations() : configs.getJobKubeAnnotations(); @@ -119,11 +118,11 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) { * Builds a WorkerConfigs with some configs that are specific to the Discover job type. */ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) { - final Optional> nodeSelectors = configs.getDiscoverJobKubeNodeSelectors().isPresent() + final Map nodeSelectors = configs.getDiscoverJobKubeNodeSelectors() != null ? configs.getDiscoverJobKubeNodeSelectors() : configs.getJobKubeNodeSelectors(); - final Optional> annotations = configs.getDiscoverJobKubeAnnotations().isPresent() + final Map annotations = configs.getDiscoverJobKubeAnnotations() != null ? configs.getDiscoverJobKubeAnnotations() : configs.getJobKubeAnnotations(); @@ -178,11 +177,11 @@ public List getWorkerKubeTolerations() { return workerKubeTolerations; } - public Optional> getworkerKubeNodeSelectors() { + public Map getworkerKubeNodeSelectors() { return workerKubeNodeSelectors; } - public Optional> getWorkerKubeAnnotations() { + public Map getWorkerKubeAnnotations() { return workerKubeAnnotations; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 4ba6aab861a0..e7899ea755e3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -48,7 +48,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -366,9 +365,9 @@ public KubePodProcess(final boolean isOrchestrator, final ResourceRequirements resourceRequirements, final String imagePullSecret, final List tolerations, - final Optional> nodeSelectors, + final Map nodeSelectors, final Map labels, - final Optional> annotations, + final Map annotations, final String socatImage, final String busyboxImage, final String curlImage, @@ -484,7 +483,7 @@ public KubePodProcess(final boolean isOrchestrator, .withNewMetadata() .withName(podName) .withLabels(labels) - .withAnnotations(annotations.orElse(null)) + .withAnnotations(annotations) .endMetadata() .withNewSpec(); @@ -494,7 +493,7 @@ public KubePodProcess(final boolean isOrchestrator, final Pod pod = podBuilder.withTolerations(buildPodTolerations(tolerations)) .withImagePullSecrets(new LocalObjectReference(imagePullSecret)) // An empty string turns this into a no-op setting. - .withNodeSelector(nodeSelectors.orElse(null)) + .withNodeSelector(nodeSelectors) .withRestartPolicy("Never") .withInitContainers(init) .withContainers(containers) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/WorkerConfigsTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/WorkerConfigsTest.java index 4ffb832772a9..020ff5efa124 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/WorkerConfigsTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/WorkerConfigsTest.java @@ -12,7 +12,6 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.ResourceRequirements; import java.util.Map; -import java.util.Optional; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -49,7 +48,7 @@ public class WorkerConfigsTest { @BeforeEach public void setup() { configs = mock(EnvConfigs.class); - when(configs.getJobKubeNodeSelectors()).thenReturn(Optional.of(DEFAULT_NODE_SELECTORS)); + when(configs.getJobKubeNodeSelectors()).thenReturn(DEFAULT_NODE_SELECTORS); when(configs.getJobMainContainerCpuRequest()).thenReturn(DEFAULT_CPU_REQUEST); when(configs.getJobMainContainerCpuLimit()).thenReturn(DEFAULT_CPU_LIMIT); when(configs.getJobMainContainerMemoryRequest()).thenReturn(DEFAULT_MEMORY_REQUEST); @@ -61,39 +60,39 @@ public void setup() { public void testDefaultNodeSelectors() { final WorkerConfigs defaultWorkerConfigs = new WorkerConfigs(configs); - Assertions.assertEquals(DEFAULT_NODE_SELECTORS, defaultWorkerConfigs.getworkerKubeNodeSelectors().get()); + Assertions.assertEquals(DEFAULT_NODE_SELECTORS, defaultWorkerConfigs.getworkerKubeNodeSelectors()); } @Test @DisplayName("spec, check, and discover workerConfigs use job-specific node selectors if set") public void testCustomNodeSelectors() { - when(configs.getCheckJobKubeNodeSelectors()).thenReturn(Optional.of(CHECK_NODE_SELECTORS)); - when(configs.getSpecJobKubeNodeSelectors()).thenReturn(Optional.of(SPEC_NODE_SELECTORS)); - when(configs.getDiscoverJobKubeNodeSelectors()).thenReturn(Optional.of(DISCOVER_NODE_SELECTORS)); + when(configs.getCheckJobKubeNodeSelectors()).thenReturn(CHECK_NODE_SELECTORS); + when(configs.getSpecJobKubeNodeSelectors()).thenReturn(SPEC_NODE_SELECTORS); + when(configs.getDiscoverJobKubeNodeSelectors()).thenReturn(DISCOVER_NODE_SELECTORS); final WorkerConfigs specWorkerConfigs = WorkerConfigs.buildSpecWorkerConfigs(configs); final WorkerConfigs checkWorkerConfigs = WorkerConfigs.buildCheckWorkerConfigs(configs); final WorkerConfigs discoverWorkerConfigs = WorkerConfigs.buildDiscoverWorkerConfigs(configs); - Assertions.assertEquals(SPEC_NODE_SELECTORS, specWorkerConfigs.getworkerKubeNodeSelectors().get()); - Assertions.assertEquals(CHECK_NODE_SELECTORS, checkWorkerConfigs.getworkerKubeNodeSelectors().get()); - Assertions.assertEquals(DISCOVER_NODE_SELECTORS, discoverWorkerConfigs.getworkerKubeNodeSelectors().get()); + Assertions.assertEquals(SPEC_NODE_SELECTORS, specWorkerConfigs.getworkerKubeNodeSelectors()); + Assertions.assertEquals(CHECK_NODE_SELECTORS, checkWorkerConfigs.getworkerKubeNodeSelectors()); + Assertions.assertEquals(DISCOVER_NODE_SELECTORS, discoverWorkerConfigs.getworkerKubeNodeSelectors()); } @Test @DisplayName("spec, check, and discover workerConfigs use default node selectors when custom selectors are not set") public void testNodeSelectorsFallbackToDefault() { - when(configs.getCheckJobKubeNodeSelectors()).thenReturn(Optional.empty()); - when(configs.getSpecJobKubeNodeSelectors()).thenReturn(Optional.empty()); - when(configs.getDiscoverJobKubeNodeSelectors()).thenReturn(Optional.empty()); + when(configs.getCheckJobKubeNodeSelectors()).thenReturn(null); + when(configs.getSpecJobKubeNodeSelectors()).thenReturn(null); + when(configs.getDiscoverJobKubeNodeSelectors()).thenReturn(null); final WorkerConfigs specWorkerConfigs = WorkerConfigs.buildSpecWorkerConfigs(configs); final WorkerConfigs checkWorkerConfigs = WorkerConfigs.buildCheckWorkerConfigs(configs); final WorkerConfigs discoverWorkerConfigs = WorkerConfigs.buildDiscoverWorkerConfigs(configs); - Assertions.assertEquals(DEFAULT_NODE_SELECTORS, specWorkerConfigs.getworkerKubeNodeSelectors().get()); - Assertions.assertEquals(DEFAULT_NODE_SELECTORS, checkWorkerConfigs.getworkerKubeNodeSelectors().get()); - Assertions.assertEquals(DEFAULT_NODE_SELECTORS, discoverWorkerConfigs.getworkerKubeNodeSelectors().get()); + Assertions.assertEquals(DEFAULT_NODE_SELECTORS, specWorkerConfigs.getworkerKubeNodeSelectors()); + Assertions.assertEquals(DEFAULT_NODE_SELECTORS, checkWorkerConfigs.getworkerKubeNodeSelectors()); + Assertions.assertEquals(DEFAULT_NODE_SELECTORS, discoverWorkerConfigs.getworkerKubeNodeSelectors()); } @Test From 2d93e0dda09f8ae7cd95878c8851e876af31e729 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Mon, 28 Mar 2022 20:48:30 +0200 Subject: [PATCH 13/15] add job specific documentation --- docs/operator-guides/configuring-airbyte.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/operator-guides/configuring-airbyte.md b/docs/operator-guides/configuring-airbyte.md index 132dfd00b0ac..4a3cb36285c2 100644 --- a/docs/operator-guides/configuring-airbyte.md +++ b/docs/operator-guides/configuring-airbyte.md @@ -105,6 +105,17 @@ The following variables are relevant to both Docker and Kubernetes. 8. `JOB_KUBE_CURL_IMAGE` - Define the Job pod curl image pull. 9. `JOB_KUBE_NAMESPACE` - Define the Kubernetes namespace Job pods are created in. +#### Jobs specific + +A job specific variable overwrites the standard job variable. + +1. `SPEC_JOB_KUBE_NODE_SELECTORS` - Define one or more pod node selectors for the spec job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` +2. `CHECK_JOB_KUBE_NODE_SELECTORS` - Define one or more pod node selectors for the check job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` +3. `DISCOVER_JOB_KUBE_NODE_SELECTORS` - Define one or more pod node selectors for the discover job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` +4. `SPEC_JOB_KUBE_ANNOTATIONS` - Define one or more pod annotations for the spec job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` +5. `CHECK_JOB_KUBE_ANNOTATIONS` - Define one or more pod annotations for the check job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` +6. `DISCOVER_JOB_KUBE_ANNOTATIONS` - Define one or more pod annotations for the discover job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` + #### Worker 1. `TEMPORAL_WORKER_PORTS` - Define the local ports the Airbyte Worker pod uses to connect to the various Job pods. Port 9001 - 9040 are exposed by default in the Kustomize deployments. From 9d0c524a6af29ac7519fafc2af350c3939942f98 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Mon, 28 Mar 2022 20:58:48 +0200 Subject: [PATCH 14/15] update doc --- docs/operator-guides/configuring-airbyte.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/operator-guides/configuring-airbyte.md b/docs/operator-guides/configuring-airbyte.md index 4a3cb36285c2..3771617b35a5 100644 --- a/docs/operator-guides/configuring-airbyte.md +++ b/docs/operator-guides/configuring-airbyte.md @@ -96,8 +96,8 @@ The following variables are relevant to both Docker and Kubernetes. ### Kubernetes-Only #### Jobs 1. `JOB_KUBE_TOLERATIONS` - Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration contains k=v pairs mentioning some/all of key, effect, operator and value and separated by `,`. -2. `JOB_KUBE_NODE_SELECTORS` - Define one or more Job pod node selectors. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` -3. `JOB_KUBE_ANNOTATIONS` - Define one or more Job pod annotations. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` +2. `JOB_KUBE_NODE_SELECTORS` - Define one or more Job pod node selectors. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2`. It is the pod node selectors of the sync job and the default pod node selectors fallback for others jobs. +3. `JOB_KUBE_ANNOTATIONS` - Define one or more Job pod annotations. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2`. It is the pod annotations of the sync job and the default pod annotations fallback for others jobs. 4. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY` - Define the Job pod connector image pull policy. 5. `JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET` - Define the Job pod connector image pull secret. Useful when hosting private images. 6. `JOB_KUBE_SOCAT_IMAGE` - Define the Job pod socat image. @@ -107,7 +107,7 @@ The following variables are relevant to both Docker and Kubernetes. #### Jobs specific -A job specific variable overwrites the standard job variable. +A job specific variable overwrites the default sync job variable defined above. 1. `SPEC_JOB_KUBE_NODE_SELECTORS` - Define one or more pod node selectors for the spec job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` 2. `CHECK_JOB_KUBE_NODE_SELECTORS` - Define one or more pod node selectors for the check job. Each k=v pair is separated by a `,`. For example: `key1=value1,key2=value2` From fd233a0107dce7e3cd9daa10e4ba3c2960804444 Mon Sep 17 00:00:00 2001 From: Thibaud Chardonnens Date: Mon, 28 Mar 2022 21:04:09 +0200 Subject: [PATCH 15/15] update javadoc --- .../models/src/main/java/io/airbyte/config/Configs.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index b47020d69d59..5a322f6f9237 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -269,6 +269,7 @@ public interface Configs { /** * Define one or more Job pod node selectors. Each kv-pair is separated by a `,`. + * Used for the sync job and as fallback in case job specific (spec, check, discover) node selectors are not defined. */ Map getJobKubeNodeSelectors(); @@ -289,6 +290,7 @@ public interface Configs { /** * Define one or more Job pod annotations. Each kv-pair is separated by a `,`. + * Used for the sync job and as fallback in case job specific (spec, check, discover) annotations are not defined. */ Map getJobKubeAnnotations();