Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-workers: add support for kubernetes pod annotations #10753

Merged
merged 19 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -293,23 +292,45 @@ public interface Configs {

/**
* Define one or more Job pod node selectors. Each kv-pair is separated by a `,`.
* Used for the sync job and as fallback in case job specific (spec, check, discover) node selectors are not defined.
*/
Optional<Map<String, String>> getJobKubeNodeSelectors();
Map<String, String> getJobKubeNodeSelectors();

/**
* Define node selectors for Spec job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getSpecJobKubeNodeSelectors();
Map<String, String> getSpecJobKubeNodeSelectors();

/**
* Define node selectors for Check job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getCheckJobKubeNodeSelectors();
Map<String, String> getCheckJobKubeNodeSelectors();

/**
* Define node selectors for Discover job pods specifically. Each kv-pair is separated by a `,`.
*/
Optional<Map<String, String>> getDiscoverJobKubeNodeSelectors();
Map<String, String> getDiscoverJobKubeNodeSelectors();

/**
* Define one or more Job pod annotations. Each kv-pair is separated by a `,`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include in the comment that this annotation is used as a fallback in case other annotations aren't set, as well as, used as the sync job pod annotation.

* Used for the sync job and as fallback in case job specific (spec, check, discover) annotations are not defined.
*/
Map<String, String> getJobKubeAnnotations();

/**
* Define annotations for Spec job pods specifically. Each kv-pair is separated by a `,`.
*/
Map<String, String> getSpecJobKubeAnnotations();

/**
* Define annotations for Check job pods specifically. Each kv-pair is separated by a `,`.
*/
Map<String, String> getCheckJobKubeAnnotations();

/**
* Define annotations for Discover job pods specifically. Each kv-pair is separated by a `,`.
*/
Map<String, String> getDiscoverJobKubeAnnotations();

/**
* Define the Job pod connector image pull policy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class EnvConfigs implements Configs {
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY";
public static final String JOB_KUBE_TOLERATIONS = "JOB_KUBE_TOLERATIONS";
public static final String JOB_KUBE_NODE_SELECTORS = "JOB_KUBE_NODE_SELECTORS";
public static final String JOB_KUBE_ANNOTATIONS = "JOB_KUBE_ANNOTATIONS";
public static final String JOB_KUBE_SOCAT_IMAGE = "JOB_KUBE_SOCAT_IMAGE";
public static final String JOB_KUBE_BUSYBOX_IMAGE = "JOB_KUBE_BUSYBOX_IMAGE";
public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE";
Expand Down Expand Up @@ -127,6 +128,9 @@ public class EnvConfigs implements Configs {
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
public static final String DISCOVER_JOB_KUBE_NODE_SELECTORS = "DISCOVER_JOB_KUBE_NODE_SELECTORS";
public static final String SPEC_JOB_KUBE_ANNOTATIONS = "SPEC_JOB_KUBE_ANNOTATIONS";
public static final String CHECK_JOB_KUBE_ANNOTATIONS = "CHECK_JOB_KUBE_ANNOTATIONS";
public static final String DISCOVER_JOB_KUBE_ANNOTATIONS = "DISCOVER_JOB_KUBE_ANNOTATIONS";

private static final String REPLICATION_ORCHESTRATOR_CPU_REQUEST = "REPLICATION_ORCHESTRATOR_CPU_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_CPU_LIMIT = "REPLICATION_ORCHESTRATOR_CPU_LIMIT";
Expand Down Expand Up @@ -484,8 +488,8 @@ private TolerationPOJO parseToleration(final String tolerationStr) {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, ""));
}

/**
Expand All @@ -494,8 +498,8 @@ public Optional<Map<String, String>> getJobKubeNodeSelectors() {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getSpecJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getSpecJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
Expand All @@ -504,8 +508,8 @@ public Optional<Map<String, String>> getSpecJobKubeNodeSelectors() {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getCheckJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getCheckJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
Expand All @@ -514,28 +518,76 @@ public Optional<Map<String, String>> getCheckJobKubeNodeSelectors() {
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Optional<Map<String, String>> getDiscoverJobKubeNodeSelectors() {
return getNodeSelectorsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_NODE_SELECTORS, ""));
public Map<String, String> getDiscoverJobKubeNodeSelectors() {
return splitKVPairsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_NODE_SELECTORS, ""));
}

/**
* Parse string containing node selectors into a map. Each kv-pair is separated by a `,`
* Returns a map of annotations from its own environment variable. The value of the env is a string
* that represents one or more annotations. Each kv-pair is separated by a `,`
* <p>
* For example:- The following represents two node selectors
* For example:- The following represents two annotations
* <p>
* airbyte=server,type=preemptive
*
* @param envString string that represents one or more node selector labels.
* @return map containing kv pairs of annotations
*/
@Override
public Map<String, String> getJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Returns a map of node selectors for Spec job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
private Optional<Map<String, String>> getNodeSelectorsFromEnvString(final String envString) {
final Map<String, String> selectors = Splitter.on(",")
.splitToStream(envString)
@Override
public Map<String, String> getSpecJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(SPEC_JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Returns a map of node selectors for Check job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Map<String, String> getCheckJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(CHECK_JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Returns a map of node selectors for Discover job pods specifically.
*
* @return map containing kv pairs of node selectors, or empty optional if none present.
*/
@Override
public Map<String, String> getDiscoverJobKubeAnnotations() {
return splitKVPairsFromEnvString(getEnvOrDefault(DISCOVER_JOB_KUBE_ANNOTATIONS, ""));
}

/**
* Splits key value pairs from the input string into a map. Each kv-pair is separated by a ','. The
* key and the value are separated by '='.
* <p>
* For example:- The following represents two map entries
* </p>
* key1=value1,key2=value2
*
* @param input string
* @return map containing kv pairs
*/
public Map<String, String> splitKVPairsFromEnvString(String input) {
if (input == null) {
input = "";
}
final Map<String, String> map = Splitter.on(",")
.splitToStream(input)
.filter(s -> !Strings.isNullOrEmpty(s) && s.contains("="))
.map(s -> s.split("="))
.collect(Collectors.toMap(s -> s[0], s -> s[1]));

return selectors.isEmpty() ? Optional.empty() : Optional.of(selectors);
.collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim()));
return map.isEmpty() ? null : map;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,76 +186,103 @@ void testworkerKubeTolerations() {
new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));
}

@Test
void testSplitKVPairsFromEnvString() {
String input = "key1=value1,key2=value2";
Map<String, String> map = config.splitKVPairsFromEnvString(input);
assertNotNull(map);
assertEquals(2, map.size());
assertEquals(map, Map.of("key1", "value1", "key2", "value2"));

input = "key=k,,;$%&^#";
map = config.splitKVPairsFromEnvString(input);
assertNotNull(map);
assertEquals(map, Map.of("key", "k"));

input = null;
map = config.splitKVPairsFromEnvString(input);
assertNull(map);

input = " key1= value1, key2 = value2";
map = config.splitKVPairsFromEnvString(input);
assertNotNull(map);
assertEquals(map, Map.of("key1", "value1", "key2", "value2"));

input = "key1:value1,key2:value2";
map = config.splitKVPairsFromEnvString(input);
assertNull(map);
}

@Test
void testJobKubeNodeSelectors() {
envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getJobKubeNodeSelectors().isPresent());
assertNull(config.getJobKubeNodeSelectors());

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getJobKubeNodeSelectors().isPresent());
assertNull(config.getJobKubeNodeSelectors());

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testSpecKubeNodeSelectors() {
envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getSpecJobKubeNodeSelectors().isPresent());
assertNull(config.getSpecJobKubeNodeSelectors());

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getSpecJobKubeNodeSelectors().isPresent());
assertNull(config.getSpecJobKubeNodeSelectors());

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.SPEC_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getSpecJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getSpecJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testCheckKubeNodeSelectors() {
envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getCheckJobKubeNodeSelectors().isPresent());
assertNull(config.getCheckJobKubeNodeSelectors());

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getCheckJobKubeNodeSelectors().isPresent());
assertNull(config.getCheckJobKubeNodeSelectors());

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.CHECK_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getCheckJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getCheckJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
void testDiscoverKubeNodeSelectors() {
envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, null);
assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent());
assertNull(config.getDiscoverJobKubeNodeSelectors());

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, ",,,");
assertFalse(config.getDiscoverJobKubeNodeSelectors().isPresent());
assertNull(config.getDiscoverJobKubeNodeSelectors());

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "key=k,,;$%&^#");
assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("key", "k"));
assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("key", "k"));

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "one=two");
assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("one", "two"));
assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("one", "two"));

envMap.put(EnvConfigs.DISCOVER_JOB_KUBE_NODE_SELECTORS, "airbyte=server,something=nothing");
assertEquals(config.getDiscoverJobKubeNodeSelectors().get(), Map.of("airbyte", "server", "something", "nothing"));
assertEquals(config.getDiscoverJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

@Test
Expand Down
Loading