-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
workflow-helper.ts
299 lines (270 loc) · 10.2 KB
/
workflow-helper.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright 2019 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { PassThrough, Stream } from 'stream';
import { ClientOptions as MinioClientOptions } from 'minio';
import { getK8sSecret, getArgoWorkflow, getPodLogs } from './k8s-helper';
import { createMinioClient, MinioRequestConfig, getObjectStream } from './minio-helper';
/**
 * The portion of an Argo workflow object that this module reads: only the
 * `status` section, and within it only the artifact repository reference and
 * the map of node statuses.
 */
export interface PartialArgoWorkflow {
  status: {
    /** Status entries of the workflow's nodes, looked up by string key. */
    nodes?: ArgoWorkflowStatusNode;
    /** Reference describing the artifact repository recorded in the status. */
    artifactRepositoryRef?: ArtifactRepositoryRef;
  };
}
/**
 * Wrapper found under `status.artifactRepositoryRef` of a workflow; it holds
 * the (optional) artifact repository description.
 */
export interface ArtifactRepositoryRef {
  /** The artifact repository settings, when present. */
  artifactRepository?: ArtifactRepository;
}
/**
 * Artifact repository settings as they appear in a workflow status.
 */
export interface ArtifactRepository {
  /** S3-style storage settings of the repository, when present. */
  s3?: S3Artifact;
  /** Flag indicating whether logs are archived to the repository. */
  archiveLogs?: boolean;
}
/**
 * Dictionary of node status entries taken from `status.nodes` of a workflow.
 * Entries are looked up by a string key.
 */
export interface ArgoWorkflowStatusNode {
  [nodeName: string]: ArgoWorkflowStatusNodeInfo;
}
/**
 * Status of a single workflow node, reduced to the fields read by this module.
 */
export interface ArgoWorkflowStatusNodeInfo {
  /** Outputs recorded for the node, when present. */
  outputs?: {
    /** Artifacts listed in the node's outputs. */
    artifacts?: ArtifactRecord[];
  };
}
/**
 * One artifact entry from a node's outputs.
 */
export interface ArtifactRecord {
  /** Location of the artifact expressed as an S3 object key. */
  s3: S3Key;
  /** Artifact name, when one is recorded. */
  name?: string;
}
/**
 * Holder for the object key of an artifact.
 */
export interface S3Key {
  /** Object key of the artifact. */
  key: string;
}
/**
 * S3-style storage settings of an artifact repository, as found in a workflow
 * status.
 */
export interface S3Artifact {
  /** Bucket holding the artifacts. */
  bucket: string;
  /** Storage endpoint. */
  endpoint: string;
  /** Flag marking the connection to the endpoint as insecure. */
  insecure: boolean;
  /** Key recorded in the repository settings. */
  key: string;
  /** Selector of the secret entry holding the access key, when configured. */
  accessKeySecret?: SecretSelector;
  /** Selector of the secret entry holding the secret key, when configured. */
  secretKeySecret?: SecretSelector;
}
/**
 * Pointer to a single entry of a secret: the secret's name plus the key of the
 * entry inside it.
 */
export interface SecretSelector {
  /** Name of the secret. */
  name: string;
  /** Key of the entry within the secret. */
  key: string;
}
/**
 * Builds a pod logs stream handler from a primary handler and an optional
 * fallback.
 *
 * The returned async function awaits `handler` first. When that fails and a
 * `fallback` was supplied, the fallback is called with the same arguments and
 * its outcome (value or error) becomes the result; the primary error is
 * discarded. When no fallback was supplied, the primary error is written with
 * `console.warn` and rethrown.
 *
 * @param handler a function that returns a stream.
 * @param fallback an optional function that returns a stream, tried when
 * `handler` fails.
 * @returns an async function accepting `(podName, createdAt, namespace?)`.
 */
