Skip to content

Commit

Permalink
feat(elasticsearch-plugin): Add health check
Browse files Browse the repository at this point in the history
Relates to #289
  • Loading branch information
michaelbromley committed May 5, 2020
1 parent 97b5aed commit 47a8cb9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
36 changes: 36 additions & 0 deletions packages/elasticsearch-plugin/src/elasticsearch.health.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Injectable } from '@nestjs/common';
import { HealthCheckError, HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';

import { ElasticsearchService } from './elasticsearch.service';

@Injectable()
export class ElasticsearchHealthIndicator extends HealthIndicator {
constructor(private elasticsearchService: ElasticsearchService) {
super();
}

async isHealthy(): Promise<HealthIndicatorResult> {
let isHealthy = false;
let error = '';
try {
await this.elasticsearchService.checkConnection();
isHealthy = true;
} catch (e) {
error = e.message;
}
const result = this.getStatus('elasticsearch', isHealthy, { message: error });
if (isHealthy) {
return result;
}
this.throwHealthCheckError(result);
}

startupCheckFailed(message: string): never {
const result = this.getStatus('elasticsearch', false, { message });
return this.throwHealthCheckError(result);
}

private throwHealthCheckError(result: HealthIndicatorResult): never {
throw new HealthCheckError('Elasticsearch not available', result);
}
}
29 changes: 20 additions & 9 deletions packages/elasticsearch-plugin/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
CollectionModificationEvent,
DeepRequired,
EventBus,
HealthCheckRegistryService,
ID,
idsAreEqual,
Logger,
Expand All @@ -21,6 +22,7 @@ import { ELASTIC_SEARCH_OPTIONS, loggerCtx } from './constants';
import { CustomMappingsResolver } from './custom-mappings.resolver';
import { ElasticsearchIndexService } from './elasticsearch-index.service';
import { AdminElasticSearchResolver, ShopElasticSearchResolver } from './elasticsearch-resolver';
import { ElasticsearchHealthIndicator } from './elasticsearch.health';
import { ElasticsearchService } from './elasticsearch.service';
import { generateSchemaExtensions } from './graphql-schema-extensions';
import { ElasticsearchIndexerController } from './indexer.controller';
Expand Down Expand Up @@ -192,6 +194,7 @@ import { ElasticsearchOptions, mergeWithDefaults } from './options';
providers: [
ElasticsearchIndexService,
ElasticsearchService,
ElasticsearchHealthIndicator,
{ provide: ELASTIC_SEARCH_OPTIONS, useFactory: () => ElasticsearchPlugin.options },
],
adminApiExtensions: { resolvers: [AdminElasticSearchResolver] },
Expand All @@ -217,6 +220,8 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
private eventBus: EventBus,
private elasticsearchService: ElasticsearchService,
private elasticsearchIndexService: ElasticsearchIndexService,
private elasticsearchHealthIndicator: ElasticsearchHealthIndicator,
private healthCheckRegistryService: HealthCheckRegistryService,
) {}

/**
Expand All @@ -235,28 +240,34 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
} catch (e) {
Logger.error(`Could not connect to Elasticsearch instance at "${host}:${port}"`, loggerCtx);
Logger.error(JSON.stringify(e), loggerCtx);
this.healthCheckRegistryService.registerIndicatorFunction(() =>
this.elasticsearchHealthIndicator.startupCheckFailed(e.message),
);
return;
}
Logger.info(`Sucessfully connected to Elasticsearch instance at "${host}:${port}"`, loggerCtx);

await this.elasticsearchService.createIndicesIfNotExists();
this.elasticsearchIndexService.initJobQueue();
this.healthCheckRegistryService.registerIndicatorFunction(() =>
this.elasticsearchHealthIndicator.isHealthy(),
);

this.eventBus.ofType(ProductEvent).subscribe((event) => {
this.eventBus.ofType(ProductEvent).subscribe(event => {
if (event.type === 'deleted') {
return this.elasticsearchIndexService.deleteProduct(event.ctx, event.product);
} else {
return this.elasticsearchIndexService.updateProduct(event.ctx, event.product);
}
});
this.eventBus.ofType(ProductVariantEvent).subscribe((event) => {
this.eventBus.ofType(ProductVariantEvent).subscribe(event => {
if (event.type === 'deleted') {
return this.elasticsearchIndexService.deleteVariant(event.ctx, event.variants);
} else {
return this.elasticsearchIndexService.updateVariants(event.ctx, event.variants);
}
});
this.eventBus.ofType(AssetEvent).subscribe((event) => {
this.eventBus.ofType(AssetEvent).subscribe(event => {
if (event.type === 'updated') {
return this.elasticsearchIndexService.updateAsset(event.ctx, event.asset);
}
Expand All @@ -265,7 +276,7 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
}
});

this.eventBus.ofType(ProductChannelEvent).subscribe((event) => {
this.eventBus.ofType(ProductChannelEvent).subscribe(event => {
if (event.type === 'assigned') {
return this.elasticsearchIndexService.assignProductToChannel(
event.ctx,
Expand All @@ -286,18 +297,18 @@ export class ElasticsearchPlugin implements OnVendureBootstrap {
collectionModification$
.pipe(
buffer(closingNotifier$),
filter((events) => 0 < events.length),
map((events) => ({
filter(events => 0 < events.length),
map(events => ({
ctx: events[0].ctx,
ids: events.reduce((ids, e) => [...ids, ...e.productVariantIds], [] as ID[]),
})),
filter((e) => 0 < e.ids.length),
filter(e => 0 < e.ids.length),
)
.subscribe((events) => {
.subscribe(events => {
return this.elasticsearchIndexService.updateVariantsById(events.ctx, events.ids);
});

this.eventBus.ofType(TaxRateModificationEvent).subscribe((event) => {
this.eventBus.ofType(TaxRateModificationEvent).subscribe(event => {
const defaultTaxZone = event.ctx.channel.defaultTaxZone;
if (defaultTaxZone && idsAreEqual(defaultTaxZone.id, event.taxRate.zone.id)) {
return this.elasticsearchService.updateAll(event.ctx);
Expand Down

0 comments on commit 47a8cb9

Please sign in to comment.