Skip to content

Commit

Permalink
Fixing flow control metrics during server restarts.
Browse files Browse the repository at this point in the history
  • Loading branch information
ritwiksahani committed Nov 20, 2024
1 parent 6d14da9 commit 3e29800
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.twill.internal.CompositeService;
Expand All @@ -101,6 +100,9 @@
* events topic
*/
public class ProgramNotificationSubscriberService extends AbstractIdleService {

private static final Logger LOG =
LoggerFactory.getLogger(ProgramNotificationSubscriberService.class);
private final MessagingService messagingService;
private final CConfiguration cConf;
private final MetricsCollectionService metricsCollectionService;
Expand Down Expand Up @@ -145,6 +147,8 @@ protected void startUp() throws Exception {
List<Service> children = new ArrayList<>();
String topicPrefix = cConf.get(Constants.AppFabric.PROGRAM_STATUS_EVENT_TOPIC);
int numPartitions = cConf.getInt(Constants.AppFabric.PROGRAM_STATUS_EVENT_NUM_PARTITIONS);
// Active runs should be restored only once, not once for every shard that is created.
restoreActiveRuns();
// Add bare one - we always listen to it
children.add(createChildService("program.status", topicPrefix));
// If number of partitions is more than 1 - create partitioned services
Expand All @@ -153,8 +157,60 @@ protected void startUp() throws Exception {
.forEach(i -> children.add(createChildService("program.status." + i, topicPrefix + i)));
}
delegate = new CompositeService(children);

delegate.startAndWait();
// Explicitly emit both launching and running counts on startup.
emitFlowControlMetrics();
}

/**
 * Emits both flow-control gauges (launching and running counts) via the
 * {@code runRecordMonitorService} so the metric values are fresh immediately after
 * startup instead of waiting for the next program state change.
 */
private void emitFlowControlMetrics() {
  runRecordMonitorService.emitLaunchingMetrics();
  runRecordMonitorService.emitRunningMetrics();
}

/**
 * Re-registers runs that were already active before this service (re)started so that
 * flow-control accounting in {@code runRecordMonitorService} reflects them.
 *
 * <p>PENDING runs are simply re-added to the launching accounting. STARTING runs are
 * re-added and additionally have their STARTING message re-published, since their true
 * state after a restart is unknown. Any failure while restoring a run marks that run as
 * failed via {@code programStateWriter.error}.
 */
private void restoreActiveRuns() {
  LOG.info("Restoring active runs");
  int batchSize = cConf.getInt(Constants.RuntimeMonitor.INIT_BATCH_SIZE);
  RetryStrategy retryStrategy =
      RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX);
  // Runs that start after this timestamp are handled by the normal notification flow,
  // so the scan below skips them.
  long startTs = System.currentTimeMillis();

  Retries.runWithRetries(
      () ->
          store.scanActiveRuns(
              batchSize,
              (runRecordDetail) -> {
                // Ignore runs that began after restoration started; they are already
                // tracked by the live subscriber path.
                if (runRecordDetail.getStartTs() > startTs) {
                  return;
                }
                try {
                  LOG.info("Found active run: {}", runRecordDetail.getProgramRunId());
                  if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
                    // Restore launching accounting for the pending run.
                    runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
                  } else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
                    runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
                    // The actual state of a run recorded as STARTING is unknown after a
                    // restart, so the STARTING message is published again to retry the
                    // STARTING logic.
                    ProgramOptions programOptions =
                        new SimpleProgramOptions(
                            runRecordDetail.getProgramRunId().getParent(),
                            new BasicArguments(runRecordDetail.getSystemArgs()),
                            new BasicArguments(runRecordDetail.getUserArgs()));
                    LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId());
                    programStateWriter.start(
                        runRecordDetail.getProgramRunId(),
                        programOptions,
                        null,
                        this.store.loadProgram(runRecordDetail.getProgramRunId().getParent()));
                  }
                } catch (Exception e) {
                  // Restoration of this run failed; record the failure so the run does
                  // not stay stuck in a launching state forever.
                  ProgramRunId programRunId = runRecordDetail.getProgramRunId();
                  LOG.warn(
                      "Retrying to start run {} failed. Marking it as failed.", programRunId, e);
                  programStateWriter.error(programRunId, e);
                }
              }),
      retryStrategy,
      e -> true);
}

