-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathcatalog-worker.js
292 lines (258 loc) · 7.93 KB
/
catalog-worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
/**
* This background process polls S3 buckets for new imagery
*/
"use strict";
var path = require("path");
console.log("Starting catalog worker...");
require("dotenv").config({
path: path.resolve(process.cwd(), process.env.DOT_ENV_FILENAME || ".env"),
});
var _ = require("lodash");
var S3 = require("aws-sdk/clients/s3");
var async = require("async");
var config = require("./config");
var Conn = require("./services/db");
var analytics = require("./controllers/analytics");
var meta = require("./controllers/meta");
var Meta = require("./models/meta");
// Replace mongoose's deprecated promise library (mpromise) with bluebird
var mongoose = require("mongoose");
mongoose.Promise = require("bluebird");
var request = require("request");
var cron = require("node-cron");
var { Client: PgClient } = require("pg");
// Instantiate the connection wrapper exported by ./services/db and start it
// as soon as the worker is loaded.
// NOTE(review): presumably this opens the MongoDB connection that the Meta
// model and the analytics controller rely on — confirm in services/db.
var db = new Conn();
db.start();
/**
 * Minimal node-style logging callback.
 *
 * @param {*} err - Error to log; nothing is printed for it when falsy.
 * @param {*} [msg] - Optional message to log; skipped when not provided.
 */
var consoleLog = function (err, msg) {
  if (err) {
    console.log(err);
  }
  // readBucket invokes this callback with the error only; without this guard
  // every reported error was followed by a stray "undefined" line.
  if (typeof msg !== "undefined") {
    console.log(msg);
  }
};
/**
 * Get the list of buckets from the master register.
 *
 * When `config.oinRegisterUrl` is undefined no request is made and the single
 * bucket named by `config.oinBucket` is reported as an S3 bucket. Otherwise
 * the register is fetched as JSON and the `locations` of every entry in its
 * `nodes` array are merged into one flat list.
 *
 * @param {function} cb - Node-style callback `(err, buckets)`. It is always
 * invoked exactly once: with an Error when the request fails or the register
 * answers with a non-200 status, otherwise with the array of buckets.
 */
var getBucketList = function (cb) {
  if (typeof config.oinRegisterUrl === "undefined") {
    cb(null, [{ type: "s3", bucket_name: config.oinBucket }]);
    return;
  }

  request.get(
    {
      json: true,
      uri: config.oinRegisterUrl,
    },
    function (err, res, remoteData) {
      if (err) {
        return cb(err);
      }
      if (res.statusCode !== 200) {
        // Report the failure through the callback; merely logging here left
        // the caller waiting forever for a bucket list.
        return cb(
          new Error(
            "Unable to get register list (HTTP status " + res.statusCode + ")."
          )
        );
      }
      // Tolerate an empty or unexpected body instead of throwing inside the
      // request callback.
      var nodes = (remoteData && remoteData.nodes) || [];
      var buckets = _.flatten(
        _.map(nodes, function (node) {
          return node.locations;
        })
      );
      cb(null, buckets);
    }
  );
};
/**
 * Runs the bucket read tasks in parallel, then runs the metadata tasks they
 * produced, and finally saves an analytics record with the resulting counts.
 *
 * Stage 1 runs `tasks` (at most 4 at a time); each yields an array of
 * metadata tasks. Stage 2 wraps every metadata task with `async.retryable`
 * and runs them (at most 5 at a time). When both stages succeed, the image,
 * sensor and provider counts are read from the Meta model and handed to
 * `analytics.addAnalyticsRecord`. Every failure is logged with
 * `console.error`; nothing is returned or thrown to the caller.
 *
 * @param {Array} tasks - The array of bucket read functions to be run in parallel
 */
