forked from thiagobbt/nx-remotecache-s3
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathindex.ts
130 lines (113 loc) · 4.81 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import defaultTaskRunner from '@nrwl/workspace/tasks-runners/default';
import { S3 } from '@aws-sdk/client-s3';
import { fromIni } from '@aws-sdk/credential-provider-ini';
import { fromEnv, ENV_KEY, ENV_SECRET } from '@aws-sdk/credential-provider-env';
import { join, dirname, relative } from 'path';
import { promises, readFileSync } from 'fs';
import mkdirp from 'mkdirp';
import { default as getStream } from 'get-stream'
import { ProviderError } from "@aws-sdk/property-provider";
/**
 * Nx custom task runner that wraps the default runner with an S3-backed
 * remote cache. Cache entries live under `s3://<bucket>/<hash>/...`, with a
 * `<hash>.commit` marker object uploaded last so a partially-written entry is
 * never treated as valid.
 *
 * @param tasks   tasks forwarded verbatim to the default Nx runner
 * @param options default runner options plus: `bucket` (required S3 bucket),
 *                `profile` (AWS profile for ini credentials), `region`
 *                (defaults to us-east-1), `fromEnv`
 * @param context forwarded verbatim to the default Nx runner
 * @throws Error when `options.bucket` is missing
 */
export default function runner(
  tasks: Parameters<typeof defaultTaskRunner>[0],
  options: Parameters<typeof defaultTaskRunner>[1] & { bucket?: string, profile?: string, region?: string, fromEnv: boolean},
  context: Parameters<typeof defaultTaskRunner>[2],
) {
  if (!options.bucket) {
    throw new Error('missing bucket property in runner options. Please update nx.json');
  }
  const areCredentialsInEnv = Boolean(process.env[ENV_KEY] && process.env[ENV_SECRET]);
  console.log('>>>> Credentials from env?', areCredentialsInEnv);
  const s3 = new S3({
    region: options.region ?? 'us-east-1',
    credentials: areCredentialsInEnv ? fromEnv() : fromIni({
      profile: options.profile,
    })
  });
  // NOTE(review): the AWS credential chain appears to produce promise
  // rejections that are never awaited; these handlers keep them from killing
  // the CLI process — confirm this is still needed with current SDK versions.
  process.on('unhandledRejection', () => {});
  process.on('rejectionHandled', () => {});
  return defaultTaskRunner(tasks, { ...options, remoteCache: { retrieve, store } }, context);

  /** Convert a local relative path into an S3 key (keys always use '/'). */
  function toS3Key(p: string): string {
    return p.replace(/\\/g, '/');
  }

  /** Pull a cache entry for `hash` into `cacheDirectory`. Returns false on miss or any failure. */
  async function retrieve(hash: string, cacheDirectory: string): Promise<boolean> {
    try {
      const commitFile = `${hash}.commit`;
      // The commit marker is uploaded last by store(); if it is absent the
      // entry is missing or incomplete, so treat either as a cache miss.
      try {
        await s3.headObject({
          Bucket: options.bucket,
          Key: commitFile,
        });
      } catch (e) {
        if (e.name === 'NotFound') {
          return false; // no complete cache entry for this hash
        } else if (e instanceof ProviderError) {
          return false; // no usable credentials — degrade to a cache miss
        } else {
          throw e;
        }
      }
      // NOTE(review): listObjects returns at most 1000 keys per call; entries
      // with more files than that would need pagination — confirm entry sizes.
      const filesOutput = await s3.listObjects({
        Bucket: options.bucket,
        Prefix: `${hash}/`
      });
      const files = filesOutput.Contents?.map(f => f.Key) || [];
      await Promise.all(files.map(f => (f ? download(f) : null)));
      await download(commitFile); // commit file after we're sure all content is downloaded
      console.log(`retrieved ${files.length + 1} files from cache s3://${options.bucket}/${hash}`);
      return true;
    } catch (e) {
      console.log(e);
      console.log(`WARNING: failed to download cache from ${options.bucket}: ${e.message}`);
      return false;
    }

    /** Download one object and write it under the local cache directory. */
    async function download(fileKey: string) {
      const destination = join(cacheDirectory, fileKey);
      await mkdirp(dirname(destination));
      const fileOutput = await s3.getObject({
        Bucket: options.bucket,
        Key: fileKey
      });
      // Guard BEFORE buffering: the original buffered `Body!` first, making
      // the null check dead and crashing on a missing body instead of skipping.
      if (!fileOutput.Body) {
        return;
      }
      const contentBuffer: Buffer = await getStream.buffer(fileOutput.Body as any);
      return promises.writeFile(destination, contentBuffer);
    }
  }

  /** Upload the cache entry for `hash`, then the commit marker. Returns false on any failure. */
  async function store(hash: string, cacheDirectory: string): Promise<boolean> {
    // renamed from `tasks` — the original shadowed the runner's `tasks` parameter
    const uploads: Promise<any>[] = [];
    try {
      await uploadDirectory(join(cacheDirectory, hash));
      await Promise.all(uploads);
      // commit file once we're sure all content is uploaded
      await s3.putObject({
        Bucket: options.bucket,
        Key: `${hash}.commit`,
        Body: readFileSync(join(cacheDirectory, `${hash}.commit`))
      });
      console.log(`stored ${uploads.length + 1} files in cache s3://${options.bucket}/${hash}`);
      return true;
    } catch (e) {
      console.log(`WARNING: failed to upload cache to ${options.bucket}: ${e.message}`);
      return false;
    }

    /** Recursively queue a putObject for every file under `dir`. */
    async function uploadDirectory(dir: string) {
      for (const entry of await promises.readdir(dir)) {
        const full = join(dir, entry);
        const stats = await promises.stat(full);
        if (stats.isDirectory()) {
          await uploadDirectory(full);
        } else if (stats.isFile()) {
          // Normalize Windows backslashes so keys match the '/'-delimited
          // `${hash}/` prefix that retrieve() lists by.
          const destination = toS3Key(relative(cacheDirectory, full));
          uploads.push(s3.putObject({
            Bucket: options.bucket,
            Key: destination,
            Body: readFileSync(full)
          }));
        }
      }
    }
  }
}