@Override
Expand All @@ -178,7 +234,6 @@ private ProgramNotificationSingleTopicSubscriberService createChildService(
provisioningService,
programStateWriter,
transactionRunner,
store,
runRecordMonitorService,
name,
topicName,
Expand All @@ -195,7 +250,7 @@ class ProgramNotificationSingleTopicSubscriberService
extends AbstractNotificationSubscriberService {

private static final Logger LOG =
LoggerFactory.getLogger(ProgramNotificationSubscriberService.class);
LoggerFactory.getLogger(ProgramNotificationSingleTopicSubscriberService.class);

private static final Gson GSON =
ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).create();
Expand All @@ -220,8 +275,6 @@ class ProgramNotificationSingleTopicSubscriberService
private final Queue<Runnable> tasks;
private final MetricsCollectionService metricsCollectionService;
private Set<ProgramCompletionNotifier> programCompletionNotifiers;
private final CConfiguration cConf;
private final Store store;
private final RunRecordMonitorService runRecordMonitorService;
private final boolean checkTxSeparation;

Expand All @@ -234,7 +287,6 @@ class ProgramNotificationSingleTopicSubscriberService
ProvisioningService provisioningService,
ProgramStateWriter programStateWriter,
TransactionRunner transactionRunner,
Store store,
RunRecordMonitorService runRecordMonitorService,
String name,
String topicName,
Expand All @@ -259,8 +311,6 @@ class ProgramNotificationSingleTopicSubscriberService
this.metricsCollectionService = metricsCollectionService;
this.programCompletionNotifiers = programCompletionNotifiers;
this.runRecordMonitorService = runRecordMonitorService;
this.cConf = cConf;
this.store = store;

// If number of partitions equals 1, DB deadlock cannot happen as a result of concurrent
// modifications to
Expand All @@ -273,55 +323,6 @@ class ProgramNotificationSingleTopicSubscriberService
@Override
protected void doStartUp() throws Exception {
super.doStartUp();

int batchSize = cConf.getInt(Constants.RuntimeMonitor.INIT_BATCH_SIZE);
RetryStrategy retryStrategy =
RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX);
long startTs = System.currentTimeMillis();

AtomicBoolean launching = new AtomicBoolean(false);
Retries.runWithRetries(
() ->
store.scanActiveRuns(
batchSize,
(runRecordDetail) -> {
if (runRecordDetail.getStartTs() > startTs) {
return;
}
try {
if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) {
launching.set(true);
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
} else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) {
launching.set(true);
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId());
// It is unknown what is the state of program runs in STARTING state.
// A STARTING message is published again to retry STARTING logic.
ProgramOptions programOptions =
new SimpleProgramOptions(
runRecordDetail.getProgramRunId().getParent(),
new BasicArguments(runRecordDetail.getSystemArgs()),
new BasicArguments(runRecordDetail.getUserArgs()));
LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId());
programStateWriter.start(
runRecordDetail.getProgramRunId(),
programOptions,
null,
this.store.loadProgram(runRecordDetail.getProgramRunId().getParent()));
}
} catch (Exception e) {
ProgramRunId programRunId = runRecordDetail.getProgramRunId();
LOG.warn(
"Retrying to start run {} failed. Marking it as failed.", programRunId, e);
programStateWriter.error(programRunId, e);
}
}),
retryStrategy,
e -> true);
if (!launching.get()) {
// there is no launching pipeline
runRecordMonitorService.emitLaunchingMetrics(0);
}
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProgramRunId;
Expand Down Expand Up @@ -184,15 +185,31 @@ public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange)
}

if (emitRunningChange) {
emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, getProgramsRunningCount());
emitRunningMetrics();
}
}

/**
 * Emits the {@link Constants.Metrics.FlowControl#LAUNCHING_COUNT} metric with an
 * explicitly supplied value (e.g. to force the gauge to a known count).
 *
 * @param value the launching count to report
 */
public void emitLaunchingMetrics(long value) {
  emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value);
}

/**
 * Emits the {@link FlowControl#LAUNCHING_COUNT} metric for runs, using the current size
 * of the launching queue as the gauge value.
 */
public void emitLaunchingMetrics() {
  // Use the imported FlowControl alias for consistency with emitRunningMetrics().
  emitMetrics(FlowControl.LAUNCHING_COUNT, launchingQueue.size());
}


/**
 * Emits the {@link Constants.Metrics.FlowControl#RUNNING_COUNT} metric for runs, using
 * {@code getProgramsRunningCount()} as the gauge value.
 */
public void emitRunningMetrics() {
  emitMetrics(FlowControl.RUNNING_COUNT, getProgramsRunningCount());
}

/**
 * Sets the given gauge metric to the given value in the empty-tag (global) metrics
 * context.
 *
 * @param metricName name of the gauge metric to set
 * @param value the value to report
 */
private void emitMetrics(String metricName, long value) {
  LOG.debug("Setting metric {} to value {}", metricName, value);
  metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value);
}

Expand All @@ -208,11 +225,11 @@ private void cleanupQueue() {
// Queue head might have already been removed. So instead of calling poll, we call remove.
if (launchingQueue.remove(programRunId)) {
LOG.info("Removing request with runId {} due to expired retention time.", programRunId);
emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size());
}
}

emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, getProgramsRunningCount());
// Always emit both metrics after cleanup.
emitLaunchingMetrics();
emitRunningMetrics();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ protected void configure() {
});
}

/**
 * Returns the first registered service that is an instance of the given class.
 *
 * @param clazz the service type to look up
 * @param <T> the expected service type
 * @return the first matching service, or {@code null} if no registered service matches
 */
public static <T extends Service> T getService(Class<T> clazz) {
  for (Service service : services) {
    if (clazz.isInstance(service)) {
      // Class.cast performs a checked cast, avoiding the unchecked (T) cast warning.
      return clazz.cast(service);
    }
  }
  return null;
}