export function composePodLogsStreamHandler<T = Stream>(
  handler: (podName: string, createdAt: string, namespace?: string) => Promise<T>,
  fallback?: (podName: string, createdAt: string, namespace?: string) => Promise<T>,
) {
  return async (podName: string, createdAt: string, namespace?: string) => {
    let primaryError: unknown;
    try {
      return await handler(podName, createdAt, namespace);
    } catch (caught) {
      primaryError = caught;
    }

    // Only reached when the primary handler failed.
    if (!fallback) {
      console.warn(primaryError);
      throw primaryError;
    }
    return await fallback(podName, createdAt, namespace);
  };
}
/**
 * Fetches the pod logs with `getPodLogs` and hands them back wrapped in a
 * stream. The informational log line is written after the logs were fetched.
 *
 * @param podName name of the pod.
 * @param createdAt YYYY-MM-DD run was created. Not used.
 * @param namespace namespace of the pod; forwarded unchanged to `getPodLogs`.
 * @param containerName container's name of the pod, the default value is 'main'.
 * @returns a `PassThrough` stream that has already been ended with the logs.
 */
export async function getPodLogsStreamFromK8s(
  podName: string,
  createdAt: string,
  namespace?: string,
  containerName: string = 'main',
) {
  const logs = await getPodLogs(podName, namespace, containerName);

  const output = new PassThrough();
  output.end(logs);

  console.log(
    `Getting logs for pod, ${podName}, in namespace, ${namespace}, by calling the Kubernetes API.`,
  );
  return output;
}
/**
 * Returns a stream containing the pod logs, located through the information
 * in the workflow status (the request config is produced by
 * `getPodLogsMinioRequestConfigfromWorkflow`).
 * @param podName name of the pod.
 * @param createdAt YYYY-MM-DD run was created. Not used.
 * @param namespace accepted for signature compatibility but not used: only the
 * pod name is forwarded to the request-config lookup.
 */
export const getPodLogsStreamFromWorkflow = toGetPodLogsStream((podName: string) =>
  getPodLogsMinioRequestConfigfromWorkflow(podName),
);
/**
 * Turns a "where are the logs stored" lookup into a "give me the logs stream"
 * function.
 *
 * The returned async function resolves a MinioRequestConfig through
 * `getMinioRequestConfig`, logs the bucket and key it is about to read, and
 * returns whatever `getObjectStream` yields for that request.
 *
 * @param getMinioRequestConfig function that returns a MinioRequestConfig based
 * on the provided pod name, creation date and (optional) namespace.
 * @returns an async function accepting `(podName, createdAt, namespace?)`.
 */
export function toGetPodLogsStream(
  getMinioRequestConfig: (
    podName: string,
    createdAt: string,
    namespace?: string,
  ) => Promise<MinioRequestConfig>,
) {
  async function getPodLogsStream(podName: string, createdAt: string, namespace?: string) {
    const minioRequest = await getMinioRequestConfig(podName, createdAt, namespace);
    const { bucket, key } = minioRequest;
    console.log(`Getting logs for pod, ${podName}, from ${bucket}/${key}.`);
    const logsStream = await getObjectStream(minioRequest);
    return logsStream;
  }
  return getPodLogsStream;
}
/**
 * Returns a function that builds a MinioRequestConfig for a pod's archived
 * logs (a MinioRequestConfig object contains the artifact bucket and key,
 * with the corresponding minio client).
 *
 * The object key is derived from `keyFormat`:
 *  1. all whitespace is removed, so `{{ pod.name }}` equals `{{pod.name}}`;
 *  2. every occurrence of the supported tags is substituted:
 *     `{{workflow.name}}` (the pod name with `-system-container-impl-...`
 *     removed), `{{workflow.creationTimestamp.Y}}`, `.m`, `.d` (the three
 *     `-`-separated parts of `createdAt`), `{{pod.name}}` and
 *     `{{workflow.namespace}}`;
 *  3. a trailing `/` is added if missing, followed by `main.log`.
 *
 * @param minioOptions Minio options to create a minio client.
 * @param bucket bucket containing the pod logs artifacts.
 * @param keyFormat the keyFormat for pod logs artifacts stored in the bucket.
 * @returns an async function `(podName, createdAt, namespace = '')` resolving
 * to `{ bucket, client, key }`.
 * @throws Error (from the returned function) when `keyFormat` uses a creation
 * timestamp tag but `createdAt` lacks that part, when unsupported template
 * tags remain in the key, or when the key contains characters other than
 * letters, numbers, `-`, `.`, `_` and `/`.
 */
