Skip to content

Commit

Permalink
[CDAP-20913] Fix bug where runs fails when appfabric is restarted
Browse files Browse the repository at this point in the history
  • Loading branch information
rmstar committed Jan 16, 2024
1 parent 6f245ef commit d5a75c3
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public Map<RunId, RuntimeInfo> list(ProgramType type) {

// Goes through all live application and fill the twillProgramInfo table
for (TwillRunner.LiveInfo liveInfo : twillRunner.lookupLive()) {
String appName = liveInfo.getApplicationName();
ProgramId programId = TwillAppNames.fromTwillAppName(appName, false);
ProgramId programId = TwillAppNames.fromTwillAppName(liveInfo.getApplicationName(),
false, liveInfo.getApplicationVersion());
if (programId == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,9 @@ private Builder.RunnableSetter localizeFiles(Map<String, LocalizeResource> local
public String getRunId() {
return programRunId.getRun();
}

@Override
public String getApplicationVersion() {
return programRunId.getVersion();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ public static ProgramId fromTwillAppName(String twillAppName) {
*/
@Nullable
public static ProgramId fromTwillAppName(String twillAppName, boolean mustMatch) {
return fromTwillAppName(twillAppName, mustMatch, null);
}

/**
* Given a Twill app name and version, returns the id of the program that was used to construct
* this Twill app name.
*
* @return {@code null} if mustMatch is false, and if the specified Twill app name does not match
* the {@link #APP_NAME_PATTERN}. For instance, for the Constants.Service.MASTER_SERVICES
* Twill app, it will return null.
* @throws IllegalArgumentException if the given app name does not match the {@link
* #APP_NAME_PATTERN} and mustMatch is true.
*/
@Nullable
public static ProgramId fromTwillAppName(String twillAppName, boolean mustMatch,
@Nullable String version) {
Matcher matcher = APP_NAME_PATTERN.matcher(twillAppName);
if (!matcher.matches()) {
Preconditions.checkArgument(!mustMatch,
Expand All @@ -79,6 +95,8 @@ public static ProgramId fromTwillAppName(String twillAppName, boolean mustMatch)
"Expected matcher for '%s' to have 4 groups, but it had %s groups.",
twillAppName, matcher.groupCount());
ProgramType type = ProgramType.valueOf(matcher.group(1).toUpperCase());
return new ProgramId(matcher.group(2), matcher.group(3), type, matcher.group(4));
return version != null ?
new ProgramId(matcher.group(2), matcher.group(3), version, type, matcher.group(4)) :
new ProgramId(matcher.group(2), matcher.group(3), type, matcher.group(4));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package io.cdap.cdap.common.twill;

import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.ProgramId;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -46,4 +48,14 @@ public void test() {
Assert.assertTrue(e.getMessage().contains("does not match pattern for programs"));
}
}

@Test
public void testAppWithVersion() {
String appVersion = UUID.randomUUID().toString();
ProgramId expected = new ProgramId("default", "app", appVersion,
ProgramType.SPARK, "DataPipelineWorkflow");
ProgramId programId = TwillAppNames.fromTwillAppName("spark.default.app.DataPipelineWorkflow",
false, appVersion);
Assert.assertEquals(programId, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.cdap.cdap.master.spi.namespace.NamespaceDetail;
import io.cdap.cdap.master.spi.namespace.NamespaceListener;
import io.cdap.cdap.master.spi.twill.ExtendedTwillApplication;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.NamespaceId;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.custom.Quantity;
Expand Down Expand Up @@ -64,9 +65,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -121,6 +120,7 @@ public class KubeTwillRunnerService implements TwillRunnerService, NamespaceList
private static final Logger LOG = LoggerFactory.getLogger(KubeTwillRunnerService.class);

static final String APP_LABEL = "cdap.twill.app";
static final String APP_VERSION = "cdap.twill.app.version";
private static final String CDAP_NAMESPACE_LABEL = "cdap.namespace";
private static final String NAMESPACE_CPU_LIMIT_PROPERTY = "k8s.namespace.cpu.limits";
private static final String NAMESPACE_MEMORY_LIMIT_PROPERTY = "k8s.namespace.memory.limits";
Expand Down Expand Up @@ -209,15 +209,22 @@ public TwillPreparer prepare(TwillRunnable runnable,
public TwillPreparer prepare(TwillApplication application) {
TwillSpecification spec = application.configure();
RunId runId;
String appVersion;
if (application instanceof ExtendedTwillApplication) {
runId = RunIds.fromString(((ExtendedTwillApplication) application).getRunId());
appVersion = ((ExtendedTwillApplication) application).getApplicationVersion();
} else {
// Version is not set for system apps
appVersion = null;
runId = RunIds.generate();
}
Location appLocation = getApplicationLocation(spec.getName(), runId);
Map<String, String> labels = new HashMap<>(extraLabels);
labels.put(RUNNER_LABEL, RUNNER_LABEL_VAL);
labels.put(APP_LABEL, spec.getName());
if (appVersion != null && !appVersion.equals(ApplicationId.DEFAULT_VERSION)) {
labels.put(APP_VERSION, appVersion);
}
labels.put(RUN_ID_LABEL, runId.getId());

return new KubeTwillPreparer(masterEnvContext, apiClient, kubeNamespace, podInfo,
Expand All @@ -232,8 +239,9 @@ public TwillPreparer prepare(TwillApplication application) {
//since monitor is disabled, we fire and forget
return controller;
}

KubeLiveInfo liveInfo = liveInfos.computeIfAbsent(spec.getName(),
n -> new KubeLiveInfo(resourceType, n));
n -> new KubeLiveInfo(resourceType, n, appVersion));
return liveInfo.addControllerIfAbsent(runId, timeout, timeoutUnit, controller, meta);
} finally {
liveInfoLock.unlock();
Expand Down Expand Up @@ -923,6 +931,7 @@ private final class AppResourceChangeListener<T extends KubernetesObject> implem
public void resourceAdded(T resource) {
V1ObjectMeta metadata = resource.getMetadata();
String appName = metadata.getAnnotations().get(APP_LABEL);
String appVersion = metadata.getLabels().getOrDefault(APP_VERSION, null);
if (appName == null) {
// This shouldn't happen. Just to guard against future bug.
return;
Expand All @@ -946,7 +955,7 @@ public void resourceAdded(T resource) {
liveInfoLock.lock();
try {
KubeLiveInfo liveInfo = liveInfos.computeIfAbsent(appName,
k -> new KubeLiveInfo(resource.getClass(), appName));
k -> new KubeLiveInfo(resource.getClass(), appName, appVersion));
KubeTwillController controller = createKubeTwillController(appName, runId,
resource.getClass(), metadata);
liveInfo.addControllerIfAbsent(runId, startTimeoutMillis, TimeUnit.MILLISECONDS, controller,
Expand Down Expand Up @@ -1084,12 +1093,15 @@ private final class KubeLiveInfo implements LiveInfo {

private final Type resourceType;
private final String applicationName;
@Nullable
private final String applicationVersion;
private final Map<String, KubeTwillController> controllers;

KubeLiveInfo(Type resourceType, String applicationName) {
KubeLiveInfo(Type resourceType, String applicationName, @Nullable String appVersion) {
this.resourceType = resourceType;
this.applicationName = applicationName;
this.controllers = new ConcurrentSkipListMap<>();
this.applicationVersion = appVersion;
}

KubeTwillController addControllerIfAbsent(RunId runId, long timeout, TimeUnit timeoutUnit,
Expand Down Expand Up @@ -1124,6 +1136,12 @@ public String getApplicationName() {
return applicationName;
}

@Override
@Nullable
public String getApplicationVersion() {
return applicationVersion;
}

@Override
public Iterable<TwillController> getControllers() {
// Protect against modifications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public interface ExtendedTwillApplication extends TwillApplication {

String getRunId();

String getApplicationVersion();

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
<tephra.version>0.15.0-incubating</tephra.version>
<tez.version>0.8.4</tez.version>
<thrift.version>0.9.3</thrift.version>
<twill.version>1.3.1</twill.version>
<twill.version>1.4.0-SNAPSHOT</twill.version>
<unboundid.version>2.3.6</unboundid.version>
<zookeeper.version>3.4.5</zookeeper.version>
<embedded-postgres.version>1.3.1</embedded-postgres.version>
Expand Down

0 comments on commit d5a75c3

Please sign in to comment.