Skip to content

Commit

Permalink
[8.x] 🌊 Add support for existing data streams (#202057) (#202771)
Browse files Browse the repository at this point in the history
# Backport

This will backport the following commits from `main` to `8.x`:
- [🌊 Add support for existing data streams
(#202057)](#202057)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Chris
Cowan","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-12-02T23:18:58Z","message":"🌊
Add support for existing data streams (#202057)\n\n## πŸ’
Summary\r\n\r\nThis PR introduces the initial support for working with
existing data\r\nstreams. This is done by reading the `/_data_steram`
API endpoint then\r\nconverting those results to stream definitions with
the `managed` flag\r\nset to `false`, and then mixing them in with the
\"managed\" streams\r\nresults. This PR has the following
changes:\r\n\r\n- Add `managed` field to the Stream definition \r\n- Set
`managed: true` on streams created through the system\r\n- Update
`listStreams` to return both managed and un-managed streams \r\n- Update
`readStream` to fallback to \"un-managed\" stream if the
managed\r\nstream is not found\r\n- In `readStream` return all related
Elasticsearch assets\r\n- Add rudimentary UI support for classic data
streams\r\n\r\n---------\r\n\r\nCo-authored-by: Joe Reuter
<[email protected]>","sha":"4ec420a816e2eb24a3a978471e80186a470ebdde","branchLabelMapping":{"^v9.0.0$":"main","^v8.18.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:Observability","v9.0.0","backport:prev-minor","v8.18.0","Feature:Streams"],"title":"🌊
Add support for existing data
streams","number":202057,"url":"https://github.com/elastic/kibana/pull/202057","mergeCommit":{"message":"🌊
Add support for existing data streams (#202057)\n\n## πŸ’
Summary\r\n\r\nThis PR introduces the initial support for working with
existing data\r\nstreams. This is done by reading the `/_data_steram`
API endpoint then\r\nconverting those results to stream definitions with
the `managed` flag\r\nset to `false`, and then mixing them in with the
\"managed\" streams\r\nresults. This PR has the following
changes:\r\n\r\n- Add `managed` field to the Stream definition \r\n- Set
`managed: true` on streams created through the system\r\n- Update
`listStreams` to return both managed and un-managed streams \r\n- Update
`readStream` to fallback to \"un-managed\" stream if the
managed\r\nstream is not found\r\n- In `readStream` return all related
Elasticsearch assets\r\n- Add rudimentary UI support for classic data
streams\r\n\r\n---------\r\n\r\nCo-authored-by: Joe Reuter
<[email protected]>","sha":"4ec420a816e2eb24a3a978471e80186a470ebdde"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/202057","number":202057,"mergeCommit":{"message":"🌊
Add support for existing data streams (#202057)\n\n## πŸ’
Summary\r\n\r\nThis PR introduces the initial support for working with
existing data\r\nstreams. This is done by reading the `/_data_steram`
API endpoint then\r\nconverting those results to stream definitions with
the `managed` flag\r\nset to `false`, and then mixing them in with the
\"managed\" streams\r\nresults. This PR has the following
changes:\r\n\r\n- Add `managed` field to the Stream definition \r\n- Set
`managed: true` on streams created through the system\r\n- Update
`listStreams` to return both managed and un-managed streams \r\n- Update
`readStream` to fallback to \"un-managed\" stream if the
managed\r\nstream is not found\r\n- In `readStream` return all related
Elasticsearch assets\r\n- Add rudimentary UI support for classic data
streams\r\n\r\n---------\r\n\r\nCo-authored-by: Joe Reuter
<[email protected]>","sha":"4ec420a816e2eb24a3a978471e80186a470ebdde"}},{"branch":"8.x","label":"v8.18.0","branchLabelMappingKey":"^v8.18.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Chris Cowan <[email protected]>
  • Loading branch information
kibanamachine and simianhacker authored Dec 3, 2024
1 parent f3da0dc commit 60c300e
Show file tree
Hide file tree
Showing 14 changed files with 258 additions and 18 deletions.
9 changes: 9 additions & 0 deletions x-pack/plugins/streams/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ export type StreamWithoutIdDefinition = z.infer<typeof streamDefinitonSchema>;

export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({
id: z.string(),
managed: z.boolean().default(true),
unmanaged_elasticsearch_assets: z.optional(
z.array(
z.object({
type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']),
id: z.string(),
})
)
),
});

export type StreamDefinition = z.infer<typeof streamDefinitonSchema>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) {
id: {
type: 'keyword',
},
managed: {
type: 'boolean',
},
},
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { StreamDefinition } from '../../../common/types';

export const rootStreamDefinition: StreamDefinition = {
id: 'logs',
managed: true,
processing: [],
children: [],
fields: [
Expand Down
97 changes: 91 additions & 6 deletions x-pack/plugins/streams/server/lib/streams/stream_crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async function upsertInternalStream({ definition, scopedClusterClient }: BasePar
return scopedClusterClient.asInternalUser.index({
id: definition.id,
index: STREAMS_INDEX,
document: definition,
document: { ...definition, managed: true },
refresh: 'wait_for',
});
}
Expand All @@ -101,15 +101,32 @@ export async function listStreams({
size: 10000,
sort: [{ id: 'asc' }],
});
const definitions = response.hits.hits.map((hit) => hit._source!);

const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient });
const definitions = response.hits.hits.map((hit) => ({ ...hit._source!, managed: true }));
const total = response.hits.total!;

return {
definitions,
total: typeof total === 'number' ? total : total.value,
definitions: [...definitions, ...dataStreams],
total: (typeof total === 'number' ? total : total.value) + dataStreams.length,
};
}

export async function listDataStreamsAsStreams({
scopedClusterClient,
}: ListStreamsParams): Promise<StreamDefinition[]> {
const response = await scopedClusterClient.asInternalUser.indices.getDataStream();
return response.data_streams
.filter((dataStream) => dataStream.template.endsWith('@stream') === false)
.map((dataStream) => ({
id: dataStream.name,
managed: false,
children: [],
fields: [],
processing: [],
}));
}

interface ReadStreamParams extends BaseParams {
id: string;
skipAccessCheck?: boolean;
Expand Down Expand Up @@ -137,16 +154,80 @@ export async function readStream({
}
}
return {
definition,
definition: {
...definition,
managed: true,
},
};
} catch (e) {
if (e.meta?.statusCode === 404) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck });
}
throw e;
}
}

export async function readDataStreamAsStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams) {
const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name: id });
if (response.data_streams.length === 1) {
const definition: StreamDefinition = {
id,
managed: false,
children: [],
fields: [],
processing: [],
};
if (!skipAccessCheck) {
const hasAccess = await checkReadAccess({ id, scopedClusterClient });
if (!hasAccess) {
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
// retrieve linked index template, component template and ingest pipeline
const templateName = response.data_streams[0].template;
const componentTemplates: string[] = [];
const template = await scopedClusterClient.asInternalUser.indices.getIndexTemplate({
name: templateName,
});
if (template.index_templates.length) {
template.index_templates[0].index_template.composed_of.forEach((componentTemplateName) => {
componentTemplates.push(componentTemplateName);
});
}
const writeIndexName = response.data_streams[0].indices.at(-1)?.index_name!;
const currentIndex = await scopedClusterClient.asInternalUser.indices.get({
index: writeIndexName,
});
const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline!;

definition.unmanaged_elasticsearch_assets = [
{
type: 'ingest_pipeline',
id: ingestPipelineId,
},
...componentTemplates.map((componentTemplateName) => ({
type: 'component_template' as const,
id: componentTemplateName,
})),
{
type: 'index_template',
id: templateName,
},
{
type: 'data_stream',
id,
},
];

return { definition };
}
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}

interface ReadAncestorsParams extends BaseParams {
id: string;
}
Expand Down Expand Up @@ -285,6 +366,10 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
if (!definition.managed) {
// TODO For now, we just don't allow reads at all - later on we will relax this to allow certain operations, but they will use a completely different syncing logic
throw new Error('Cannot sync an unmanaged stream');
}
const componentTemplate = generateLayer(definition.id, definition);
await upsertComponent({
esClient: scopedClusterClient.asCurrentUser,
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/streams/server/routes/streams/edit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export const editStreamRoute = createServerRoute({
children: [],
fields: [],
processing: [],
managed: true,
};

await syncStream({
Expand All @@ -87,7 +88,7 @@ export const editStreamRoute = createServerRoute({

await syncStream({
scopedClusterClient,
definition: { ...streamDefinition, id: params.path.id },
definition: { ...streamDefinition, id: params.path.id, managed: true },
rootDefinition: parentDefinition,
logger,
});
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/streams/server/routes/streams/fork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export const forkStreamsRoute = createServerRoute({
id: params.path.id,
});

const childDefinition = { ...params.body.stream, children: [] };
const childDefinition = { ...params.body.stream, children: [], managed: true };

// check whether root stream has a child of the given name already
if (rootDefinition.children.some((child) => child.id === childDefinition.id)) {
Expand Down
6 changes: 4 additions & 2 deletions x-pack/plugins/streams/server/routes/streams/list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ export interface StreamTree {
children: StreamTree[];
}

function asTrees(definitions: Array<{ id: string }>) {
function asTrees(definitions: Array<{ id: string; managed?: boolean }>) {
const trees: StreamTree[] = [];
const ids = definitions.map((definition) => definition.id);
const ids = definitions
.filter((definition) => definition.managed)
.map((definition) => definition.id);

ids.sort((a, b) => a.split('.').length - b.split('.').length);

Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/streams/server/routes/streams/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export const readStreamRoute = createServerRoute({
id: params.path.id,
});

if (streamEntity.definition.managed === false) {
return {
...streamEntity.definition,
inheritedFields: [],
};
}

const { ancestors } = await readAncestors({
id: streamEntity.definition.id,
scopedClusterClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel } from '@elastic/eui';
import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel, EuiBadge } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import React from 'react';
import { StreamDefinition } from '@kbn/streams-plugin/common';
import { useStreamsAppBreadcrumbs } from '../../hooks/use_streams_app_breadcrumbs';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { EntityOverviewTabList } from '../entity_overview_tab_list';
Expand All @@ -25,13 +26,15 @@ export function EntityDetailViewWithoutParams({
selectedTab,
tabs,
entity,
definition,
}: {
selectedTab: string;
tabs: EntityViewTab[];
entity: {
displayName?: string;
id: string;
};
definition?: StreamDefinition;
}) {
const router = useStreamsAppRouter();
useStreamsAppBreadcrumbs(() => {
Expand Down Expand Up @@ -86,7 +89,26 @@ export function EntityDetailViewWithoutParams({
<EuiFlexItem grow={false}>
<StreamsAppPageHeader
verticalPaddingSize="none"
title={<StreamsAppPageHeaderTitle title={entity.displayName} />}
title={
<StreamsAppPageHeaderTitle
title={
<>
{entity.displayName}
{definition && !definition.managed ? (
<>
{' '}
<EuiBadge>
{i18n.translate(
'xpack.streams.entityDetailViewWithoutParams.unmanagedBadgeLabel',
{ defaultMessage: 'Unmanaged' }
)}
</EuiBadge>
</>
) : null}
</>
}
/>
}
>
<EntityOverviewTabList
tabs={Object.entries(tabMap).map(([tabKey, { label, href }]) => {
Expand Down
Loading

0 comments on commit 60c300e

Please sign in to comment.