var readBuckets = function (tasks) {
  console.info("--- Started indexing all buckets ---");
  async.parallelLimit(
    tasks,
    4,
    // bucketResults is an array of task arrays, one per bucket
    function (err, bucketResults) {
      if (err) {
        return console.error(err);
      }
      var metaTasks = _.flatten(bucketResults).map(function (task) {
        return async.retryable(task);
      });
      async.parallelLimit(metaTasks, 5, function (err) {
        if (err) {
          return console.error(err);
        }
        console.info("--- Finished indexing all buckets ---");
        // Get image, sensor, and provider counts and save to analytics collection
        return Promise.all([
          Meta.count(),
          Meta.distinct("properties.sensor"),
          Meta.distinct("provider"),
        ])
          .then(function (res) {
            var counts = {
              image_count: res[0],
              sensor_count: res[1].length,
              provider_count: res[2].length,
            };
            analytics.addAnalyticsRecord(counts, function (err) {
              // Only announce the new record when it was actually stored.
              if (err) {
                return console.error(err);
              }
              console.info("--- Added new analytics record ---");
            });
          })
          // Catch error in db query promises
          .catch(function (err) {
            return console.error(err);
          });
      });
    }
  );
};
/**
 * Read bucket method for S3. It lists the bucket and, for every object whose
 * key contains "_meta.json", builds a task that adds/updates that metadata
 * file in the Meta model through `meta.addRemoteMeta`.
 *
 * A single listObjects call returns only one page of keys, so the listing is
 * repeated with a `Marker` until S3 reports that it is no longer truncated.
 *
 * @param {Object} bucket - Bucket descriptor; only `bucket_name` is read.
 * @param {*} lastSystemUpdate - Passed through to `meta.addRemoteMeta`.
 * @param {function} errCb - Called with the error when listing fails.
 * @param {function} done - Node-style callback `(err, tasks)` where `tasks`
 * is an array of `function (done)` metadata tasks collected from all pages.
 */
var readBucket = function (bucket, lastSystemUpdate, errCb, done) {
  console.info("--- Reading from bucket: " + bucket.bucket_name + " ---");

  const bucketDetails = {
    Bucket: bucket.bucket_name,
  };
  if (bucket.bucket_name === config.oinBucket) {
    bucketDetails.Prefix = config.oinBucketPrefix;
  }

  const s3 = new S3();
  const tasks = [];

  // Fetch one page of keys starting after `marker` (undefined for the first
  // page), collect its metadata tasks and continue while truncated.
  const listPage = function (marker) {
    const params = marker
      ? Object.assign({}, bucketDetails, { Marker: marker })
      : bucketDetails;

    s3.listObjects(params, function (err, data) {
      if (err) {
        errCb(err);
        done(err);
        return;
      }

      const contents = data.Contents || [];
      contents.forEach(function (item) {
        if (item.Key.includes("_meta.json")) {
          // Get the last time the metadata file was modified so we can determine
          // if we need to update it.
          const lastModified = item.LastModified;
          const url = `https://${config.s3PublicDomain}/${bucket.bucket_name}/${item.Key}`;
          tasks.push(function (taskDone) {
            meta.addRemoteMeta(url, lastModified, lastSystemUpdate, taskDone);
          });
        }
      });

      if (data.IsTruncated && contents.length > 0) {
        // NextMarker is only returned when a Delimiter was sent; otherwise
        // the last key of this page is the marker for the next one.
        listPage(data.NextMarker || contents[contents.length - 1].Key);
        return;
      }

      done(null, tasks);
    });
  };

  listPage();
};
/**
 * The main function to get the registered buckets, read them and update
 * metadata.
 *
 * Reads the last system update time from the analytics controller, fetches
 * the bucket list, turns every bucket into a read task and hands the tasks to
 * `readBuckets`. Errors from the first two steps are logged and abort the run.
 */
var getListAndReadBuckets = function () {
  // Start off by getting the last time the system was updated.
  analytics.getLastUpdateTime(function (err, lastSystemUpdate) {
    if (err) {
      return console.error(err);
    }
    console.info("Last system update time:", lastSystemUpdate);
    getBucketList(function (err, buckets) {
      if (err) {
        return console.error(err.stack);
      }
      // Generate array of tasks to run in parallel
      var tasks = _.map(buckets, function (bucket) {
        return function (done) {
          if (bucket.type === "s3") {
            readBucket(bucket, lastSystemUpdate, consoleLog, done);
            return;
          }
          console.error("Unknown bucket type: " + bucket.type);
          // Still complete the task (with no metadata tasks); otherwise the
          // parallel run never finishes and the other buckets' results are
          // never processed.
          done(null, []);
        };
      });
      // Read the buckets and store metadata
      readBuckets(tasks);
    });
  });
};
// Kick it all off: every time the configured cron expression fires, announce
// the run and start a full pass over the registered buckets.
cron.schedule(config.cronTime, () => {
  console.log(`Running a catalog worker (cron time: ${config.cronTime})`);
  getListAndReadBuckets();
});
// Postgres settings are taken from the process environment.
const PGHOST = process.env.PGHOST;
const PGPORT = process.env.PGPORT;
const PGUSER = process.env.PGUSER;
const PGPASSWORD = process.env.PGPASSWORD;
const PGDATABASE = process.env.PGDATABASE;
// Cron expression for the mosaic sync job; falls back to "every minute" when
// the variable is not set.
const PG_CRON_TIME =
  process.env.PG_CRON_TIME === undefined
    ? "* * * * *"
    : process.env.PG_CRON_TIME;