public static Injector getInjector(CConfiguration cConf, Module overrides) {
return getInjector(cConf, null, overrides);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl;
import io.cdap.cdap.common.id.Id;
import io.cdap.cdap.common.utils.ProjectInfo;
import io.cdap.cdap.common.utils.Tasks;
Expand Down Expand Up @@ -308,6 +309,66 @@ public void testMetricsEmit() throws Exception {
metricStore.deleteAll();
}

@Test
public void testLaunchingCountMetricsOnRestart() throws Exception {
  // Deploy a workflow app and build a run id for one of its workflows.
  AppFabricTestHelper.deployApplication(Id.Namespace.DEFAULT, ProgramStateWorkflowApp.class, null,
      cConf);
  ApplicationDetail appDetail = AppFabricTestHelper.getAppInfo(Id.Namespace.DEFAULT,
      ProgramStateWorkflowApp.class.getSimpleName(), cConf);

  ProgramRunId workflowRunId = NamespaceId.DEFAULT
      .app(ProgramStateWorkflowApp.class.getSimpleName(), appDetail.getAppVersion())
      .workflow(ProgramStateWorkflowApp.ProgramStateWorkflow.class.getSimpleName())
      .run(RunIds.generate());

  ApplicationSpecification appSpec = TransactionRunners.run(transactionRunner, context -> {
    return AppMetadataStore.create(context).getApplication(workflowRunId.getParent().getParent())
        .getSpec();
  });

  ProgramDescriptor programDescriptor = new ProgramDescriptor(workflowRunId.getParent(), appSpec);

  // Start the workflow so a run record in STARTING state exists in the store.
  // SKIP_PROVISIONING keeps the run from progressing past STARTING on its own.
  Map<String, String> systemArgs = new HashMap<>();
  systemArgs.put(ProgramOptionConstants.SKIP_PROVISIONING, Boolean.TRUE.toString());
  systemArgs.put(SystemArguments.PROFILE_NAME, ProfileId.NATIVE.getScopedName());
  TransactionRunners.run(transactionRunner, context -> {
    programStateWriter.start(workflowRunId, new SimpleProgramOptions(workflowRunId.getParent(),
        new BasicArguments(systemArgs),
        new BasicArguments()), null, programDescriptor);
  });
  checkProgramStatus(appSpec.getArtifactId(), workflowRunId, ProgramRunStatus.STARTING);

  ProgramNotificationSubscriberService notificationService = AppFabricTestHelper.getService(
      ProgramNotificationSubscriberService.class);
  // Restart the Notification service. On startup it should restore the STARTING run
  // and emit flow-control metrics.
  notificationService.stopAndWait();
  notificationService.startUp();

  MetricStore metricStore = injector.getInstance(MetricStore.class);
  // Wait for metrics to be written: the STARTING run counts as launching (1) and
  // nothing is running yet (0).
  Tasks.waitFor(1L, () -> queryMetrics(metricStore,
      SYSTEM_METRIC_PREFIX + FlowControl.LAUNCHING_COUNT, new HashMap<>()), 10, TimeUnit.SECONDS);
  Assert.assertEquals(0L, queryMetrics(metricStore,
      SYSTEM_METRIC_PREFIX + FlowControl.RUNNING_COUNT, new HashMap<>()));

  // Move the run to RUNNING, then restart again; launching should drop back to 0.
  TransactionRunners.run(transactionRunner, context -> {
    programStateWriter.running(workflowRunId, null);
  });
  checkProgramStatus(appSpec.getArtifactId(), workflowRunId, ProgramRunStatus.RUNNING);
  // Restart the Notification service.
  notificationService.stopAndWait();
  notificationService.startUp();

  // Running counts are not based on metadata store in RunRecordMonitorService so not asserting it
  // here.
  Tasks.waitFor(0L, () -> queryMetrics(metricStore,
      SYSTEM_METRIC_PREFIX + FlowControl.LAUNCHING_COUNT, new HashMap<>()), 10, TimeUnit.SECONDS);

  // Stop restarted services and cleanup metrics.
  metricStore.deleteAll();
}

private Map<String, String> getAdditionalTagsForProgramMetrics(ProgramRunStatus existingStatus, String provisioner,
ProgramRunClusterStatus clusterStatus) {
Map<String, String> additionalTags = new HashMap<>();
Expand Down Expand Up @@ -460,8 +521,13 @@ private long getMetric(MetricStore metricStore, ProgramRunId programRunId, Profi
.put(Constants.Metrics.Tag.PROGRAM, programRunId.getProgram())
.putAll(additionalTags)
.build();
return queryMetrics(metricStore, metricName, tags);
}

private long queryMetrics(MetricStore metricStore, String metricName,
Map<String, String> tags) {
MetricDataQuery query = new MetricDataQuery(0, 0, Integer.MAX_VALUE, metricName, AggregationFunction.SUM,
tags, new ArrayList<>());
tags, new ArrayList<>());
Collection<MetricTimeSeries> result = metricStore.query(query);
if (result.isEmpty()) {
return 0;
Expand Down

0 comments on commit 3e29800

Please sign in to comment.