Skip to content

Commit

Permalink
fix(frontend): retrieve archived logs from correct location
Browse files Browse the repository at this point in the history
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>
  • Loading branch information
3 people committed Jul 16, 2024
1 parent 9cb5913 commit c885af7
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 30 deletions.
8 changes: 4 additions & 4 deletions frontend/server/configs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
ARGO_ARCHIVE_ARTIFACTORY = 'minio',
/** Bucket to retrive logs from */
ARGO_ARCHIVE_BUCKETNAME = 'mlpipeline',
/** Prefix to logs. */
ARGO_ARCHIVE_PREFIX = 'logs',
/** This should match the keyFormat specified in the Argo workflow-controller-configmap */
ARGO_KEYFORMAT = 'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
/** Should use server API for log streaming? */
STREAM_LOGS_FROM_SERVER_API = 'false',
/** The main container name of a pod where logs are retrieved */
Expand Down Expand Up @@ -127,7 +127,7 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
archiveArtifactory: ARGO_ARCHIVE_ARTIFACTORY,
archiveBucketName: ARGO_ARCHIVE_BUCKETNAME,
archiveLogs: asBool(ARGO_ARCHIVE_LOGS),
archivePrefix: ARGO_ARCHIVE_PREFIX,
keyFormat: ARGO_KEYFORMAT,
},
pod: {
logContainerName: POD_LOG_CONTAINER_NAME,
Expand Down Expand Up @@ -253,7 +253,7 @@ export interface ArgoConfigs {
archiveLogs: boolean;
archiveArtifactory: string;
archiveBucketName: string;
archivePrefix: string;
keyFormat: string;
}
export interface ServerConfigs {
basePath: string;
Expand Down
13 changes: 7 additions & 6 deletions frontend/server/handlers/pod-logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ export function getPodLogsHandler(
},
podLogContainerName: string,
): Handler {
const { archiveLogs, archiveArtifactory, archiveBucketName, archivePrefix = '' } = argoOptions;
const { archiveLogs, archiveArtifactory, archiveBucketName, keyFormat } = argoOptions;

// get pod log from the provided bucket and prefix.
// get pod log from the provided bucket and keyFormat.
const getPodLogsStreamFromArchive = toGetPodLogsStream(
createPodLogsMinioRequestConfig(
archiveArtifactory === 'minio' ? artifactsOptions.minio : artifactsOptions.aws,
archiveBucketName,
archivePrefix,
keyFormat,
),
);

// get the pod log stream (with fallbacks).
const getPodLogsStream = composePodLogsStreamHandler(
(podName: string, namespace?: string) => {
return getPodLogsStreamFromK8s(podName, namespace, podLogContainerName);
(podName: string, createdAt: string, namespace?: string) => {
return getPodLogsStreamFromK8s(podName, createdAt, namespace, podLogContainerName);
},
// if archive logs flag is set, then final attempt will try to retrieve the artifacts
// from the bucket and prefix provided in the config. Otherwise, only attempts
Expand All @@ -69,13 +69,14 @@ export function getPodLogsHandler(
return;
}
const podName = decodeURIComponent(req.query.podname);
const createdAt = decodeURIComponent(req.query.createdat);

// This is optional.
// Note decodeURIComponent(undefined) === 'undefined', so I cannot pass the argument directly.
const podNamespace = decodeURIComponent(req.query.podnamespace || '') || undefined;

try {
const stream = await getPodLogsStream(podName, podNamespace);
const stream = await getPodLogsStream(podName, createdAt, podNamespace);
stream.on('error', err => {
if (
err?.message &&
Expand Down
2 changes: 1 addition & 1 deletion frontend/server/workflow-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ describe('workflow-helper', () => {
mockedClientGetObject.mockResolvedValueOnce(objStream);
objStream.end('some fake logs.');

const stream = await getPodLogsStreamFromWorkflow('workflow-name-abc');
const stream = await getPodLogsStreamFromWorkflow('workflow-name-abc', "2024-07-09");

expect(mockedGetArgoWorkflow).toBeCalledWith('workflow-name');

Expand Down
40 changes: 23 additions & 17 deletions frontend/server/workflow-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ export interface SecretSelector {
* fails.
*/
export function composePodLogsStreamHandler<T = Stream>(
handler: (podName: string, namespace?: string) => Promise<T>,
fallback?: (podName: string, namespace?: string) => Promise<T>,
handler: (podName: string, createdAt: string, namespace?: string) => Promise<T>,
fallback?: (podName: string, createdAt: string, namespace?: string) => Promise<T>,
) {
return async (podName: string, namespace?: string) => {
return async (podName: string, createdAt: string, namespace?: string) => {
try {
return await handler(podName, namespace);
return await handler(podName, createdAt, namespace);
} catch (err) {
if (fallback) {
return await fallback(podName, namespace);
return await fallback(podName, createdAt, namespace);
}
console.warn(err);
throw err;
Expand All @@ -85,12 +85,12 @@ export function composePodLogsStreamHandler<T = Stream>(
*/
export async function getPodLogsStreamFromK8s(
podName: string,
createdAt: string,
namespace?: string,
containerName: string = 'main',
) {
const stream = new PassThrough();
stream.end(await getPodLogs(podName, namespace, containerName));
console.log(`Getting logs for pod:${podName} in namespace ${namespace}.`);
return stream;
}

Expand All @@ -112,10 +112,10 @@ export const getPodLogsStreamFromWorkflow = toGetPodLogsStream(
* on the provided pod name and namespace (optional).
*/
export function toGetPodLogsStream(
getMinioRequestConfig: (podName: string, namespace?: string) => Promise<MinioRequestConfig>,
getMinioRequestConfig: (podName: string, createdAt: string, namespace?: string) => Promise<MinioRequestConfig>,
) {
return async (podName: string, namespace?: string) => {
const request = await getMinioRequestConfig(podName, namespace);
return async (podName: string, createdAt: string, namespace?: string) => {
const request = await getMinioRequestConfig(podName, createdAt, namespace);
console.log(`Getting logs for pod:${podName} from ${request.bucket}/${request.key}.`);
return await getObjectStream(request);
};
Expand All @@ -132,19 +132,25 @@ export function toGetPodLogsStream(
export function createPodLogsMinioRequestConfig(
minioOptions: MinioClientOptions,
bucket: string,
prefix: string,
keyFormat: string,
) {
// TODO: support pod log artifacts for diff namespace.
// different bucket/prefix for diff namespace?
return async (podName: string, _namespace?: string): Promise<MinioRequestConfig> => {
return async (podName: string, createdAt: string, _namespace?: string): Promise<MinioRequestConfig> => {
// create a new client each time to ensure session token has not expired
const client = await createMinioClient(minioOptions, 's3');
const workflowName = workflowNameFromPodName(podName);
return {
bucket,
client,
key: path.join(prefix, workflowName, podName, 'main.log'),
};
const createdAtArray = createdAt.split("-")
let key: string = keyFormat
.replace("{{workflow.name}}", podName.replace(/-system-container-impl-.*/, ''))
.replace("{{workflow.creationTimestamp.Y}}", createdAtArray[0])
.replace("{{workflow.creationTimestamp.m}}", createdAtArray[1])
.replace("{{workflow.creationTimestamp.d}}", createdAtArray[2])
.replace("{{pod.name}}", podName)
// TODO: Add namespace tag replacement.
key = key + "/main.log"
console.log("keyFormat: ", keyFormat)
console.log("key: ", key)
return { bucket, client, key };
};
}

Expand Down
3 changes: 2 additions & 1 deletion frontend/src/components/tabs/RuntimeNodeDetailsV2.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ async function getLogsInfo(execution: Execution, runId?: string): Promise<Map<st
let logsBannerMessage = '';
let logsBannerAdditionalInfo = '';
const customPropertiesMap = execution.getCustomPropertiesMap();
const createdAt = new Date(execution.getCreateTimeSinceEpoch()).toISOString().split('T')[0]

if (execution) {
podName = customPropertiesMap.get(KfpExecutionProperties.POD_NAME)?.getStringValue() || '';
Expand All @@ -321,7 +322,7 @@ async function getLogsInfo(execution: Execution, runId?: string): Promise<Map<st
}

try {
logsDetails = await Apis.getPodLogs(runId!, podName, podNameSpace);
logsDetails = await Apis.getPodLogs(runId!, podName, podNameSpace, createdAt);
logsInfo.set(LOGS_DETAILS, logsDetails);
} catch (err) {
let errMsg = await errorToMessage(err);
Expand Down
3 changes: 2 additions & 1 deletion frontend/src/lib/Apis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ export class Apis {
/**
* Get pod logs
*/
public static getPodLogs(runId: string, podName: string, podNamespace: string): Promise<string> {
public static getPodLogs(runId: string, podName: string, podNamespace: string, createdAt: string): Promise<string> {
let query = `k8s/pod/logs?podname=${encodeURIComponent(podName)}&runid=${encodeURIComponent(
runId,
)}`;
if (podNamespace) {
query += `&podnamespace=${encodeURIComponent(podNamespace)}`;
}
query += `&createdat=${encodeURIComponent(createdAt)}`;
return this._fetch(query);
}

Expand Down

0 comments on commit c885af7

Please sign in to comment.