Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ElasticSearch): Postgres synchronization #10521

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"client": "MAILPIT_CLIENT"
},
"elasticSearch": {
"url": "ELASTICSEARCH_URL"
"url": "ELASTICSEARCH_URL",
"maxSyncDelay": "ELASTICSEARCH_MAX_SYNC_DELAY"
},
"database": {
"url": "PG_URL",
Expand Down
7 changes: 7 additions & 0 deletions config/default.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"port": "3060",
"services": {
"server": true,
"searchSync": true
},
"mailpit": {
"client": false
},
Expand All @@ -16,6 +20,9 @@
},
"readOnly": false
},
"elasticSearch": {
"maxSyncDelay": 5000
},
"maintenancedb": {
"url": "postgres://127.0.0.1:5432/postgres"
},
Expand Down
4 changes: 4 additions & 0 deletions config/test.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"port": "3061",
"services": {
"server": true,
"searchSync": false
},
"database": {
"url": "postgres://[email protected]:5432/opencollective_test"
},
Expand Down
21 changes: 21 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
"pg": "8.13.1",
"pg-connection-string": "2.7.0",
"pg-format": "1.0.4",
"pg-listen": "1.7.0",
"plaid": "29.0.0",
"prepend-http": "3.0.1",
"redis": "4.6.6",
Expand Down
54 changes: 45 additions & 9 deletions server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import config from 'config';
import express from 'express';
import throng from 'throng';

import { startElasticSearchPostgresSync, stopElasticSearchPostgresSync } from './lib/elastic-search/sync-postgres';
import expressLib from './lib/express';
import logger from './lib/logger';
import { updateCachedFidoMetadata } from './lib/two-factor-authentication/fido-metadata';
import { parseToBoolean } from './lib/utils';
import routes from './routes';

const workers = process.env.WEB_CONCURRENCY || 1;

async function start(i) {
async function startExpressServer(workerId) {
const expressApp = express();

await updateCachedFidoMetadata();
Expand All @@ -35,7 +37,7 @@ async function start(i) {
host,
server.address().port,
config.env,
i,
workerId,
);
});

Expand All @@ -45,15 +47,49 @@ async function start(i) {
return expressApp;
}

let app;
// Start the express server
let appPromise;
if (parseToBoolean(config.services.server)) {
if (['production', 'staging'].includes(config.env) && workers > 1) {
throng({ worker: startExpressServer, count: workers }); // TODO: Thong is not compatible with the shutdown logic below
} else {
appPromise = startExpressServer(1);
}
}

