Skip to content

Commit

Permalink
feat(ElasticSearch): Postgres synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
Betree committed Dec 11, 2024
1 parent c564a5b commit b8517bb
Show file tree
Hide file tree
Showing 22 changed files with 737 additions and 149 deletions.
3 changes: 2 additions & 1 deletion config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"client": "MAILPIT_CLIENT"
},
"elasticSearch": {
"url": "ELASTICSEARCH_URL"
"url": "ELASTICSEARCH_URL",
"maxSyncDelay": "ELASTICSEARCH_MAX_SYNC_DELAY"
},
"database": {
"url": "PG_URL",
Expand Down
7 changes: 7 additions & 0 deletions config/default.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"port": "3060",
"services": {
"server": true,
"searchSync": true
},
"mailpit": {
"client": false
},
Expand All @@ -16,6 +20,9 @@
},
"readOnly": false
},
"elasticSearch": {
"maxSyncDelay": 5000
},
"maintenancedb": {
"url": "postgres://127.0.0.1:5432/postgres"
},
Expand Down
4 changes: 4 additions & 0 deletions config/test.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"port": "3061",
"services": {
"server": true,
"searchSync": false
},
"database": {
"url": "postgres://[email protected]:5432/opencollective_test"
},
Expand Down
21 changes: 21 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
"pg": "8.13.1",
"pg-connection-string": "2.7.0",
"pg-format": "1.0.4",
"pg-listen": "1.7.0",
"plaid": "29.0.0",
"prepend-http": "3.0.1",
"redis": "4.6.6",
Expand Down
54 changes: 45 additions & 9 deletions server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import config from 'config';
import express from 'express';
import throng from 'throng';

import { startElasticSearchPostgresSync, stopElasticSearchPostgresSync } from './lib/elastic-search/sync-postgres';
import expressLib from './lib/express';
import logger from './lib/logger';
import { updateCachedFidoMetadata } from './lib/two-factor-authentication/fido-metadata';
import { parseToBoolean } from './lib/utils';
import routes from './routes';

const workers = process.env.WEB_CONCURRENCY || 1;

async function start(i) {
async function startExpressServer(workerId) {
const expressApp = express();

await updateCachedFidoMetadata();
Expand All @@ -35,7 +37,7 @@ async function start(i) {
host,
server.address().port,
config.env,
i,
workerId,
);
});

Expand All @@ -45,15 +47,49 @@ async function start(i) {
return expressApp;
}

let app;
// Start the express server
let appPromise;
if (parseToBoolean(config.services.server)) {
if (['production', 'staging'].includes(config.env) && workers > 1) {
throng({ worker: startExpressServer, count: workers }); // TODO: Thong is not compatible with the shutdown logic below
} else {
appPromise = startExpressServer(1);
}
}

