diff --git a/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts b/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts index 9b3b0cdb1d91..709e5c4309f2 100644 --- a/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts +++ b/src/core/server/opensearch/version_check/ensure_opensearch_version.test.ts @@ -253,18 +253,18 @@ describe('pollOpenSearchNodesVersion', () => { it('returns compatibility results and isCompatible=true with filters', (done) => { expect.assertions(2); const target = { - id: '0', + cluster_id: '0', attribute: 'foo', }; const filter = { - id: '1', + cluster_id: '1', attribute: 'bar', }; // will filter out every odd index const nodes = createNodesWithAttribute( - target.id, - filter.id, + target.cluster_id, + filter.cluster_id, target.attribute, filter.attribute, '5.1.0', @@ -273,6 +273,10 @@ describe('pollOpenSearchNodesVersion', () => { '5.1.1-Beta1' ); + const filteredNodes = nodes; + delete filteredNodes.nodes['node-1']; + delete filteredNodes.nodes['node-3']; + // @ts-expect-error we need to return an incompatible type to use the testScheduler here internalClient.cluster.state.mockReturnValueOnce({ body: nodes }); @@ -280,7 +284,7 @@ describe('pollOpenSearchNodesVersion', () => { pollOpenSearchNodesVersion({ internalClient, - optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } }, + optimizedHealthcheck: { id: 'cluster_id', filters: { custom_attribute: filter.attribute } }, opensearchVersionCheckInterval: 1, ignoreVersionMismatch: false, opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, @@ -290,7 +294,7 @@ describe('pollOpenSearchNodesVersion', () => { .subscribe({ next: (result) => { expect(result).toEqual( - mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false) + mapNodesVersionCompatibility(filteredNodes, OPENSEARCH_DASHBOARDS_VERSION, false) ); expect(result.isCompatible).toBe(true); }, @@ -302,18 +306,18 @@ describe('pollOpenSearchNodesVersion', () => { it('returns compatibility results and isCompatible=false with filters', (done) => { expect.assertions(2); const target = { - id: '0', + cluster_id: '0', attribute: 'foo', }; const filter = { - id: '1', + cluster_id: '1', attribute: 'bar', }; // will filter out every odd index const nodes = createNodesWithAttribute( - target.id, - filter.id, + target.cluster_id, + filter.cluster_id, target.attribute, filter.attribute, '5.1.0', @@ -322,6 +326,10 @@ describe('pollOpenSearchNodesVersion', () => { '5.1.1-Beta1' ); + const filteredNodes = nodes; + delete filteredNodes.nodes['node-1']; + delete filteredNodes.nodes['node-3']; + // @ts-expect-error we need to return an incompatible type to use the testScheduler here internalClient.cluster.state.mockReturnValueOnce({ body: nodes }); @@ -329,7 +337,7 @@ describe('pollOpenSearchNodesVersion', () => { pollOpenSearchNodesVersion({ internalClient, - optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } }, + optimizedHealthcheck: { id: 'cluster_id', filters: { custom_attribute: filter.attribute } }, opensearchVersionCheckInterval: 1, ignoreVersionMismatch: false, opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION, @@ -339,7 +347,7 @@ describe('pollOpenSearchNodesVersion', () => { .subscribe({ next: (result) => { expect(result).toEqual( - mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false) + mapNodesVersionCompatibility(filteredNodes, OPENSEARCH_DASHBOARDS_VERSION, false) ); expect(result.isCompatible).toBe(false); }, diff --git a/src/core/server/opensearch/version_check/ensure_opensearch_version.ts b/src/core/server/opensearch/version_check/ensure_opensearch_version.ts index b8025947cd88..49609844b4a8 100644 --- a/src/core/server/opensearch/version_check/ensure_opensearch_version.ts +++ b/src/core/server/opensearch/version_check/ensure_opensearch_version.ts @@ -35,7 +35,6 @@ import { timer, of, from, Observable } from 'rxjs'; import { map, distinctUntilChanged, catchError, exhaustMap, mergeMap } from 'rxjs/operators'; -import { get } from 'lodash'; import { ApiResponse } from '@opensearch-project/opensearch'; import { opensearchVersionCompatibleWithOpenSearchDashboards, @@ -53,25 +52,29 @@ import type { OpenSearchClient } from '../client'; * If the supplied node attribute is missing then we return null and use default fan out behavior * @param {OpenSearchClient} internalClient * @param {OptimizedHealthcheck} healthcheck - * @returns {string|null} '_local' if all nodes have the same cluster_id, otherwise null + * @returns {string|string[]|null} '_local' if all nodes have the same cluster_id, array of node ids if different cluster_id, null if no cluster_id or nodes returned */ export const getNodeId = async ( internalClient: OpenSearchClient, healthcheck: OptimizedHealthcheck -): Promise => { +): Promise => { try { + // If missing an id, we have nothing to check + if (!healthcheck.id) return null; + let path = `nodes.*.attributes.${healthcheck.id}`; const filters = healthcheck.filters; - if (filters) { - Object.keys(filters).forEach((key) => { - path += `,nodes.*.attributes.${key}`; - }); + const filterKeys = filters ? Object.keys(filters) : []; + + for (const key of filterKeys) { + path += `,nodes.*.attributes.${key}`; } const state = (await internalClient.cluster.state({ metric: 'nodes', filter_path: [path], })) as ApiResponse; + /* Aggregate different cluster_ids from the OpenSearch nodes * if all the nodes have the same cluster_id, retrieve nodes.info from _local node only * Using _cluster/state/nodes to retrieve the cluster_id of each node from cluster manager node which is considered to be a lightweight operation @@ -79,37 +82,34 @@ export const getNodeId = async ( * else there are no nodes in the cluster */ const nodes = state.body.nodes; - let nodeIds = Object.keys(nodes); - if (nodeIds.length === 0) { - return null; - } + const nodeIds = new Set(Object.keys(nodes)); /* * If filters are set look for the key and value and filter out any node that matches * the value for that attribute. */ - if (filters) { - nodeIds.forEach((id) => { - Object.keys(filters).forEach((key) => { - const attributeValue = get(nodes[id], `attributes.${key}`, null); - if (attributeValue === filters[key]) { - delete nodes[id]; - } - }); - }); + for (const id of nodeIds) { + for (const key of filterKeys) { + const attributeValue = nodes[id].attributes?.[key] ?? null; - nodeIds = Object.keys(nodes); - if (nodeIds.length === 0) { - return null; + if (attributeValue === filters![key]) nodeIds.delete(id); } } - const sharedClusterId = get(nodes[nodeIds[0]], `attributes.${healthcheck.id}`, null); + if (nodeIds.size === 0) return null; + + const [firstNodeId] = nodeIds; + const sharedClusterId = nodes[firstNodeId].attributes?.[healthcheck.id] ?? null; + // If cluster_id is not set then fan out + if (sharedClusterId === null) return null; + + // If a node is found to have a different cluster_id, return node ids + for (const id of nodeIds) { + if (nodes[id].attributes?.[healthcheck.id] !== sharedClusterId) return Array.from(nodeIds); + } - return sharedClusterId === null || - nodes.find((node: any) => sharedClusterId !== get(node, `attributes.${healthcheck.id}`, null)) - ? null - : '_local'; + // When all nodes share the same cluster_id, return _local + return '_local'; } catch (e) { return null; }