export function createPodLogsMinioRequestConfig(
  minioOptions: MinioClientOptions,
  bucket: string,
  keyFormat: string,
) {
  return async (
    podName: string,
    createdAt: string,
    namespace: string = '',
  ): Promise<MinioRequestConfig> => {
    // create a new client each time to ensure session token has not expired
    const client = await createMinioClient(minioOptions, 's3');

    const [year, month, day] = createdAt.split('-');
    const timestampTags: Array<[string, string | undefined]> = [
      ['{{workflow.creationTimestamp.Y}}', year],
      ['{{workflow.creationTimestamp.m}}', month],
      ['{{workflow.creationTimestamp.d}}', day],
    ];

    let key = keyFormat.replace(/\s+/g, ''); // Remove all whitespace.

    // A timestamp tag can only be resolved when createdAt really carries that
    // part; otherwise the text "undefined" (or an empty path segment) would
    // silently end up in the key.
    for (const [tag, value] of timestampTags) {
      if (key.includes(tag) && !value) {
        throw new Error(
          `Unable to resolve ${tag} in keyFormat: createdAt, ${createdAt}, is not in YYYY-MM-DD format.`,
        );
      }
    }

    const substitutions: Array<[string, string]> = [
      ['{{workflow.name}}', podName.replace(/-system-container-impl-.*/, '')],
      ...timestampTags.map(([tag, value]): [string, string] => [tag, value ?? '']),
      ['{{pod.name}}', podName],
      ['{{workflow.namespace}}', namespace],
    ];
    for (const [tag, value] of substitutions) {
      // split/join substitutes every occurrence (String.prototype.replace with
      // a string pattern stops after the first one) and inserts the value
      // verbatim, without interpreting `$` replacement patterns.
      key = key.split(tag).join(value);
    }

    if (!key.endsWith('/')) {
      key = key + '/';
    }
    key = key + 'main.log';

    // If there are unresolved template tags in the keyFormat, throw an error
    // that surfaces in the frontend's console log.
    if (key.includes('{') || key.includes('}')) {
      throw new Error(
        `keyFormat, which is defined in config.ts or through the ARGO_KEYFORMAT env var, appears to include template tags that are not supported. ` +
          `The resulting log key, ${key}, includes unresolved template tags and is therefore invalid.`,
      );
    }

    const regex = /^[a-zA-Z0-9\-._/]+$/; // Allow letters, numbers, -, ., _, /
    if (!regex.test(key)) {
      throw new Error(
        `The log key, ${key}, which is derived from keyFormat in config.ts or through the ARGO_KEYFORMAT env var, is an invalid path. ` +
          `Supported characters include: letters, numbers, -, ., _, and /.`,
      );
    }

    return { bucket, client, key };
  };
}
/**
 * Retrieves the bucket and pod log artifact key (as well as the
 * minio client needed to retrieve them) from the corresponding argo workflow
 * status.
 *
 * The workflow name is the pod name with `-system-container-impl-...` removed;
 * the node is looked up under the pod name with `-system-container-impl`
 * removed, and the log key is taken from that node's output artifact named
 * `main-logs`.
 *
 * @param podName name of the pod to retrieve the logs.
 * @returns the bucket, the log object key and a minio client configured from
 * the workflow's artifact repository.
 * @throws Error when the workflow cannot be retrieved, when archiveLogs is not
 * enabled in the workflow status (including a workflow without status), when
 * the node has no artifacts or no `main-logs` artifact with an s3 key, or when
 * the artifact repository has no s3 settings.
 */
