Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] migrate logstash plugin to new ES client (#98064) #98447

Merged
merged 1 commit into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
* 2.0.
*/

import { estypes } from '@elastic/elasticsearch';
import { Cluster } from './cluster';

describe('cluster', () => {
describe('Cluster', () => {
describe('fromUpstreamJSON factory method', () => {
const upstreamJSON = {
cluster_uuid: 'S-S4NNZDRV-g9c-JrIhx6A',
};
} as estypes.RootNodeInfoResponse;

it('returns correct Cluster instance', () => {
const cluster = Cluster.fromUpstreamJSON(upstreamJSON);
Expand Down
11 changes: 5 additions & 6 deletions x-pack/plugins/logstash/server/models/cluster/cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,27 @@
* 2.0.
*/

import { get } from 'lodash';
import { estypes } from '@elastic/elasticsearch';

/**
* This model deals with a cluster object from ES and converts it to Kibana downstream
*/
export class Cluster {
public readonly uuid: string;

constructor({ uuid }: { uuid: string }) {
this.uuid = uuid;
}

public get downstreamJSON() {
const json = {
return {
uuid: this.uuid,
};

return json;
}

// generate Pipeline object from elasticsearch response
static fromUpstreamJSON(upstreamCluster: Record<string, string>) {
const uuid = get(upstreamCluster, 'cluster_uuid') as string;
static fromUpstreamJSON(upstreamCluster: estypes.RootNodeInfoResponse) {
const uuid = upstreamCluster.cluster_uuid;
return new Cluster({ uuid });
}
}
31 changes: 3 additions & 28 deletions x-pack/plugins/logstash/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,11 @@
* 2.0.
*/

import {
CoreSetup,
CoreStart,
ILegacyCustomClusterClient,
Logger,
Plugin,
PluginInitializerContext,
} from 'src/core/server';
import { CoreSetup, CoreStart, Logger, Plugin, PluginInitializerContext } from 'src/core/server';
import { LicensingPluginSetup } from '../../licensing/server';
import { PluginSetupContract as FeaturesPluginSetup } from '../../features/server';
import { SecurityPluginSetup } from '../../security/server';

import { registerRoutes } from './routes';
import type { LogstashRequestHandlerContext } from './types';

interface SetupDeps {
licensing: LicensingPluginSetup;
Expand All @@ -28,16 +19,14 @@ interface SetupDeps {

export class LogstashPlugin implements Plugin {
private readonly logger: Logger;
private esClient?: ILegacyCustomClusterClient;
private coreSetup?: CoreSetup;

constructor(context: PluginInitializerContext) {
this.logger = context.logger.get();
}

setup(core: CoreSetup, deps: SetupDeps) {
this.logger.debug('Setting up Logstash plugin');

this.coreSetup = core;
registerRoutes(core.http.createRouter(), deps.security);

deps.features.registerElasticsearchFeature({
Expand All @@ -55,19 +44,5 @@ export class LogstashPlugin implements Plugin {
});
}

start(core: CoreStart) {
const esClient = core.elasticsearch.legacy.createClient('logstash');

this.coreSetup!.http.registerRouteHandlerContext<LogstashRequestHandlerContext, 'logstash'>(
'logstash',
async (context, request) => {
return { esClient: esClient.asScoped(request) };
}
);
}
stop() {
if (this.esClient) {
this.esClient.close();
}
}
start(core: CoreStart) {}
}
4 changes: 2 additions & 2 deletions x-pack/plugins/logstash/server/routes/cluster/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export function registerClusterLoadRoute(router: LogstashPluginRouter) {
},
wrapRouteWithLicenseCheck(checkLicense, async (context, request, response) => {
try {
const client = context.logstash!.esClient;
const info = await client.callAsCurrentUser('info');
const { client } = context.core.elasticsearch;
const { body: info } = await client.asCurrentUser.info();
return response.ok({
body: {
cluster: Cluster.fromUpstreamJSON(info).downstreamJSON,
Expand Down
18 changes: 11 additions & 7 deletions x-pack/plugins/logstash/server/routes/pipeline/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ export function registerPipelineDeleteRoute(router: LogstashPluginRouter) {
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const { id } = request.params;
const { client } = context.core.elasticsearch;

await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
method: 'DELETE',
});

return response.noContent();
try {
await client.asCurrentUser.logstash.deletePipeline({ id });
return response.noContent();
} catch (e) {
if (e.statusCode === 404) {
return response.notFound();
}
throw e;
}
})
)
);
Expand Down
12 changes: 6 additions & 6 deletions x-pack/plugins/logstash/server/routes/pipeline/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ export function registerPipelineLoadRoute(router: LogstashPluginRouter) {
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const { id } = request.params;
const { client } = context.core.elasticsearch;

const result = await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
method: 'GET',
ignore: [404],
});
const { body: result } = await client.asCurrentUser.logstash.getPipeline(
{ id },
{ ignore: [404] }
);

if (result[request.params.id] === undefined) {
return response.notFound();
Expand Down
7 changes: 3 additions & 4 deletions x-pack/plugins/logstash/server/routes/pipeline/save.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ export function registerPipelineSaveRoute(
username = user?.username;
}

const client = context.logstash!.esClient;
const { client } = context.core.elasticsearch;
const pipeline = Pipeline.fromDownstreamJSON(request.body, request.params.id, username);

await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipeline.id),
method: 'PUT',
await client.asCurrentUser.logstash.putPipeline({
id: pipeline.id,
body: pipeline.upstreamJSON,
});

Expand Down
18 changes: 9 additions & 9 deletions x-pack/plugins/logstash/server/routes/pipelines/delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
*/

import { schema } from '@kbn/config-schema';
import { LegacyAPICaller } from 'src/core/server';
import { ElasticsearchClient } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';

import { checkLicense } from '../../lib/check_license';
import type { LogstashPluginRouter } from '../../types';

async function deletePipelines(callWithRequest: LegacyAPICaller, pipelineIds: string[]) {
async function deletePipelines(client: ElasticsearchClient, pipelineIds: string[]) {
const deletePromises = pipelineIds.map((pipelineId) => {
return callWithRequest('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipelineId),
method: 'DELETE',
})
.then((success) => ({ success }))
return client.logstash
.deletePipeline({
id: pipelineId,
})
.then((response) => ({ success: response.body }))
.catch((error) => ({ error }));
});

Expand All @@ -45,8 +45,8 @@ export function registerPipelinesDeleteRoute(router: LogstashPluginRouter) {
wrapRouteWithLicenseCheck(
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash.esClient;
const results = await deletePipelines(client.callAsCurrentUser, request.body.pipelineIds);
const client = context.core.elasticsearch.client.asCurrentUser;
const results = await deletePipelines(client, request.body.pipelineIds);

return response.ok({ body: { results } });
})
Expand Down
23 changes: 12 additions & 11 deletions x-pack/plugins/logstash/server/routes/pipelines/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@
*/

import { i18n } from '@kbn/i18n';
import { LegacyAPICaller } from 'src/core/server';
import { ElasticsearchClient } from 'src/core/server';
import type { LogstashPluginRouter } from '../../types';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';

import { PipelineListItem } from '../../models/pipeline_list_item';
import { checkLicense } from '../../lib/check_license';

async function fetchPipelines(callWithRequest: LegacyAPICaller) {
const params = {
path: '/_logstash/pipeline',
method: 'GET',
ignore: [404],
};

return await callWithRequest('transport.request', params);
async function fetchPipelines(client: ElasticsearchClient) {
const { body } = await client.transport.request(
{
method: 'GET',
path: '/_logstash/pipeline',
},
{ ignore: [404] }
);
return body;
}

export function registerPipelinesListRoute(router: LogstashPluginRouter) {
Expand All @@ -33,8 +34,8 @@ export function registerPipelinesListRoute(router: LogstashPluginRouter) {
checkLicense,
router.handleLegacyErrors(async (context, request, response) => {
try {
const client = context.logstash!.esClient;
const pipelinesRecord = (await fetchPipelines(client.callAsCurrentUser)) as Record<
const { client } = context.core.elasticsearch;
const pipelinesRecord = (await fetchPipelines(client.asCurrentUser)) as Record<
string,
any
>;
Expand Down
5 changes: 1 addition & 4 deletions x-pack/plugins/logstash/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import type { ILegacyScopedClusterClient, IRouter, RequestHandlerContext } from 'src/core/server';
import type { IRouter, RequestHandlerContext } from 'src/core/server';
import type { LicensingApiRequestHandlerContext } from '../../licensing/server';

export interface PipelineListItemOptions {
Expand All @@ -19,9 +19,6 @@ export interface PipelineListItemOptions {
* @internal
*/
export interface LogstashRequestHandlerContext extends RequestHandlerContext {
logstash: {
esClient: ILegacyScopedClusterClient;
};
licensing: LicensingApiRequestHandlerContext;
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/test/api_integration/apis/logstash/cluster/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import { FtrProviderContext } from '../../../ftr_provider_context';

export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const es = getService('legacyEs');
const es = getService('es');

describe('load', () => {
it('should return the ES cluster info', async () => {
const { body } = await supertest.get('/api/logstash/cluster').expect(200);

const responseFromES = await es.info();
const { body: responseFromES } = await es.info();
expect(body.cluster.uuid).to.eql(responseFromES.cluster_uuid);
});
});
Expand Down