diff --git a/x-pack/plugins/streams/common/types.ts b/x-pack/plugins/streams/common/types.ts index 82a7a372662e8..c2d99d4ba1d89 100644 --- a/x-pack/plugins/streams/common/types.ts +++ b/x-pack/plugins/streams/common/types.ts @@ -94,6 +94,15 @@ export type StreamWithoutIdDefinition = z.infer; export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({ id: z.string(), + managed: z.boolean().default(true), + unmanaged_elasticsearch_assets: z.optional( + z.array( + z.object({ + type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']), + id: z.string(), + }) + ) + ), }); export type StreamDefinition = z.infer; diff --git a/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts b/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts index 8e88eeef8cd84..faff949c0d97b 100644 --- a/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts +++ b/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts @@ -29,6 +29,9 @@ export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) { id: { type: 'keyword', }, + managed: { + type: 'boolean', + }, }, }, }); diff --git a/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts b/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts index 2b7deed877309..1bdb4f20a95cc 100644 --- a/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts +++ b/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts @@ -9,6 +9,7 @@ import { StreamDefinition } from '../../../common/types'; export const rootStreamDefinition: StreamDefinition = { id: 'logs', + managed: true, processing: [], children: [], fields: [ diff --git a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts index 28775ebe8acba..245e06e8b4573 100644 --- a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts @@ -81,7 +81,7 @@ async function upsertInternalStream({ definition, scopedClusterClient }: BasePar return scopedClusterClient.asInternalUser.index({ id: definition.id, index: STREAMS_INDEX, - document: definition, + document: { ...definition, managed: true }, refresh: 'wait_for', }); } @@ -101,15 +101,32 @@ export async function listStreams({ size: 10000, sort: [{ id: 'asc' }], }); - const definitions = response.hits.hits.map((hit) => hit._source!); + + const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient }); + const definitions = response.hits.hits.map((hit) => ({ ...hit._source!, managed: true })); const total = response.hits.total!; return { - definitions, - total: typeof total === 'number' ? total : total.value, + definitions: [...definitions, ...dataStreams], + total: (typeof total === 'number' ? total : total.value) + dataStreams.length, }; } +export async function listDataStreamsAsStreams({ + scopedClusterClient, +}: ListStreamsParams): Promise { + const response = await scopedClusterClient.asInternalUser.indices.getDataStream(); + return response.data_streams + .filter((dataStream) => dataStream.template.endsWith('@stream') === false) + .map((dataStream) => ({ + id: dataStream.name, + managed: false, + children: [], + fields: [], + processing: [], + })); +} + interface ReadStreamParams extends BaseParams { id: string; skipAccessCheck?: boolean; @@ -137,16 +154,80 @@ export async function readStream({ } } return { - definition, + definition: { + ...definition, + managed: true, + }, }; } catch (e) { if (e.meta?.statusCode === 404) { - throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck }); } throw e; } } +export async function readDataStreamAsStream({ + id, + scopedClusterClient, + skipAccessCheck, +}: ReadStreamParams) { + const response = await scopedClusterClient.asInternalUser.indices.getDataStream({ name: id }); + if (response.data_streams.length === 1) { + const definition: StreamDefinition = { + id, + managed: false, + children: [], + fields: [], + processing: [], + }; + if (!skipAccessCheck) { + const hasAccess = await checkReadAccess({ id, scopedClusterClient }); + if (!hasAccess) { + throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + } + } + // retrieve linked index template, component template and ingest pipeline + const templateName = response.data_streams[0].template; + const componentTemplates: string[] = []; + const template = await scopedClusterClient.asInternalUser.indices.getIndexTemplate({ + name: templateName, + }); + if (template.index_templates.length) { + template.index_templates[0].index_template.composed_of.forEach((componentTemplateName) => { + componentTemplates.push(componentTemplateName); + }); + } + const writeIndexName = response.data_streams[0].indices.at(-1)?.index_name!; + const currentIndex = await scopedClusterClient.asInternalUser.indices.get({ + index: writeIndexName, + }); + const ingestPipelineId = currentIndex[writeIndexName].settings?.index?.default_pipeline!; + + definition.unmanaged_elasticsearch_assets = [ + { + type: 'ingest_pipeline', + id: ingestPipelineId, + }, + ...componentTemplates.map((componentTemplateName) => ({ + type: 'component_template' as const, + id: componentTemplateName, + })), + { + type: 'index_template', + id: templateName, + }, + { + type: 'data_stream', + id, + }, + ]; + + return { definition }; + } + throw new DefinitionNotFound(`Stream definition for ${id} not found.`); +} + interface ReadAncestorsParams extends BaseParams { id: string; } @@ -285,6 +366,10 @@ export async function syncStream({ rootDefinition, logger, }: SyncStreamParams) { + if (!definition.managed) { + // TODO For now, we just don't allow reads at all - later on we will relax this to allow certain operations, but they will use a completely different syncing logic + throw new Error('Cannot sync an unmanaged stream'); + } const componentTemplate = generateLayer(definition.id, definition); await upsertComponent({ esClient: scopedClusterClient.asCurrentUser, diff --git a/x-pack/plugins/streams/server/routes/streams/edit.ts b/x-pack/plugins/streams/server/routes/streams/edit.ts index cda73907d2302..6125aa2470b94 100644 --- a/x-pack/plugins/streams/server/routes/streams/edit.ts +++ b/x-pack/plugins/streams/server/routes/streams/edit.ts @@ -76,6 +76,7 @@ export const editStreamRoute = createServerRoute({ children: [], fields: [], processing: [], + managed: true, }; await syncStream({ @@ -87,7 +88,7 @@ export const editStreamRoute = createServerRoute({ await syncStream({ scopedClusterClient, - definition: { ...streamDefinition, id: params.path.id }, + definition: { ...streamDefinition, id: params.path.id, managed: true }, rootDefinition: parentDefinition, logger, }); diff --git a/x-pack/plugins/streams/server/routes/streams/fork.ts b/x-pack/plugins/streams/server/routes/streams/fork.ts index 12dce248dcdd1..a4d846ceccb35 100644 --- a/x-pack/plugins/streams/server/routes/streams/fork.ts +++ b/x-pack/plugins/streams/server/routes/streams/fork.ts @@ -55,7 +55,7 @@ export const forkStreamsRoute = createServerRoute({ id: params.path.id, }); - const childDefinition = { ...params.body.stream, children: [] }; + const childDefinition = { ...params.body.stream, children: [], managed: true }; // check whether root stream has a child of the given name already if (rootDefinition.children.some((child) => child.id === childDefinition.id)) { diff --git a/x-pack/plugins/streams/server/routes/streams/list.ts b/x-pack/plugins/streams/server/routes/streams/list.ts index d3b88ffc36a45..774a256e5ba4a 100644 --- a/x-pack/plugins/streams/server/routes/streams/list.ts +++ b/x-pack/plugins/streams/server/routes/streams/list.ts @@ -52,9 +52,11 @@ export interface StreamTree { children: StreamTree[]; } -function asTrees(definitions: Array<{ id: string }>) { +function asTrees(definitions: Array<{ id: string; managed?: boolean }>) { const trees: StreamTree[] = []; - const ids = definitions.map((definition) => definition.id); + const ids = definitions + .filter((definition) => definition.managed) + .map((definition) => definition.id); ids.sort((a, b) => a.split('.').length - b.split('.').length); diff --git a/x-pack/plugins/streams/server/routes/streams/read.ts b/x-pack/plugins/streams/server/routes/streams/read.ts index b9d21ef25b673..5c503e2b7e625 100644 --- a/x-pack/plugins/streams/server/routes/streams/read.ts +++ b/x-pack/plugins/streams/server/routes/streams/read.ts @@ -45,6 +45,13 @@ export const readStreamRoute = createServerRoute({ id: params.path.id, }); + if (streamEntity.definition.managed === false) { + return { + ...streamEntity.definition, + inheritedFields: [], + }; + } + const { ancestors } = await readAncestors({ id: streamEntity.definition.id, scopedClusterClient, diff --git a/x-pack/plugins/streams_app/public/components/entity_detail_view/index.tsx b/x-pack/plugins/streams_app/public/components/entity_detail_view/index.tsx index 68c5f9d6798ec..8e423908af27d 100644 --- a/x-pack/plugins/streams_app/public/components/entity_detail_view/index.tsx +++ b/x-pack/plugins/streams_app/public/components/entity_detail_view/index.tsx @@ -4,9 +4,10 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ -import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel } from '@elastic/eui'; +import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel, EuiBadge } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import React from 'react'; +import { StreamDefinition } from '@kbn/streams-plugin/common'; import { useStreamsAppBreadcrumbs } from '../../hooks/use_streams_app_breadcrumbs'; import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; import { EntityOverviewTabList } from '../entity_overview_tab_list'; @@ -25,6 +26,7 @@ export function EntityDetailViewWithoutParams({ selectedTab, tabs, entity, + definition, }: { selectedTab: string; tabs: EntityViewTab[]; @@ -32,6 +34,7 @@ export function EntityDetailViewWithoutParams({ displayName?: string; id: string; }; + definition?: StreamDefinition; }) { const router = useStreamsAppRouter(); useStreamsAppBreadcrumbs(() => { @@ -86,7 +89,26 @@ export function EntityDetailViewWithoutParams({ } + title={ + + {entity.displayName} + {definition && !definition.managed ? ( + <> + {' '} + + {i18n.translate( + 'xpack.streams.entityDetailViewWithoutParams.unmanagedBadgeLabel', + { defaultMessage: 'Unmanaged' } + )} + + + ) : null} + + } + /> + } > { diff --git a/x-pack/plugins/streams_app/public/components/stream_detail_management/index.tsx b/x-pack/plugins/streams_app/public/components/stream_detail_management/index.tsx index 534d883905b17..749b0e659d659 100644 --- a/x-pack/plugins/streams_app/public/components/stream_detail_management/index.tsx +++ b/x-pack/plugins/streams_app/public/components/stream_detail_management/index.tsx @@ -7,13 +7,14 @@ import React from 'react'; import { i18n } from '@kbn/i18n'; import { StreamDefinition } from '@kbn/streams-plugin/common'; -import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem } from '@elastic/eui'; +import { EuiButtonGroup, EuiFlexGroup, EuiFlexItem, EuiListGroup, EuiText } from '@elastic/eui'; import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; import { RedirectTo } from '../redirect_to'; import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; import { StreamDetailRouting } from '../stream_detail_routing'; import { StreamDetailEnriching } from '../stream_detail_enriching'; import { StreamDetailSchemaEditor } from '../stream_detail_schema_editor'; +import { useKibana } from '../../hooks/use_kibana'; type ManagementSubTabs = 'route' | 'enrich' | 'schemaEditor'; @@ -33,6 +34,18 @@ export function StreamDetailManagement({ } = useStreamsAppParams('/{key}/management/{subtab}'); const router = useStreamsAppRouter(); + if (subtab === 'overview') { + if (!definition) { + return null; + } + if (definition.managed) { + return ( + + ); + } + return ; + } + const tabs = { route: { content: ( @@ -66,6 +79,15 @@ export function StreamDetailManagement({ ); } + if (!definition?.managed) { + return ( + + ); + } + const selectedTabObject = tabs[subtab]; return ( @@ -90,3 +112,84 @@ export function StreamDetailManagement({ ); } + +function assetToLink(asset: { type: string; id: string }) { + switch (asset.type) { + case 'index_template': + return `/app/management/data/index_management/templates/${asset.id}`; + case 'component_template': + return `/app/management/data/index_management/component_templates/${asset.id}`; + case 'data_stream': + return `/app/management/data/index_management/data_streams/${asset.id}`; + case 'ingest_pipeline': + return `/app/management/ingest/ingest_pipelines?pipeline=${asset.id}`; + default: + return ''; + } +} + +function assetToTitle(asset: { type: string; id: string }) { + switch (asset.type) { + case 'index_template': + return i18n.translate('xpack.streams.streamDetailView.indexTemplate', { + defaultMessage: 'Index template', + }); + case 'component_template': + return i18n.translate('xpack.streams.streamDetailView.componentTemplate', { + defaultMessage: 'Component template', + }); + case 'data_stream': + return i18n.translate('xpack.streams.streamDetailView.dataStream', { + defaultMessage: 'Data stream', + }); + case 'ingest_pipeline': + return i18n.translate('xpack.streams.streamDetailView.ingestPipeline', { + defaultMessage: 'Ingest pipeline', + }); + default: + return ''; + } +} + +function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition }) { + const { + core: { + http: { basePath }, + }, + } = useKibana(); + const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => { + const title = assetToTitle(asset); + if (title) { + acc[title] = acc[title] ?? []; + acc[title].push(asset); + } + return acc; + }, {} as Record>); + return ( + + +

+ {i18n.translate('xpack.streams.streamDetailView.unmanagedStreamOverview', { + defaultMessage: + 'This stream is not managed. Follow the links to stack management to change the related Elasticsearch objects.', + })} +

+
+ {Object.entries(groupedAssets).map(([title, assets]) => ( +
+ +

{title}

+
+ ({ + label: asset.id, + href: basePath.prepend(assetToLink(asset)), + iconType: 'index', + target: '_blank', + }))} + /> +
+ ))} +
+ ); +} diff --git a/x-pack/plugins/streams_app/public/components/stream_detail_view/index.tsx b/x-pack/plugins/streams_app/public/components/stream_detail_view/index.tsx index 4f213e855c175..d091fb5758a1e 100644 --- a/x-pack/plugins/streams_app/public/components/stream_detail_view/index.tsx +++ b/x-pack/plugins/streams_app/public/components/stream_detail_view/index.tsx @@ -63,5 +63,12 @@ export function StreamDetailView() { }, ]; - return ; + return ( + + ); } diff --git a/x-pack/plugins/streams_app/public/components/streams_app_page_header/streams_app_page_header_title.tsx b/x-pack/plugins/streams_app/public/components/streams_app_page_header/streams_app_page_header_title.tsx index ff7d6581dea4f..a0fcc554f5133 100644 --- a/x-pack/plugins/streams_app/public/components/streams_app_page_header/streams_app_page_header_title.tsx +++ b/x-pack/plugins/streams_app/public/components/streams_app_page_header/streams_app_page_header_title.tsx @@ -7,7 +7,7 @@ import { EuiTitle } from '@elastic/eui'; import React from 'react'; -export function StreamsAppPageHeaderTitle({ title }: { title: string }) { +export function StreamsAppPageHeaderTitle({ title }: { title: React.ReactNode }) { return (

{title}

diff --git a/x-pack/plugins/streams_app/public/components/streams_table/index.tsx b/x-pack/plugins/streams_app/public/components/streams_table/index.tsx index f92c94f115e9b..39814ed904555 100644 --- a/x-pack/plugins/streams_app/public/components/streams_table/index.tsx +++ b/x-pack/plugins/streams_app/public/components/streams_table/index.tsx @@ -46,10 +46,10 @@ export function StreamsTable({ name: i18n.translate('xpack.streams.streamsTable.nameColumnTitle', { defaultMessage: 'Name', }), - render: (_, { id }) => { + render: (_, { id, managed }) => { return ( - + ), },