if (['production', 'staging'].includes(config.env) && workers > 1) {
throng({ worker: start, count: workers });
} else {
app = start(1);
// Start the search sync job
if (parseToBoolean(config.services.searchSync)) {
startElasticSearchPostgresSync();
}

let isShuttingDown = false;
const gracefullyShutdown = async signal => {
if (!isShuttingDown) {
logger.info(`Received ${signal}. Shutting down.`);
isShuttingDown = true;

if (appPromise) {
await appPromise.then(app => {
if (app.__server__) {
logger.info('Closing express server');
app.__server__.close();
}
});
}

if (parseToBoolean(config.services.searchSync)) {
await stopElasticSearchPostgresSync();
}

process.exit();
}
};

process.on('exit', () => gracefullyShutdown('exit'));
process.on('SIGINT', () => gracefullyShutdown('SIGINT'));
process.on('SIGTERM', () => gracefullyShutdown('SIGTERM'));

// This is used by tests
export default async function () {
return app ? app : start(1);
export default async function startServerForTest() {
return appPromise ? appPromise : parseToBoolean(config.services.server) ? startExpressServer(1) : null;
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import models, { Op } from '../../../models';
import { Op } from '../../../models';
import Collective from '../../../models/Collective';
import { stripHTMLOrEmpty } from '../../sanitize-html';
import { ElasticSearchIndexName } from '../constants';

import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter';
import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter';

export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapter {
public readonly model = models.Collective;
public readonly model = Collective;
public readonly index = ElasticSearchIndexName.COLLECTIVES;
public readonly mappings = {
properties: {
Expand All @@ -30,16 +31,8 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte
},
} as const;

public async findEntriesToIndex(
options: {
offset?: number;
limit?: number;
fromDate?: Date;
maxId?: number;
ids?: number[];
} = {},
): Promise<Array<InstanceType<typeof models.Collective>>> {
return models.Collective.findAll({
public async findEntriesToIndex(options: FindEntriesToIndexOptions = {}) {
return Collective.findAll({
attributes: Object.keys(this.mappings.properties),
order: [['id', 'DESC']],
limit: options.limit,
Expand All @@ -49,12 +42,21 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte
...(options.fromDate ? { updatedAt: options.fromDate } : null),
...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null),
...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null),
...(options.relatedToCollectiveIds?.length
? {
[Op.or]: [
{ id: options.relatedToCollectiveIds },
{ HostCollectiveId: options.relatedToCollectiveIds },
{ ParentCollectiveId: options.relatedToCollectiveIds },
],
}
: null),
},
});
}

public mapModelInstanceToDocument(
instance: InstanceType<typeof models.Collective>,
instance: InstanceType<typeof Collective>,
): Record<keyof (typeof this.mappings)['properties'], unknown> {
return {
id: instance.id,
Expand All @@ -72,7 +74,7 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte
isActive: instance.isActive,
isHostAccount: instance.isHostAccount,
deactivatedAt: instance.deactivatedAt,
HostCollectiveId: instance.HostCollectiveId,
HostCollectiveId: !instance.isActive ? null : instance.HostCollectiveId,
ParentCollectiveId: instance.ParentCollectiveId,
};
}
Expand Down
33 changes: 16 additions & 17 deletions server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { omit } from 'lodash';
import { Op } from 'sequelize';

import models from '../../../models';
import { CommentType } from '../../../models/Comment';
import Comment, { CommentType } from '../../../models/Comment';
import { stripHTMLOrEmpty } from '../../sanitize-html';
import { ElasticSearchIndexName } from '../constants';

import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter';
import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter';

export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
public readonly model = models.Comment;
public readonly model = Comment;
public readonly index = ElasticSearchIndexName.COMMENTS;
public readonly mappings = {
properties: {
Expand All @@ -29,16 +28,8 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
},
} as const;

public findEntriesToIndex(
options: {
offset?: number;
limit?: number;
fromDate?: Date;
maxId?: number;
ids?: number[];
} = {},
) {
return models.Comment.findAll({
public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) {
return Comment.findAll({
attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']),
order: [['id', 'DESC']],
limit: options.limit,
Expand All @@ -47,12 +38,20 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
...(options.fromDate ? { updatedAt: options.fromDate } : null),
...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null),
...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null),
...(options.relatedToCollectiveIds?.length
? {
[Op.or]: [
{ CollectiveId: options.relatedToCollectiveIds },
{ FromCollectiveId: options.relatedToCollectiveIds },
],
}
: null),
},
include: [
{
association: 'collective',
required: true,
attributes: ['HostCollectiveId', 'ParentCollectiveId'],
attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'],
},
{
association: 'expense',
Expand All @@ -69,7 +68,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
}

public mapModelInstanceToDocument(
instance: InstanceType<typeof models.Comment>,
instance: InstanceType<typeof Comment>,
): Record<keyof (typeof this.mappings)['properties'], unknown> {
return {
id: instance.id,
Expand All @@ -84,7 +83,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
HostCollectiveId:
instance.expense?.HostCollectiveId ??
instance.hostApplication?.HostCollectiveId ??
instance.collective.HostCollectiveId,
(!instance.collective.isActive ? null : instance.collective.HostCollectiveId),
};
}

Expand Down
Loading

0 comments on commit b8517bb

Please sign in to comment.