Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0XP-1634: simplify pipeline retry logic #2118

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export class PipelineAPISubservice {
(await this.localFileService?.hasCachedLoad(pipelineId)) ?? false,
cachedBytes:
(await this.localFileService?.getCachedLoadSize(pipelineId)) ?? 0,
loading: !!pipelineSlot.loadPromise,
loading: pipelineSlot.loading,
latestAtoms,
lastLoad,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
PipelineLoadSummary
} from "@pcd/passport-interface";
import { RollbarService } from "@pcd/server-shared";
import { sleep, str } from "@pcd/util";
import { str } from "@pcd/util";
import _ from "lodash";
import { PoolClient } from "postgres-pool";
import { IPipelineAtomDB } from "../../../database/queries/pipelineAtomDB";
Expand All @@ -14,7 +14,6 @@ import {
} from "../../../database/sqlQuery";
import { ApplicationContext } from "../../../types";
import { logger } from "../../../util/logger";
import { isAbortError } from "../../../util/util";
import { DiscordService } from "../../discordService";
import {
LocalFileService,
Expand Down Expand Up @@ -51,7 +50,7 @@ export class PipelineExecutorSubservice {
* 3. wait until the time is at least {@link PIPELINE_REFRESH_INTERVAL_MS} milliseconds after the last load started
* 4. go back to step one
*/
private static readonly PIPELINE_REFRESH_INTERVAL_MS = 60_000;
private static readonly PIPELINE_REFRESH_INTERVAL_MS = 60_000; // 1 minute

/**
* Podbox maintains an instance of a {@link PipelineSlot} for each pipeline
Expand Down Expand Up @@ -191,7 +190,8 @@ export class PipelineExecutorSubservice {
owner: await this.userSubservice.getUserById(
client,
definition.ownerUserId
)
),
loading: false
};
this.pipelineSlots.push(pipelineSlot);
} else {
Expand All @@ -204,13 +204,10 @@ export class PipelineExecutorSubservice {
tracePipeline(pipelineSlot.definition);
traceUser(pipelineSlot.owner);

const existingInstance = pipelineSlot.instance;
const stopPipeline = async (): Promise<void> => {
if (existingInstance && !existingInstance.isStopped()) {
span?.setAttribute("stopping", true);
await existingInstance.stop();
}
};
if (pipelineSlot.instance && !pipelineSlot.instance.isStopped()) {
span?.setAttribute("stopping", true);
await pipelineSlot.instance.stop();
}

pipelineSlot.instance = await instantiatePipeline(
this.context,
Expand All @@ -221,9 +218,9 @@ export class PipelineExecutorSubservice {
pipelineSlot.definition = definition;

if (dontLoad !== true) {
await this.performPipelineLoad(pipelineSlot, stopPipeline);
} else {
await stopPipeline();
this.performPipelineLoad(pipelineSlot).catch((e) => {
logger(LOG_TAG, "failed to perform pipeline load", e);
});
}
});
}
Expand Down Expand Up @@ -254,6 +251,7 @@ export class PipelineExecutorSubservice {
this.pipelineSlots.map(async (entry) => {
if (entry.instance && !entry.instance.isStopped()) {
await entry.instance.stop();
entry.loading = false;
}
})
);
Expand All @@ -269,7 +267,8 @@ export class PipelineExecutorSubservice {
owner: await this.userSubservice.getUserById(
client,
pipelineDefinition.ownerUserId
)
),
loading: false
});

// attempt to instantiate a {@link Pipeline}
Expand Down Expand Up @@ -313,8 +312,7 @@ export class PipelineExecutorSubservice {
* If load result caching is enabled globally and for this pipeline, takes care of that too.
*/
public async performPipelineLoad(
pipelineSlot: PipelineSlot,
loadStarted?: () => Promise<void>
pipelineSlot: PipelineSlot
): Promise<PipelineLoadSummary> {
return traced<PipelineLoadSummary>(
SERVICE_NAME,
Expand Down Expand Up @@ -363,8 +361,7 @@ export class PipelineExecutorSubservice {
this.userSubservice,
this.discordService,
this.pagerdutyService,
this.rollbarService,
loadStarted
this.rollbarService
);

if (cachedLoadFromDisk) {
Expand Down Expand Up @@ -438,40 +435,7 @@ export class PipelineExecutorSubservice {

await Promise.allSettled(
this.pipelineSlots.map(async (slot: PipelineSlot): Promise<void> => {
try {
if (slot.loadPromise) {
await slot.loadPromise;
} else {
await this.performPipelineLoad(slot);
}
} catch (e) {
// an abort means a podbox user either deleted or edited (and thus restarted)
// this pipeline. in the case that it was deleted, `slot.loadPromise` will be
// set to `undefined`. In the case that it was edited, Podbox will restart the
// pipeline, and set `slot.loadPromise` to a new promise representing the
// pipeline's load operation.
if (isAbortError(e)) {
while (slot.loadPromise) {
try {
await slot.loadPromise;
slot.loadPromise = undefined;
break;
} catch (e) {
if (isAbortError(e)) {
await sleep(50);
}
}
}
// no load promise means we're done here
return;
}

logger(
LOG_TAG,
`failed to perform pipeline load for pipeline ${slot.definition.id}`,
e
);
}
await this.performPipelineLoad(slot);
})
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export class PipelineSubservice {
await this.pipelineDB.deleteDefinition(client, pipelineId);
await this.pipelineDB.saveLoadSummary(pipelineId, undefined);
await this.pipelineAtomDB.clear(pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId, true);
await this.localFileService?.clearPipelineCache(pipelineId);
});
}
Expand All @@ -308,7 +308,7 @@ export class PipelineSubservice {
await this.localFileService?.clearPipelineCache(pipelineId);
await this.pipelineDB.saveLoadSummary(pipelineId, undefined);
await this.pipelineAtomDB.clear(pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId, true);
}

public async getPipelineEditHistory(
Expand Down Expand Up @@ -429,7 +429,7 @@ export class PipelineSubservice {
return {
extraInfo: {
ownerEmail: owner?.email,
loading: !!slot.loadPromise,
loading: slot.loading,
lastLoad: summary,
hasCachedLoad:
(await this.localFileService?.hasCachedLoad(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { getActiveSpan } from "@opentelemetry/api/build/src/trace/context-utils";
import { PipelineLoadSummary } from "@pcd/passport-interface";
import { RollbarService } from "@pcd/server-shared";
import { sleep } from "@pcd/util";
import { Pool } from "postgres-pool";
import { sqlTransaction } from "../../../../database/sqlQuery";
import { logger } from "../../../../util/logger";
Expand All @@ -21,6 +22,7 @@ import { UserSubservice } from "../UserSubservice";
import { maybeAlertForPipelineRun } from "./maybeAlertForPipelineRun";

const LOG_TAG = `[performPipelineLoad]`;
const PIPELINE_LOAD_TIMEOUT_MS = 1000 * 60 * 10; // 10 minutes

/**
* Performs a {@link Pipeline#load} for the given {@link Pipeline}, and
Expand All @@ -34,8 +36,7 @@ export async function performPipelineLoad(
userSubservice: UserSubservice,
discordService: DiscordService | null,
pagerdutyService: PagerDutyService | null,
rollbarService: RollbarService | null,
loadStarted?: () => Promise<void>
rollbarService: RollbarService | null
): Promise<PipelineLoadSummary> {
const startTime = new Date();
const pipelineId = pipelineSlot.definition.id;
Expand Down Expand Up @@ -110,20 +111,18 @@ export async function performPipelineLoad(
`loading data for pipeline with id '${pipelineId}'` +
` of type '${pipelineSlot.definition.type}'`
);
const loadPromise = pipeline.load();
pipelineSlot.loadPromise = loadPromise;
try {
// we `.stop()` the previous instance of this pipeline
// here, so that by the time the `AbortError` is thrown/caught,
// we're ready for it with the above `pipelineSlot.loadPromise`
// which can be awaited again, until the load either succeeds
// or fails with an error other than `AbortError`.
await loadStarted?.();
} catch (e) {
logger(LOG_TAG, "failed to run loadStarted callback", e);
}
const summary = await loadPromise;
pipelineSlot.loadPromise = undefined;

pipelineSlot.loading = true;

// in case a pipeline load hangs, stop waiting after PIPELINE_LOAD_TIMEOUT_MS
// (10 minutes) and fail this load with a `TIME_OUT` error.
const summary = await Promise.race([
pipeline.load(),
(async (): Promise<PipelineLoadSummary> => {
await sleep(PIPELINE_LOAD_TIMEOUT_MS);
throw new Error(`TIME_OUT: ${pipeline.id}`);
})()
]);

logger(
LOG_TAG,
`successfully loaded data for pipeline with id '${pipelineId}'` +
Expand Down Expand Up @@ -170,5 +169,7 @@ export async function performPipelineLoad(
discordService
);
return summary;
} finally {
pipelineSlot.loading = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ export async function upsertPipelineDefinition(
// so that the `AbortError` that is thrown by the `stop()` method can be
// handled properly upstream.
if (existingSlot.instance && !existingSlot.instance.isStopped()) {
existingSlot.loadPromise = undefined;
await existingSlot.instance?.stop();
existingSlot.loading = false;
}

if (validatedNewDefinition.options.disableCache) {
Expand All @@ -213,6 +213,7 @@ export async function upsertPipelineDefinition(
}
}

existingSlot.loading = true;
existingSlot.owner = await userSubservice.getUserById(
client,
validatedNewDefinition.ownerUserId
Expand All @@ -223,7 +224,8 @@ export async function upsertPipelineDefinition(
// which can take an arbitrary amount of time.
const restartPromise = executorSubservice.restartPipeline(
client,
validatedNewDefinition.id
validatedNewDefinition.id,
true
);

// To get accurate timestamps, we need to load the pipeline definition
Expand Down
7 changes: 2 additions & 5 deletions apps/passport-server/src/services/generic-issuance/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
PipelineDefinition,
PipelineLoadSummary
} from "@pcd/passport-interface";
import { PipelineDefinition } from "@pcd/passport-interface";
import { PipelineCapability } from "./capabilities/types";
import { Pipeline, PipelineUser } from "./pipelines/types";

Expand Down Expand Up @@ -41,5 +38,5 @@ export interface PipelineSlot {
owner?: PipelineUser;
loadIncidentId?: string;
lastLoadDiscordMsgTimestamp?: Date;
loadPromise?: Promise<PipelineLoadSummary>;
loading: boolean;
}
Loading