export async function getPodLogsMinioRequestConfigfromWorkflow(
  podName: string,
): Promise<MinioRequestConfig> {
  // We should probably parameterize this replace statement. It's brittle to
  // changes in implementation. But brittle is better than completely broken.
  const workflowName = podName.replace(/-system-container-impl-.*/, '');

  let workflow: PartialArgoWorkflow;
  try {
    workflow = await getArgoWorkflow(workflowName);
  } catch (err) {
    throw new Error(`Unable to retrieve workflow status: ${err}.`);
  }

  // archiveLogs can be set globally for the workflow as a whole and / or for
  // each individual task. The compiler sets it globally so we look for it in
  // the global field, which is documented here:
  // https://argo-workflows.readthedocs.io/en/release-3.4/fields/#workflow
  //
  // `status` is typed as required but may be absent on the object returned at
  // runtime, so it is read defensively instead of raising a TypeError.
  const artifactRepository = workflow.status?.artifactRepositoryRef?.artifactRepository;
  if (!artifactRepository?.archiveLogs) {
    throw new Error('Unable to retrieve logs from artifact store; archiveLogs is disabled.');
  }

  const nodeName = podName.replace('-system-container-impl', '');
  const artifacts = workflow.status.nodes?.[nodeName]?.outputs?.artifacts;
  if (!artifacts) {
    throw new Error('Unable to find corresponding log artifact in node.');
  }

  // An artifact without an s3 location is treated like a missing artifact
  // rather than crashing on `.key`.
  const logKey = artifacts.find((artifact: ArtifactRecord) => artifact.name === 'main-logs')?.s3
    ?.key;
  if (!logKey) {
    throw new Error('No artifact named "main-logs" for node.');
  }

  const s3Artifact = artifactRepository.s3;
  if (!s3Artifact) {
    throw new Error('Unable to find artifact repository information from workflow status.');
  }

  const { host, port } = urlSplit(s3Artifact.endpoint, s3Artifact.insecure);
  const { accessKey, secretKey } = await getMinioClientSecrets(s3Artifact);
  const client = await createMinioClient(
    {
      accessKey,
      // TODO: endPoint needs to be set to 'localhost' for local development.
      // start-proxy-and-server.sh sets MINIO_HOST=localhost, but it doesn't
      // seem to be respected when running the server in development mode.
      // Investigate and fix this.
      endPoint: host,
      port,
      secretKey,
      useSSL: !s3Artifact.insecure,
    },
    's3',
  );

  return {
    bucket: s3Artifact.bucket,
    client,
    key: logKey,
  };
}
/**
 * Resolves the access key and secret key used to connect to the s3 artifact
 * store by reading the two referenced secret entries with `getK8sSecret`
 * (access key first, then secret key).
 *
 * @param s3artifact s3artifact object describing the s3 artifactory config for
 * argo workflow; only `accessKeySecret` and `secretKeySecret` are read.
 * @returns `{ accessKey, secretKey }`, or an empty object when either secret
 * selector is missing (no secret is read in that case).
 */
async function getMinioClientSecrets({ accessKeySecret, secretKeySecret }: S3Artifact) {
  if (accessKeySecret && secretKeySecret) {
    const accessKey = await getK8sSecret(accessKeySecret.name, accessKeySecret.key);
    const secretKey = await getK8sSecret(secretKeySecret.name, secretKeySecret.key);
    return { accessKey, secretKey };
  }
  return {};
}
/**
 * Split an uri of the form `host` or `host:port` into host and port.
 *
 * The host is the text before the first `:`. The port is parsed (base 10) from
 * the text between the first and second `:`. When no port is given, or the
 * given port is empty or not a number (e.g. `host:`), the default port is
 * returned instead of `NaN`.
 *
 * @param uri uri to split
 * @param insecure selects the default port: 80 when true (no ssl), 443
 * otherwise.
 * @returns the host and a numeric port.
 */
function urlSplit(uri: string, insecure: boolean) {
  const defaultPort = insecure ? 80 : 443;
  const [host, rawPort] = uri.split(':');
  if (rawPort === undefined) {
    return { host, port: defaultPort };
  }
  const port = parseInt(rawPort, 10);
  // parseInt yields NaN for '' or non-numeric text; never hand NaN to callers.
  return { host, port: Number.isNaN(port) ? defaultPort : port };
}