if (['production', 'staging'].includes(config.env) && workers > 1) {
throng({ worker: start, count: workers });
} else {
app = start(1);
// Start the search sync job
if (parseToBoolean(config.services.searchSync)) {
startElasticSearchPostgresSync();
}

let isShuttingDown = false;
const gracefullyShutdown = async signal => {
if (!isShuttingDown) {
logger.info(`Received ${signal}. Shutting down.`);
isShuttingDown = true;

if (appPromise) {
await appPromise.then(app => {
if (app.__server__) {
logger.info('Closing express server');
app.__server__.close();
}
});
}

if (parseToBoolean(config.services.searchSync)) {
await stopElasticSearchPostgresSync();
}

process.exit();
}
};

process.on('exit', () => gracefullyShutdown('exit'));
process.on('SIGINT', () => gracefullyShutdown('SIGINT'));
process.on('SIGTERM', () => gracefullyShutdown('SIGTERM'));

// This is used by tests
export default async function () {
return app ? app : start(1);
export default async function startServerForTest() {
return appPromise ? appPromise : parseToBoolean(config.services.server) ? startExpressServer(1) : null;
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import models, { Op } from '../../../models';
import { Op } from '../../../models';
import Collective from '../../../models/Collective';
import { stripHTMLOrEmpty } from '../../sanitize-html';
import { ElasticSearchIndexName } from '../constants';

import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter';
import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter';

export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapter {
public readonly model = models.Collective;
public readonly model = Collective;
public readonly index = ElasticSearchIndexName.COLLECTIVES;
public readonly mappings = {
properties: {
Expand All @@ -30,16 +31,8 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte
},
} as const;

public async findEntriesToIndex(
options: {
offset?: number;
limit?: number;
fromDate?: Date;
maxId?: number;
ids?: number[];
} = {},
): Promise<Array<InstanceType<typeof models.Collective>>> {
return models.Collective.findAll({
public async findEntriesToIndex(options: FindEntriesToIndexOptions = {}) {
return Collective.findAll({
attributes: Object.keys(this.mappings.properties),
order: [['id', 'DESC']],
limit: options.limit,
Expand All @@ -49,12 +42,21 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte
...(options.fromDate ? { updatedAt: options.fromDate } : null),
...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null),
...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null),
...(options.relatedToCollectiveIds?.length
? {
[Op.or]: [
{ id: options.relatedToCollectiveIds },
{ HostCollectiveId: options.relatedToCollectiveIds },
{ ParentCollectiveId: options.relatedToCollectiveIds },
],
}
: null),
},
});
}

public mapModelInstanceToDocument(
instance: InstanceType<typeof models.Collective>,
instance: InstanceType<typeof Collective>,
): Record<keyof (typeof this.mappings)['properties'], unknown> {
return {
id: instance.id,
Expand All @@ -72,7 +74,7 @@ export class ElasticSearchCollectivesAdapter implements ElasticSearchModelAdapte
isActive: instance.isActive,
isHostAccount: instance.isHostAccount,
deactivatedAt: instance.deactivatedAt,
HostCollectiveId: instance.HostCollectiveId,
HostCollectiveId: !instance.isActive ? null : instance.HostCollectiveId,
ParentCollectiveId: instance.ParentCollectiveId,
};
}
Expand Down
33 changes: 16 additions & 17 deletions server/lib/elastic-search/adapters/ElasticSearchCommentsAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types';
import { omit } from 'lodash';
import { Op } from 'sequelize';

import models from '../../../models';
import { CommentType } from '../../../models/Comment';
import Comment, { CommentType } from '../../../models/Comment';
import { stripHTMLOrEmpty } from '../../sanitize-html';
import { ElasticSearchIndexName } from '../constants';

import { ElasticSearchModelAdapter } from './ElasticSearchModelAdapter';
import { ElasticSearchModelAdapter, FindEntriesToIndexOptions } from './ElasticSearchModelAdapter';

export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
public readonly model = models.Comment;
public readonly model = Comment;
public readonly index = ElasticSearchIndexName.COMMENTS;
public readonly mappings = {
properties: {
Expand All @@ -29,16 +28,8 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
},
} as const;

public findEntriesToIndex(
options: {
offset?: number;
limit?: number;
fromDate?: Date;
maxId?: number;
ids?: number[];
} = {},
) {
return models.Comment.findAll({
public findEntriesToIndex(options: FindEntriesToIndexOptions = {}) {
return Comment.findAll({
attributes: omit(Object.keys(this.mappings.properties), ['HostCollectiveId', 'ParentCollectiveId']),
order: [['id', 'DESC']],
limit: options.limit,
Expand All @@ -47,12 +38,20 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
...(options.fromDate ? { updatedAt: options.fromDate } : null),
...(options.maxId ? { id: { [Op.lte]: options.maxId } } : null),
...(options.ids?.length ? { id: { [Op.in]: options.ids } } : null),
...(options.relatedToCollectiveIds?.length
? {
[Op.or]: [
{ CollectiveId: options.relatedToCollectiveIds },
{ FromCollectiveId: options.relatedToCollectiveIds },
],
}
: null),
},
include: [
{
association: 'collective',
required: true,
attributes: ['HostCollectiveId', 'ParentCollectiveId'],
attributes: ['isActive', 'HostCollectiveId', 'ParentCollectiveId'],
},
{
association: 'expense',
Expand All @@ -69,7 +68,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
}

public mapModelInstanceToDocument(
instance: InstanceType<typeof models.Comment>,
instance: InstanceType<typeof Comment>,
): Record<keyof (typeof this.mappings)['properties'], unknown> {
return {
id: instance.id,
Expand All @@ -84,7 +83,7 @@ export class ElasticSearchCommentsAdapter implements ElasticSearchModelAdapter {
HostCollectiveId:
instance.expense?.HostCollectiveId ??
instance.hostApplication?.HostCollectiveId ??
instance.collective.HostCollectiveId,
(!instance.collective.isActive ? null : instance.collective.HostCollectiveId),
};
}

Expand Down
Loading
Loading