Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streams] Adding the first integration test #201293

Merged
merged 6 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .buildkite/ftr_oblt_stateful_configs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ enabled:
- x-pack/test/api_integration/apis/synthetics/config.ts
- x-pack/test/api_integration/apis/uptime/config.ts
- x-pack/test/api_integration/apis/entity_manager/config.ts
- x-pack/test/api_integration/apis/streams/config.ts
- x-pack/test/apm_api_integration/basic/config.ts
- x-pack/test/apm_api_integration/cloud/config.ts
- x-pack/test/apm_api_integration/rules/config.ts
Expand Down
16 changes: 16 additions & 0 deletions x-pack/test/api_integration/apis/streams/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { FtrConfigProviderContext } from '@kbn/test';

export default async function ({ readConfigFile }: FtrConfigProviderContext) {
const baseIntegrationTestsConfig = await readConfigFile(require.resolve('../../config.ts'));
return {
...baseIntegrationTestsConfig.getAll(),
testFiles: [require.resolve('.')],
};
}
140 changes: 140 additions & 0 deletions x-pack/test/api_integration/apis/streams/full_flow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import expect from '@kbn/expect';
import {
deleteStream,
enableStreams,
fetchDocument,
forkStream,
indexDocument,
} from './helpers/requests';
import { FtrProviderContext } from '../../ftr_provider_context';
import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers';
import { cleanUpRootStream } from './helpers/cleanup';

export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const esClient = getService('es');
const retryService = getService('retry');
const logger = getService('log');

describe('Basic functionality', () => {
after(async () => {
await deleteStream(supertest, 'logs.nginx');
await cleanUpRootStream(esClient);
});

// Note: Each step is dependent on the previous
describe('Full flow', () => {
it('Enable streams', async () => {
await enableStreams(supertest);
});

it('Index a JSON document to logs, should go to logs', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:00.000Z',
message: JSON.stringify({
'log.level': 'info',
'log.logger': 'nginx',
message: 'test',
}),
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({ esClient, indexName: 'logs', retryService, logger });

const result = await fetchDocument(esClient, 'logs', response._id);
expect(result._index).to.match(/^\.ds\-logs-.*/);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:00.000Z',
message: 'test',
log: { level: 'info', logger: 'nginx' },
});
});

it('Fork logs to logs.nginx', async () => {
const body = {
stream: {
id: 'logs.nginx',
fields: [],
processing: [],
},
condition: {
field: 'log.logger',
operator: 'eq',
value: 'nginx',
},
};
const response = await forkStream(supertest, 'logs', body);
expect(response).to.have.property('acknowledged', true);
});

it('Index an Nginx access log message, should goto logs.nginx', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:10.000Z',
message: JSON.stringify({
'log.level': 'info',
'log.logger': 'nginx',
message: 'test',
}),
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({ esClient, indexName: 'logs.nginx', retryService, logger });

const result = await fetchDocument(esClient, 'logs.nginx', response._id);
expect(result._index).to.match(/^\.ds\-logs.nginx-.*/);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:10.000Z',
message: 'test',
log: { level: 'info', logger: 'nginx' },
});
});

it('Fork logs to logs.nginx.access', async () => {
const body = {
stream: {
id: 'logs.nginx.access',
fields: [],
processing: [],
},
condition: { field: 'log.level', operator: 'eq', value: 'info' },
};
const response = await forkStream(supertest, 'logs.nginx', body);
expect(response).to.have.property('acknowledged', true);
});

it('Index an Nginx access log message, should goto logs.nginx.access', async () => {
const doc = {
'@timestamp': '2024-01-01T00:00:20.000Z',
message: JSON.stringify({
'log.level': 'info',
'log.logger': 'nginx',
message: 'test',
}),
};
const response = await indexDocument(esClient, 'logs', doc);
expect(response.result).to.eql('created');
await waitForDocumentInIndex({
esClient,
indexName: 'logs.nginx.access',
retryService,
logger,
});

const result = await fetchDocument(esClient, 'logs.nginx.access', response._id);
expect(result._index).to.match(/^\.ds\-logs.nginx.access-.*/);
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:20.000Z',
message: 'test',
log: { level: 'info', logger: 'nginx' },
});
});
});
});
}
26 changes: 26 additions & 0 deletions x-pack/test/api_integration/apis/streams/helpers/cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Client } from '@elastic/elasticsearch';

/**
DELETE .kibana_streams
DELETE _data_stream/logs
DELETE /_index_template/logs@stream
DELETE /_component_template/[email protected]
DELETE /_ingest/pipeline/logs@json-pipeline
DELETE /_ingest/pipeline/[email protected]
DELETE /_ingest/pipeline/[email protected]
*/

export async function cleanUpRootStream(esClient: Client) {
await esClient.indices.delete({ index: '.kibana_streams' });
await esClient.indices.deleteDataStream({ name: 'logs' });
await esClient.indices.deleteIndexTemplate({ name: 'logs@stream' });
await esClient.cluster.deleteComponentTemplate({ name: '[email protected]' });
await esClient.ingest.deletePipeline({ id: 'logs@stream.*' });
}
43 changes: 43 additions & 0 deletions x-pack/test/api_integration/apis/streams/helpers/requests.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Client } from '@elastic/elasticsearch';
import { JsonObject } from '@kbn/utility-types';
import { Agent } from 'supertest';
import expect from '@kbn/expect';
import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

export async function enableStreams(supertest: Agent) {
const req = supertest.post('/api/streams/_enable').set('kbn-xsrf', 'xxx');
const response = await req.send().expect(200);
return response.body;
}

export async function indexDocument(esClient: Client, index: string, document: JsonObject) {
const response = await esClient.index({ index, document });
return response;
}

export async function fetchDocument(esClient: Client, index: string, id: string) {
const query = {
ids: { values: [id] },
};
const response = await esClient.search({ index, query });
expect((response.hits.total as SearchTotalHits).value).to.eql(1);
return response.hits.hits[0];
}

export async function forkStream(supertest: Agent, root: string, body: JsonObject) {
const req = supertest.post(`/api/streams/${root}/_fork`).set('kbn-xsrf', 'xxx');
const response = await req.send(body).expect(200);
return response.body;
}

export async function deleteStream(supertest: Agent, id: string) {
const req = supertest.delete(`/api/streams/${id}`).set('kbn-xsrf', 'xxx');
const response = await req.send().expect(200);
return response.body;
}
14 changes: 14 additions & 0 deletions x-pack/test/api_integration/apis/streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { FtrProviderContext } from '../../ftr_provider_context';

export default function ({ loadTestFile }: FtrProviderContext) {
describe('Streams Endpoints', () => {
loadTestFile(require.resolve('./full_flow'));
});
}
Loading