Skip to content

Commit

Permalink
migrate logstash plugin to new ES client (#98064)
Browse files Browse the repository at this point in the history
* migrate logstash plugin to new ES client

* handle 404s

* use default ES client
  • Loading branch information
pgayvallet committed Apr 27, 2021
1 parent 7b83495 commit dccb80d
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
* 2.0.
*/

import { estypes } from '@elastic/elasticsearch';
import { Cluster } from './cluster';

describe('cluster', () => {
describe('Cluster', () => {
describe('fromUpstreamJSON factory method', () => {
const upstreamJSON = {
cluster_uuid: 'S-S4NNZDRV-g9c-JrIhx6A',
};
} as estypes.RootNodeInfoResponse;

it('returns correct Cluster instance', () => {
const cluster = Cluster.fromUpstreamJSON(upstreamJSON);
Expand Down
11 changes: 5 additions & 6 deletions x-pack/plugins/logstash/server/models/cluster/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,27 @@
* 2.0.
*/

import { get } from 'lodash';
import { estypes } from '@elastic/elasticsearch';

/**
* This model deals with a cluster object from ES and converts it to Kibana downstream
*/
export class Cluster {
public readonly uuid: string;

constructor({ uuid }: { uuid: string }) {
this.uuid = uuid;
}

public get downstreamJSON() {
const json = {
return {
uuid: this.uuid,
};

return json;
}

// generate Pipeline object from elasticsearch response
static fromUpstreamJSON(upstreamCluster: Record<string, string>) {
const uuid = get(upstreamCluster, 'cluster_uuid') as string;
static fromUpstreamJSON(upstreamCluster: estypes.RootNodeInfoResponse) {
const uuid = upstreamCluster.cluster_uuid;
return new Cluster({ uuid });
}
}
31 changes: 3 additions & 28 deletions x-pack/plugins/logstash/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,11 @@
* 2.0.
*/

import {
CoreSetup,
CoreStart,
ILegacyCustomClusterClient,
Logger,
Plugin,
PluginInitializerContext,
} from 'src/core/server';
import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'src/core/server';
import { LicensingPluginSetup } from '../../licensing/server';
import { PluginSetupContract as FeaturesPluginSetup } from '../../features/server';
import { SecurityPluginSetup } from '../../security/server';

import { registerRoutes } from './routes';
import type { LogstashRequestHandlerContext } from './types';

interface SetupDeps {
licensing: LicensingPluginSetup;
Expand All @@ -28,16 +19,14 @@ interface SetupDeps {

export class LogstashPlugin implements Plugin {
private readonly logger: Logger;
private esClient?: ILegacyCustomClusterClient;
private coreSetup?: CoreSetup;

constructor(context: PluginInitializerContext) {
this.logger = context.logger.get();
}

setup(core: CoreSetup, deps: SetupDeps) {
this.logger.debug('Setting up Logstash plugin');

this.coreSetup = core;
registerRoutes(core.http.createRouter(), deps.security);

deps.features.registerElasticsearchFeature({
Expand All @@ -55,19 +44,5 @@ export class LogstashPlugin implements Plugin {
});
}

start(core: CoreStart) {
const esClient = core.elasticsearch.legacy.createClient('logstash');

this.coreSetup!.http.registerRouteHandlerContext<LogstashRequestHandlerContext, 'logstash'>(
'logstash',
async (context, request) => {
return { esClient: esClient.asScoped(request) };
}
);
}
stop() {
if (this.esClient) {
this.esClient.close();
}
}
start(core: CoreStart) {}
}
4 changes: 2 additions & 2 deletions x-pack/plugins/logstash/server/routes/cluster/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export function registerClusterLoadRoute(router: LogstashPluginRouter) {
},
wrapRouteWithLicenseCheck(checkLicense, async (context, request, response) => {
try {
const client = context.logstash!.esClient;
const info = await client.callAsCurrentUser('info');
const { client } = context.core.elasticsearch;
const { body: info } = await client.asCurrentUser.info();
return response.ok({
body: {
cluster: Cluster.fromUpstreamJSON(info).downstreamJSON,
Expand Down
18 changes: 11 additions & 7 deletions x-pack/plugins/logstash/server/routes/pipeline/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ export function registerPipelineDeleteRoute(router: LogstashPluginRouter) {
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const { id } = request.params;
const { client } = context.core.elasticsearch;

await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
method: 'DELETE',
});

return response.noContent();
try {
await client.asCurrentUser.logstash.deletePipeline({ id });
return response.noContent();
} catch (e) {
if (e.statusCode === 404) {
return response.notFound();
}
throw e;
}
})
)
);
Expand Down
12 changes: 6 additions & 6 deletions x-pack/plugins/logstash/server/routes/pipeline/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ export function registerPipelineLoadRoute(router: LogstashPluginRouter) {
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const { id } = request.params;
const { client } = context.core.elasticsearch;

const result = await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
method: 'GET',
ignore: [404],
});
const { body: result } = await client.asCurrentUser.logstash.getPipeline(
{ id },
{ ignore: [404] }
);

if (result[request.params.id] === undefined) {
return response.notFound();
Expand Down
7 changes: 3 additions & 4 deletions x-pack/plugins/logstash/server/routes/pipeline/save.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ export function registerPipelineSaveRoute(
username = user?.username;
}

const client = context.logstash!.esClient;
const { client } = context.core.elasticsearch;
const pipeline = Pipeline.fromDownstreamJSON(request.body, request.params.id, username);

await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipeline.id),
method: 'PUT',
await client.asCurrentUser.logstash.putPipeline({
id: pipeline.id,
body: pipeline.upstreamJSON,
});

Expand Down
18 changes: 9 additions & 9 deletions x-pack/plugins/logstash/server/routes/pipelines/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
*/

import { schema } from '@kbn/config-schema';
import { LegacyAPICaller } from 'src/core/server';
import { ElasticsearchClient } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';

import { checkLicense } from '../../lib/check_license';
import type { LogstashPluginRouter } from '../../types';

async function deletePipelines(callWithRequest: LegacyAPICaller, pipelineIds: string[]) {
async function deletePipelines(client: ElasticsearchClient, pipelineIds: string[]) {
const deletePromises = pipelineIds.map((pipelineId) => {
return callWithRequest('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipelineId),
method: 'DELETE',
})
.then((success) => ({ success }))
return client.logstash
.deletePipeline({
id: pipelineId,
})
.then((response) => ({ success: response.body }))
.catch((error) => ({ error }));
});

Expand All @@ -45,8 +45,8 @@ export function registerPipelinesDeleteRoute(router: LogstashPluginRouter) {
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash.esClient;
const results = await deletePipelines(client.callAsCurrentUser, request.body.pipelineIds);
const client = context.core.elasticsearch.client.asCurrentUser;
const results = await deletePipelines(client, request.body.pipelineIds);

return response.ok({ body: { results } });
})
Expand Down
23 changes: 12 additions & 11 deletions x-pack/plugins/logstash/server/routes/pipelines/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@
*/

import { i18n } from '@kbn/i18n';
import { LegacyAPICaller } from 'src/core/server';
import { ElasticsearchClient } from 'src/core/server';
import type { LogstashPluginRouter } from '../../types';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';

import { PipelineListItem } from '../../models/pipeline_list_item';
import { checkLicense } from '../../lib/check_license';

async function fetchPipelines(callWithRequest: LegacyAPICaller) {
const params = {
path: '/_logstash/pipeline',
method: 'GET',
ignore: [404],
};

return await callWithRequest('transport.request', params);
async function fetchPipelines(client: ElasticsearchClient) {
const { body } = await client.transport.request(
{
method: 'GET',
path: '/_logstash/pipeline',
},
{ ignore: [404] }
);
return body;
}

export function registerPipelinesListRoute(router: LogstashPluginRouter) {
Expand All @@ -33,8 +34,8 @@ export function registerPipelinesListRoute(router: LogstashPluginRouter) {
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
try {
const client = context.logstash!.esClient;
const pipelinesRecord = (await fetchPipelines(client.callAsCurrentUser)) as Record<
const { client } = context.core.elasticsearch;
const pipelinesRecord = (await fetchPipelines(client.asCurrentUser)) as Record<
string,
any
>;
Expand Down
5 changes: 1 addition & 4 deletions x-pack/plugins/logstash/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import type { ILegacyScopedClusterClient, IRouter, RequestHandlerContext } from 'src/core/server';
import type { IRouter, RequestHandlerContext } from 'src/core/server';
import type { LicensingApiRequestHandlerContext } from '../../licensing/server';

export interface PipelineListItemOptions {
Expand All @@ -19,9 +19,6 @@ export interface PipelineListItemOptions {
* @internal
*/
export interface LogstashRequestHandlerContext extends RequestHandlerContext {
logstash: {
esClient: ILegacyScopedClusterClient;
};
licensing: LicensingApiRequestHandlerContext;
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/test/api_integration/apis/logstash/cluster/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import { FtrProviderContext } from '../../../ftr_provider_context';

export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const es = getService('legacyEs');
const es = getService('es');

describe('load', () => {
it('should return the ES cluster info', async () => {
const { body } = await supertest.get('/api/logstash/cluster').expect(200);

const responseFromES = await es.info();
const { body: responseFromES } = await es.info();
expect(body.cluster.uuid).to.eql(responseFromES.cluster_uuid);
});
});
Expand Down

0 comments on commit dccb80d

Please sign in to comment.