// The mosaic sync is enabled only when every connection setting is non-empty.
const isPgEnabled = Boolean(
  PGHOST && PGUSER && PGPASSWORD && PGDATABASE && PGPORT
);
// Connected Postgres client shared by every run of the mosaic sync job; stays
// undefined until a connection attempt has succeeded.
let pgConnection;

/**
 * Return the shared Postgres client, creating and connecting it on first use.
 *
 * The client is built from the PGUSER / PGPASSWORD / PGDATABASE / PGHOST /
 * PGPORT settings with SSL enabled. It is cached only after `connect()`
 * succeeded, so a failed attempt is retried on the next call instead of
 * handing out the same broken client forever.
 *
 * @returns {Promise<Object>} The client. When connecting failed the error is
 * logged and the unconnected client is still returned (it is not cached), so
 * this function itself never rejects because of a connection error.
 */
async function pgCreateConnection() {
  if (pgConnection) {
    return pgConnection;
  }

  const connection = new PgClient({
    user: PGUSER,
    password: PGPASSWORD,
    database: PGDATABASE,
    // Honour the configured port; 5432 remains the fallback when PGPORT is
    // missing or not a number.
    port: Number.parseInt(PGPORT, 10) || 5432,
    host: PGHOST,
    ssl: true,
  });

  try {
    await connection.connect();
  } catch (error) {
    console.error(error);
    // Leave the cache empty so the next call attempts a fresh connection.
    return connection;
  }

  pgConnection = connection;
  return pgConnection;
}
// This is a task scheduled by cron that copies all image metadata from
// mongodb into postgres. It is required to run the mosaic server, which
// relies on a postgres db with the postgis extension.
//
// Each run replaces, inside one transaction, every row of layers_features
// that belongs to the layer whose public_id is config.oamMosacLayerId.
if (isPgEnabled) {
  // All runs share one Postgres client, so a tick that fires while the
  // previous run is still awaiting its queries would interleave statements
  // of two transactions. Such a tick is skipped instead.
  let isMosaicSyncRunning = false;

  cron.schedule(PG_CRON_TIME, async function () {
    if (isMosaicSyncRunning) {
      console.warn(
        "Previous mosaic index update is still running, skipping this run"
      );
      return;
    }
    isMosaicSyncRunning = true;

    let client;
    let inTransaction = false;
    try {
      // Everything that can reject lives inside this try so a failure is
      // logged instead of surfacing as an unhandled rejection.
      const records = await Meta.find({}, null, {}).exec();
      client = await pgCreateConnection();
      const mosaicLayerId = config.oamMosacLayerId;

      await client.query("begin");
      inTransaction = true;

      // The layer id is bound as a query parameter rather than interpolated
      // into the SQL text.
      await client.query(
        "delete from layers_features where layer_id = (select id from layers where public_id = $1)",
        [mosaicLayerId]
      );

      // TODO: there should be a better way to do bulk insert
      const queryText =
        "insert into public.layers_features (feature_id, layer_id, properties, geom, last_updated, zoom) values ($1, (select id from layers where public_id = $4), $2, ST_Transform(ST_GeomFromGeoJSON($3), 4326), now(), 999)";
      for (const record of records) {
        const queryValues = [
          record._id,
          JSON.stringify({
            ...record.properties,
            gsd: record.gsd,
            uuid: record.uuid,
            uploaded_at: record.uploaded_at,
            acquisition_start: record.acquisition_start,
            acquisition_end: record.acquisition_end,
          }),
          JSON.stringify(record.geojson),
          mosaicLayerId,
        ];
        await client.query(queryText, queryValues);
      }

      await client.query("commit");
      inTransaction = false;
    } catch (err) {
      console.error(err);
      if (inTransaction) {
        // The rollback itself can fail (e.g. on a dropped connection); log
        // that too rather than letting it escape the handler.
        try {
          await client.query("rollback");
        } catch (rollbackErr) {
          console.error(rollbackErr);
        }
      }
    } finally {
      isMosaicSyncRunning = false;
    }
  });
} else {
  console.warn(
    "The Postgres credentials not defined, skip mosaic index updating"
  );
}