From 5474104cfd4bde3e5a882179be60cc3024829b90 Mon Sep 17 00:00:00 2001 From: Bandini <63824432+bandinib-amzn@users.noreply.github.com> Date: Fri, 15 Mar 2024 14:49:05 -0700 Subject: [PATCH] Improves connection pooling support for AWSSigV4 clients in data sources. (#6135) * Upgrade @opensearch/opensearch@2.6.0 which inherits AWSSigv4 to .child Signed-off-by: Bandini Bhopi * Uses client.child for aws sigv4 connection Signed-off-by: Bandini Bhopi * Import http-aws-es connector class Signed-off-by: Bandini Bhopi * Refactor client caching mechanism Signed-off-by: Bandini Bhopi * Fix UT Signed-off-by: Bandini Bhopi * Added UT for client pool Signed-off-by: Bandini Bhopi * Revert client pool changes from authentication method Signed-off-by: Bandini Bhopi --------- Signed-off-by: Bandini Bhopi (cherry picked from commit 09f55636e4370b003c42d5bcd8ae93535c230c00) --- package.json | 2 +- .../authentication_methods_registry.test.ts | 6 +- .../authentication_methods_registry.ts | 10 + .../client/configure_client.test.mocks.ts | 9 - .../server/client/configure_client.test.ts | 356 ++++++++++++-- .../server/client/configure_client.ts | 83 ++-- .../server/client/configure_client_utils.ts | 71 ++- .../legacy/configure_legacy_client.test.ts | 450 ++++++++++++++++-- .../server/legacy/configure_legacy_client.ts | 88 ++-- .../server/legacy/http_aws_es/connector.js | 134 ++++++ .../legacy/http_aws_es/connector.test.js | 98 ++++ src/plugins/data_source/server/types.ts | 10 +- .../server/util/credential_provider.ts | 14 +- yarn.lock | 8 +- 14 files changed, 1144 insertions(+), 195 deletions(-) create mode 100644 src/plugins/data_source/server/legacy/http_aws_es/connector.js create mode 100644 src/plugins/data_source/server/legacy/http_aws_es/connector.test.js diff --git a/package.json b/package.json index e81ad2c2b999..edc5211389e0 100644 --- a/package.json +++ b/package.json @@ -148,7 +148,7 @@ "@hapi/vision": "^6.1.0", "@hapi/wreck": "^17.1.0", "@opensearch-project/opensearch": "^1.1.0", - "@opensearch-project/opensearch-next": "npm:@opensearch-project/opensearch@^2.3.1", + "@opensearch-project/opensearch-next": "npm:@opensearch-project/opensearch@^2.6.0", "@osd/ace": "1.0.0", "@osd/analytics": "1.0.0", "@osd/apm-config-loader": "1.0.0", diff --git a/src/plugins/data_source/server/auth_registry/authentication_methods_registry.test.ts b/src/plugins/data_source/server/auth_registry/authentication_methods_registry.test.ts index c7692acee782..948641870a8a 100644 --- a/src/plugins/data_source/server/auth_registry/authentication_methods_registry.test.ts +++ b/src/plugins/data_source/server/auth_registry/authentication_methods_registry.test.ts @@ -5,13 +5,11 @@ import { AuthenticationMethodRegistery } from './authentication_methods_registry'; import { AuthenticationMethod } from '../../server/types'; -import { AuthType } from '../../common/data_sources'; const createAuthenticationMethod = ( authMethod: Partial ): AuthenticationMethod => ({ name: 'unknown', - authType: AuthType.NoAuth, credentialProvider: jest.fn(), ...authMethod, }); @@ -61,14 +59,14 @@ describe('AuthenticationMethodRegistery', () => { registry.registerAuthenticationMethod( createAuthenticationMethod({ name: 'typeA', - authType: AuthType.NoAuth, + credentialProvider: jest.fn(), }) ); const typeA = registry.getAuthenticationMethod('typeA')!; expect(() => { - typeA.authType = AuthType.SigV4; + typeA.credentialProvider = jest.fn(); }).toThrow(); expect(() => { typeA.name = 'foo'; diff --git a/src/plugins/data_source/server/auth_registry/authentication_methods_registry.ts b/src/plugins/data_source/server/auth_registry/authentication_methods_registry.ts index e2f39498e007..9fe2eb1e37e3 100644 --- a/src/plugins/data_source/server/auth_registry/authentication_methods_registry.ts +++ b/src/plugins/data_source/server/auth_registry/authentication_methods_registry.ts @@ -5,6 +5,7 @@ import { deepFreeze } from '@osd/std'; import { AuthenticationMethod } from '../../server/types'; +import { AuthType } from '../../common/data_sources'; export type IAuthenticationMethodRegistery = Omit< AuthenticationMethodRegistery, @@ -18,6 +19,15 @@ export class AuthenticationMethodRegistery { * Authentication Method can only be registered once. subsequent calls with the same method name will throw an error. */ public registerAuthenticationMethod(method: AuthenticationMethod) { + if ( + method.name === AuthType.NoAuth || + method.name === AuthType.UsernamePasswordType || + method.name === AuthType.SigV4 + ) { + throw new Error( + `Must not be no_auth or username_password or sigv4 for registered auth types` + ); + } if (this.authMethods.has(method.name)) { throw new Error(`Authentication method '${method.name}' is already registered`); } diff --git a/src/plugins/data_source/server/client/configure_client.test.mocks.ts b/src/plugins/data_source/server/client/configure_client.test.mocks.ts index d1135531da9c..98d4811086fd 100644 --- a/src/plugins/data_source/server/client/configure_client.test.mocks.ts +++ b/src/plugins/data_source/server/client/configure_client.test.mocks.ts @@ -21,12 +21,3 @@ export const authRegistryCredentialProviderMock = jest.fn(); jest.doMock('../util/credential_provider', () => ({ authRegistryCredentialProvider: authRegistryCredentialProviderMock, })); - -export const CredentialsMock = jest.fn(); -jest.doMock('aws-sdk', () => { - const actual = jest.requireActual('aws-sdk'); - return { - ...actual, - Credentials: CredentialsMock, - }; -}); diff --git a/src/plugins/data_source/server/client/configure_client.test.ts b/src/plugins/data_source/server/client/configure_client.test.ts index 9850d8773371..4e9460401476 100644 --- a/src/plugins/data_source/server/client/configure_client.test.ts +++ b/src/plugins/data_source/server/client/configure_client.test.ts @@ -11,22 +11,22 @@ import { AuthType, UsernamePasswordTypedContent, SigV4Content, + SigV4ServiceName, } from '../../common/data_sources/types'; import { DataSourcePluginConfigType } from '../../config'; import { ClientMock, parseClientOptionsMock, authRegistryCredentialProviderMock, - CredentialsMock, } from './configure_client.test.mocks'; -import { OpenSearchClientPoolSetup } from './client_pool'; +import { OpenSearchClientPool, OpenSearchClientPoolSetup } from './client_pool'; import { configureClient } from './configure_client'; import { ClientOptions } from '@opensearch-project/opensearch-next'; // eslint-disable-next-line @osd/eslint/no-restricted-paths import { opensearchClientMock } from '../../../../core/server/opensearch/client/mocks'; import { cryptographyServiceSetupMock } from '../cryptography_service.mocks'; import { CryptographyServiceSetup } from '../cryptography_service'; -import { DataSourceClientParams, AuthenticationMethod } from '../types'; +import { DataSourceClientParams, AuthenticationMethod, ClientParameters } from '../types'; import { CustomApiSchemaRegistry } from '../schema_registry'; import { IAuthenticationMethodRegistery } from '../auth_registry'; import { authenticationMethodRegisteryMock } from '../auth_registry/authentication_methods_registry.mock'; @@ -39,7 +39,6 @@ describe('configureClient', () => { let config: DataSourcePluginConfigType; let savedObjectsMock: jest.Mocked; let cryptographyMock: jest.Mocked; - let clientPoolSetup: OpenSearchClientPoolSetup; let clientOptions: ClientOptions; let dataSourceAttr: DataSourceAttributes; let dsClient: ReturnType; @@ -48,15 +47,20 @@ describe('configureClient', () => { let sigV4AuthContent: SigV4Content; let customApiSchemaRegistry: CustomApiSchemaRegistry; let authenticationMethodRegistery: jest.Mocked; + let clientParameters: ClientParameters; const customAuthContent = { region: 'us-east-1', roleARN: 'test-role', }; + const clientPoolSetup: OpenSearchClientPoolSetup = { + getClientFromPool: jest.fn(), + addClientToPool: jest.fn(), + }; + const authMethod: AuthenticationMethod = { name: 'typeA', - authType: AuthType.SigV4, credentialProvider: jest.fn(), }; @@ -102,11 +106,6 @@ describe('configureClient', () => { }, } as DataSourceAttributes; - clientPoolSetup = { - getClientFromPool: jest.fn(), - addClientToPool: jest.fn(), - }; - savedObjectsMock.get.mockResolvedValueOnce({ id: DATA_SOURCE_ID, type: DATA_SOURCE_SAVED_OBJECT_TYPE, @@ -121,13 +120,21 @@ describe('configureClient', () => { customApiSchemaRegistryPromise: Promise.resolve(customApiSchemaRegistry), }; + clientParameters = { + authType: AuthType.SigV4, + endpoint: dataSourceAttr.endpoint, + cacheKeySuffix: '', + credentials: sigV4AuthContent, + }; + ClientMock.mockImplementation(() => dsClient); authenticationMethodRegistery.getAuthenticationMethod.mockImplementation(() => authMethod); + authRegistryCredentialProviderMock.mockReturnValue(clientParameters); }); afterEach(() => { ClientMock.mockReset(); - CredentialsMock.mockReset(); + authRegistryCredentialProviderMock.mockReset(); }); test('configure client with auth.type == no_auth, will call new Client() to create client', async () => { @@ -212,9 +219,21 @@ describe('configureClient', () => { encryptionContext: { endpoint: 'http://localhost' }, }); - await configureClient(dataSourceClientParams, clientPoolSetup, config, logger); + const client = await configureClient(dataSourceClientParams, clientPoolSetup, config, logger); expect(ClientMock).toHaveBeenCalledTimes(1); + expect(client).toBe(dsClient.child.mock.results[0].value); + expect(dsClient.child).toBeCalledWith({ + auth: { + credentials: { + accessKeyId: 'accessKey', + secretAccessKey: 'accessKey', + sessionToken: '', + }, + region: sigV4AuthContent.region, + service: 'aoss', + }, + }); }); test('configure test client for non-exist datasource should not call saved object api, nor decode any credential', async () => { @@ -279,12 +298,7 @@ describe('configureClient', () => { references: [], }); - authRegistryCredentialProviderMock.mockReturnValue({ - credential: sigV4AuthContent, - type: AuthType.SigV4, - }); - - await configureClient( + const client = await configureClient( { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, clientPoolSetup, config, @@ -294,10 +308,17 @@ describe('configureClient', () => { expect(authenticationMethodRegistery.getAuthenticationMethod).toHaveBeenCalledTimes(1); expect(ClientMock).toHaveBeenCalledTimes(1); expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toBeCalledWith({ - accessKeyId: sigV4AuthContent.accessKey, - secretAccessKey: sigV4AuthContent.secretKey, + expect(client).toBe(dsClient.child.mock.results[0].value); + expect(dsClient.child).toBeCalledWith({ + auth: { + credentials: { + accessKeyId: sigV4AuthContent.accessKey, + secretAccessKey: sigV4AuthContent.secretKey, + sessionToken: '', + }, + region: sigV4AuthContent.region, + service: SigV4ServiceName.OpenSearch, + }, }); }); @@ -317,11 +338,11 @@ describe('configureClient', () => { }); authRegistryCredentialProviderMock.mockReturnValue({ - credential: mockCredentials, - type: AuthType.SigV4, + ...clientParameters, + credentials: mockCredentials, }); - await configureClient( + const client = await configureClient( { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, clientPoolSetup, config, @@ -329,11 +350,286 @@ describe('configureClient', () => { ); expect(ClientMock).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toBeCalledWith({ - accessKeyId: mockCredentials.accessKey, - secretAccessKey: mockCredentials.secretKey, - sessionToken: mockCredentials.sessionToken, + expect(client).toBe(dsClient.child.mock.results[0].value); + expect(dsClient.child).toBeCalledWith({ + auth: { + credentials: { + accessKeyId: mockCredentials.accessKey, + secretAccessKey: mockCredentials.secretKey, + sessionToken: mockCredentials.sessionToken, + }, + region: mockCredentials.region, + service: SigV4ServiceName.OpenSearch, + }, + }); + }); + + test('configure client with auth method from registry, service == aoss, should successfully call new Client()', async () => { + savedObjectsMock.get.mockReset().mockResolvedValueOnce({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...dataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: { ...customAuthContent, service: 'aoss' }, + }, + }, + references: [], + }); + + const client = await configureClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + clientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(1); + expect(client).toBe(dsClient.child.mock.results[0].value); + expect(dsClient.child).toBeCalledWith({ + auth: { + credentials: { + accessKeyId: sigV4AuthContent.accessKey, + secretAccessKey: sigV4AuthContent.secretKey, + sessionToken: '', + }, + region: sigV4AuthContent.region, + service: 'aoss', + }, + }); + }); + + describe('Client Pool', () => { + let opensearchClientPoolSetup: OpenSearchClientPoolSetup; + let openSearchClientPool: OpenSearchClientPool; + beforeEach(() => { + openSearchClientPool = new OpenSearchClientPool(logger); + opensearchClientPoolSetup = openSearchClientPool.setup(config); + }); + + describe('NoAuth', () => { + beforeEach(() => { + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...dataSourceAttr, + auth: { + type: AuthType.NoAuth, + }, + }, + references: [], + }); + }); + + test('For same endpoint only one client object should be created', async () => { + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('For different endpoints multiple client objects should be created', async () => { + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + const mockDataSourceAttr = { ...dataSourceAttr, endpoint: 'http://test.com' }; + + savedObjectsMock.get.mockReset().mockResolvedValueOnce({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.NoAuth, + }, + }, + references: [], + }); + + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); + }); + + describe('UserNamePassword', () => { + beforeEach(() => { + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: dataSourceAttr, + references: [], + }); + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'password', + encryptionContext: { endpoint: 'http://localhost' }, + }); + }); + + test('For same endpoint only one client object should be created', async () => { + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('For different endpoints multiple client objects should be created', async () => { + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + const mockDataSourceAttr = { ...dataSourceAttr, endpoint: 'http://test.com' }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: mockDataSourceAttr, + references: [], + }); + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'password', + encryptionContext: { endpoint: 'http://test.com' }, + }); + + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); + }); + + describe('AWSSigV4', () => { + beforeEach(() => { + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...dataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: sigV4AuthContent, + }, + }, + references: [], + }); + + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'accessKey', + encryptionContext: { endpoint: 'http://localhost' }, + }); + }); + test('For same endpoint only one client object should be created', async () => { + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('For different endpoints multiple client objects should be created', async () => { + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + const mockDataSourceAttr = { ...dataSourceAttr, endpoint: 'http://test.com' }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: sigV4AuthContent, + }, + }, + references: [], + }); + + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'accessKey', + encryptionContext: { endpoint: 'http://test.com' }, + }); + await configureClient(dataSourceClientParams, opensearchClientPoolSetup, config, logger); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); + }); + + describe('Auth Method from Registry', () => { + beforeEach(() => { + const authMethodWithClientPool: AuthenticationMethod = { + name: 'clientPoolTest', + credentialProvider: jest.fn(), + }; + authenticationMethodRegistery.getAuthenticationMethod + .mockReset() + .mockImplementation(() => authMethodWithClientPool); + const mockDataSourceAttr = { ...dataSourceAttr, name: 'custom_auth' }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: customAuthContent, + }, + }, + references: [], + }); + }); + test('If endpoint is same for multiple requests client pool size should be 1', async () => { + await configureClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + opensearchClientPoolSetup, + config, + logger + ); + + await configureClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('If endpoint is different for two requests client pool size should be 2', async () => { + await configureClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + opensearchClientPoolSetup, + config, + logger + ); + + const mockDataSourceAttr = { + ...dataSourceAttr, + endpoint: 'http://test.com', + name: 'custom_auth', + }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: customAuthContent, + }, + }, + references: [], + }); + authRegistryCredentialProviderMock.mockReturnValue({ + ...clientParameters, + endpoint: 'http://test.com', + cacheKeySuffix: 'test', + }); + + await configureClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); }); }); }); diff --git a/src/plugins/data_source/server/client/configure_client.ts b/src/plugins/data_source/server/client/configure_client.ts index 20d3d3773960..87b7bc3584ee 100644 --- a/src/plugins/data_source/server/client/configure_client.ts +++ b/src/plugins/data_source/server/client/configure_client.ts @@ -5,19 +5,19 @@ import { Client, ClientOptions } from '@opensearch-project/opensearch-next'; import { Client as LegacyClient } from 'elasticsearch'; -import { Credentials } from 'aws-sdk'; import { AwsSigv4Signer } from '@opensearch-project/opensearch-next/aws'; import { Logger, OpenSearchDashboardsRequest } from '../../../../../src/core/server'; import { AuthType, DataSourceAttributes, SigV4Content, + SigV4ServiceName, UsernamePasswordTypedContent, } from '../../common/data_sources'; import { DataSourcePluginConfigType } from '../../config'; import { CryptographyServiceSetup } from '../cryptography_service'; import { createDataSourceError } from '../lib/error'; -import { DataSourceClientParams } from '../types'; +import { DataSourceClientParams, ClientParameters } from '../types'; import { parseClientOptions } from './client_config'; import { OpenSearchClientPoolSetup } from './client_pool'; import { @@ -25,10 +25,9 @@ import { getAWSCredential, getCredential, getDataSource, + getAuthenticationMethod, generateCacheKey, - getSigV4Credentials, } from './configure_client_utils'; -import { IAuthenticationMethodRegistery } from '../auth_registry'; import { authRegistryCredentialProvider } from '../util/credential_provider'; export const configureClient = async ( @@ -47,6 +46,7 @@ export const configureClient = async ( ): Promise => { let dataSource; let requireDecryption = true; + let clientParams; try { // configure test client @@ -69,10 +69,18 @@ export const configureClient = async ( dataSource = await getDataSource(dataSourceId!, savedObjects); } + const authenticationMethod = getAuthenticationMethod(dataSource, authRegistry); + if (authenticationMethod !== undefined) { + clientParams = await authRegistryCredentialProvider(authenticationMethod, { + dataSourceAttr: dataSource, + request, + cryptography, + }); + } const rootClient = getRootClient( dataSource, openSearchClientPoolSetup.getClientFromPool, - dataSourceId + clientParams ) as Client; const registeredSchema = (await customApiSchemaRegistryPromise).getAll(); @@ -86,7 +94,7 @@ export const configureClient = async ( rootClient, dataSourceId, request, - authRegistry, + clientParams, requireDecryption ); } catch (error: any) { @@ -122,30 +130,30 @@ const getQueryClient = async ( rootClient?: Client, dataSourceId?: string, request?: OpenSearchDashboardsRequest, - authRegistry?: IAuthenticationMethodRegistery, + clientParams?: ClientParameters, requireDecryption: boolean = true ): Promise => { let credential; + let cacheKeySuffix; let { auth: { type }, - name, + endpoint, } = dataSourceAttr; - const { endpoint } = dataSourceAttr; - name = name ?? type; const clientOptions = parseClientOptions(config, endpoint, registeredSchema); - const cacheKey = generateCacheKey(dataSourceAttr, dataSourceId); - const authenticationMethod = authRegistry?.getAuthenticationMethod(name); - if (authenticationMethod !== undefined) { - const credentialProvider = await authRegistryCredentialProvider(authenticationMethod, { - dataSourceAttr, - request, - cryptography, - }); - credential = credentialProvider.credential; - type = credentialProvider.type; + if (clientParams !== undefined) { + credential = clientParams.credentials; + type = clientParams.authType; + cacheKeySuffix = clientParams.cacheKeySuffix; + endpoint = clientParams.endpoint; + + if (credential.service === undefined) { + credential = { ...credential, service: dataSourceAttr.auth.credentials?.service }; + } } + const cacheKey = generateCacheKey(endpoint, cacheKeySuffix); + switch (type) { case AuthType.NoAuth: if (!rootClient) rootClient = new Client(clientOptions); @@ -172,10 +180,12 @@ const getQueryClient = async ( ? await getAWSCredential(dataSourceAttr, cryptography!) : (dataSourceAttr.auth.credentials as SigV4Content)); - const awsClient = rootClient ? rootClient : getAWSClient(credential, clientOptions); - addClientToPool(cacheKey, type, awsClient); + if (!rootClient) { + rootClient = getAWSClient(credential, clientOptions); + } + addClientToPool(cacheKey, type, rootClient); - return awsClient; + return getAWSChildClient(rootClient, credential); default: throw Error(`${type} is not a supported auth type for data source`); @@ -200,21 +210,28 @@ const getBasicAuthClient = ( }; const getAWSClient = (credential: SigV4Content, clientOptions: ClientOptions): Client => { - const { accessKey, secretKey, region, service, sessionToken } = credential; - const sigv4Credentials = getSigV4Credentials(accessKey, secretKey, sessionToken); - - const credentialProvider = (): Promise => { - return new Promise((resolve) => { - resolve(sigv4Credentials); - }); - }; + const { region } = credential; return new Client({ ...AwsSigv4Signer({ region, - getCredentials: credentialProvider, - service, }), ...clientOptions, }); }; + +const getAWSChildClient = (rootClient: Client, credential: SigV4Content): Client => { + const { accessKey, secretKey, region, service, sessionToken } = credential; + + return rootClient.child({ + auth: { + credentials: { + accessKeyId: accessKey, + secretAccessKey: secretKey, + sessionToken: sessionToken ?? '', + }, + region, + service: service ?? SigV4ServiceName.OpenSearch, + }, + }); +}; diff --git a/src/plugins/data_source/server/client/configure_client_utils.ts b/src/plugins/data_source/server/client/configure_client_utils.ts index 9392b21a24b8..9591af451ef8 100644 --- a/src/plugins/data_source/server/client/configure_client_utils.ts +++ b/src/plugins/data_source/server/client/configure_client_utils.ts @@ -5,8 +5,10 @@ import { Client } from '@opensearch-project/opensearch-next'; import { Client as LegacyClient } from 'elasticsearch'; -import { Credentials } from 'aws-sdk'; -import { SavedObjectsClientContract } from '../../../../../src/core/server'; +import { + OpenSearchDashboardsRequest, + SavedObjectsClientContract, +} from '../../../../../src/core/server'; import { DATA_SOURCE_SAVED_OBJECT_TYPE } from '../../common'; import { DataSourceAttributes, @@ -16,6 +18,8 @@ import { } from '../../common/data_sources'; import { CryptographyServiceSetup } from '../cryptography_service'; import { createDataSourceError } from '../lib/error'; +import { IAuthenticationMethodRegistery } from '../auth_registry'; +import { AuthenticationMethod, ClientParameters } from '../types'; /** * Get the root client of datasource from @@ -29,23 +33,23 @@ import { createDataSourceError } from '../lib/error'; export const getRootClient = ( dataSourceAttr: DataSourceAttributes, getClientFromPool: (endpoint: string, authType: AuthType) => Client | LegacyClient | undefined, - dataSourceId?: string + clientParams?: ClientParameters ): Client | LegacyClient | undefined => { - const { + let cacheKeySuffix; + let { auth: { type }, - lastUpdatedTime, + endpoint, } = dataSourceAttr; - let cachedClient; - const cacheKey = generateCacheKey(dataSourceAttr, dataSourceId); - - // return undefined when building SigV4 test client with new credentials - if (type === AuthType.SigV4) { - cachedClient = dataSourceId && lastUpdatedTime ? getClientFromPool(cacheKey, type) : undefined; - } else { - cachedClient = getClientFromPool(cacheKey, type); + + if (clientParams !== undefined) { + endpoint = clientParams.endpoint; + cacheKeySuffix = clientParams.cacheKeySuffix; + type = clientParams.authType; } - return cachedClient; + const cacheKey = generateCacheKey(endpoint, cacheKeySuffix); + + return getClientFromPool(cacheKey, type); }; export const getDataSource = async ( @@ -129,38 +133,17 @@ export const getAWSCredential = async ( return credential; }; -export const generateCacheKey = (dataSourceAttr: DataSourceAttributes, dataSourceId?: string) => { +export const generateCacheKey = (endpoint: string, cacheKeySuffix?: string) => { const CACHE_KEY_DELIMITER = ','; - const { - auth: { type }, - endpoint, - lastUpdatedTime, - } = dataSourceAttr; - // opensearch-js client doesn't support spawning child with aws sigv4 connection class, - // we are storing/getting the actual client instead of rootClient in/from aws client pool, - // by a key of ",," - const key = - type === AuthType.SigV4 - ? endpoint + CACHE_KEY_DELIMITER + dataSourceId + CACHE_KEY_DELIMITER + lastUpdatedTime - : endpoint; - + let key = endpoint; + if (cacheKeySuffix) key += CACHE_KEY_DELIMITER + cacheKeySuffix; return key; }; -export const getSigV4Credentials = ( - accessKeyId: string, - secretAccessKey: string, - sessionToken?: string -): Credentials => { - let sigv4Credentials: Credentials; - if (sessionToken) { - sigv4Credentials = new Credentials({ - accessKeyId, - secretAccessKey, - sessionToken, - }); - } else { - sigv4Credentials = new Credentials({ accessKeyId, secretAccessKey }); - } - return sigv4Credentials; +export const getAuthenticationMethod = ( + dataSourceAttr: DataSourceAttributes, + authRegistry?: IAuthenticationMethodRegistery +): AuthenticationMethod => { + const name = dataSourceAttr.name ?? dataSourceAttr.auth.type; + return authRegistry?.getAuthenticationMethod(name) as AuthenticationMethod; }; diff --git a/src/plugins/data_source/server/legacy/configure_legacy_client.test.ts b/src/plugins/data_source/server/legacy/configure_legacy_client.test.ts index 5685392dbc10..581e545315e2 100644 --- a/src/plugins/data_source/server/legacy/configure_legacy_client.test.ts +++ b/src/plugins/data_source/server/legacy/configure_legacy_client.test.ts @@ -6,18 +6,25 @@ import { SavedObjectsClientContract } from '../../../../core/server'; import { loggingSystemMock, savedObjectsClientMock } from '../../../../core/server/mocks'; import { DATA_SOURCE_SAVED_OBJECT_TYPE } from '../../common'; -import { AuthType, DataSourceAttributes, SigV4Content } from '../../common/data_sources'; +import { + AuthType, + DataSourceAttributes, + SigV4Content, + SigV4ServiceName, +} from '../../common/data_sources'; import { DataSourcePluginConfigType } from '../../config'; import { cryptographyServiceSetupMock } from '../cryptography_service.mocks'; import { CryptographyServiceSetup } from '../cryptography_service'; -import { DataSourceClientParams, LegacyClientCallAPIParams, AuthenticationMethod } from '../types'; -import { OpenSearchClientPoolSetup } from '../client'; +import { + DataSourceClientParams, + LegacyClientCallAPIParams, + AuthenticationMethod, + ClientParameters, +} from '../types'; +import { OpenSearchClientPool, OpenSearchClientPoolSetup } from '../client'; import { ConfigOptions } from 'elasticsearch'; import { ClientMock, parseClientOptionsMock } from './configure_legacy_client.test.mocks'; -import { - authRegistryCredentialProviderMock, - CredentialsMock, -} from '../client/./configure_client.test.mocks'; +import { authRegistryCredentialProviderMock } from '../client/configure_client.test.mocks'; import { configureLegacyClient } from './configure_legacy_client'; import { CustomApiSchemaRegistry } from '../schema_registry'; import { IAuthenticationMethodRegistery } from '../auth_registry'; @@ -31,11 +38,11 @@ describe('configureLegacyClient', () => { let config: DataSourcePluginConfigType; let savedObjectsMock: jest.Mocked; let cryptographyMock: jest.Mocked; - let clientPoolSetup: OpenSearchClientPoolSetup; let configOptions: ConfigOptions; let dataSourceAttr: DataSourceAttributes; let sigV4AuthContent: SigV4Content; let authenticationMethodRegistery: jest.Mocked; + let clientParameters: ClientParameters; let mockOpenSearchClientInstance: { close: jest.Mock; @@ -52,9 +59,13 @@ describe('configureLegacyClient', () => { roleARN: 'test-role', }; + const clientPoolSetup: OpenSearchClientPoolSetup = { + getClientFromPool: jest.fn(), + addClientToPool: jest.fn(), + }; + const authMethod: AuthenticationMethod = { name: 'typeA', - authType: AuthType.SigV4, credentialProvider: jest.fn(), }; @@ -99,11 +110,6 @@ describe('configureLegacyClient', () => { secretKey: 'secretKey', }; - clientPoolSetup = { - getClientFromPool: jest.fn(), - addClientToPool: jest.fn(), - }; - callApiParams = { endpoint: 'ping', }; @@ -122,6 +128,13 @@ describe('configureLegacyClient', () => { customApiSchemaRegistryPromise: Promise.resolve(customApiSchemaRegistry), }; + clientParameters = { + authType: AuthType.SigV4, + endpoint: dataSourceAttr.endpoint, + cacheKeySuffix: '', + credentials: sigV4AuthContent, + }; + ClientMock.mockImplementation(() => mockOpenSearchClientInstance); mockOpenSearchClientInstance.ping.mockImplementation(function mockCall(this: any) { @@ -132,11 +145,12 @@ describe('configureLegacyClient', () => { }); authenticationMethodRegistery.getAuthenticationMethod.mockImplementation(() => authMethod); + authRegistryCredentialProviderMock.mockReturnValue(clientParameters); }); afterEach(() => { ClientMock.mockReset(); - CredentialsMock.mockReset(); + authRegistryCredentialProviderMock.mockReset(); jest.resetAllMocks(); }); @@ -220,9 +234,21 @@ describe('configureLegacyClient', () => { expect(parseClientOptionsMock).toHaveBeenCalled(); expect(ClientMock).toHaveBeenCalledTimes(1); - expect(ClientMock).toHaveBeenCalledWith(expect.objectContaining({ service: 'aoss' })); expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); + expect(mockOpenSearchClientInstance.ping).toHaveBeenLastCalledWith({ + headers: { + auth: { + credentials: { + accessKeyId: 'accessKey', + secretAccessKey: 'accessKey', + sessionToken: '', + }, + region: sigV4AuthContent.region, + service: 'aoss', + }, + }, + }); }); test('configure client with auth.type == username_password and password contaminated', async () => { @@ -291,11 +317,6 @@ describe('configureLegacyClient', () => { references: [], }); - authRegistryCredentialProviderMock.mockReturnValue({ - credential: sigV4AuthContent, - type: AuthType.SigV4, - }); - await configureLegacyClient( { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, callApiParams, @@ -307,10 +328,19 @@ describe('configureLegacyClient', () => { expect(authenticationMethodRegistery.getAuthenticationMethod).toHaveBeenCalledTimes(1); expect(ClientMock).toHaveBeenCalledTimes(1); expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toBeCalledWith({ - accessKeyId: sigV4AuthContent.accessKey, - secretAccessKey: sigV4AuthContent.secretKey, + expect(mockOpenSearchClientInstance.ping).toHaveBeenCalledTimes(1); + expect(mockOpenSearchClientInstance.ping).toHaveBeenLastCalledWith({ + headers: { + auth: { + credentials: { + accessKeyId: sigV4AuthContent.accessKey, + secretAccessKey: sigV4AuthContent.secretKey, + sessionToken: '', + }, + region: sigV4AuthContent.region, + service: SigV4ServiceName.OpenSearch, + }, + }, }); }); @@ -330,8 +360,48 @@ describe('configureLegacyClient', () => { }); authRegistryCredentialProviderMock.mockReturnValue({ - credential: mockCredentials, - type: AuthType.SigV4, + ...clientParameters, + credentials: mockCredentials, + }); + + await configureLegacyClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + callApiParams, + clientPoolSetup, + config, + logger + ); + expect(authRegistryCredentialProviderMock).toHaveBeenCalled(); + expect(authenticationMethodRegistery.getAuthenticationMethod).toHaveBeenCalledTimes(1); + expect(ClientMock).toHaveBeenCalledTimes(1); + expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); + expect(mockOpenSearchClientInstance.ping).toHaveBeenCalledTimes(1); + expect(mockOpenSearchClientInstance.ping).toHaveBeenLastCalledWith({ + headers: { + auth: { + credentials: { + accessKeyId: mockCredentials.accessKey, + secretAccessKey: mockCredentials.secretKey, + sessionToken: mockCredentials.sessionToken, + }, + region: mockCredentials.region, + service: SigV4ServiceName.OpenSearch, + }, + }, + }); + }); + test('configureLegacyClient with auth method from registry, service == aoss, should successfully call new Client()', async () => { + savedObjectsMock.get.mockReset().mockResolvedValueOnce({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...dataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: { ...customAuthContent, service: 'aoss' }, + }, + }, + references: [], }); await configureLegacyClient( @@ -345,11 +415,327 @@ describe('configureLegacyClient', () => { expect(authenticationMethodRegistery.getAuthenticationMethod).toHaveBeenCalledTimes(1); expect(ClientMock).toHaveBeenCalledTimes(1); expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toHaveBeenCalledTimes(1); - expect(CredentialsMock).toBeCalledWith({ - accessKeyId: mockCredentials.accessKey, - secretAccessKey: mockCredentials.secretKey, - sessionToken: mockCredentials.sessionToken, + expect(mockOpenSearchClientInstance.ping).toHaveBeenCalledTimes(1); + expect(mockOpenSearchClientInstance.ping).toHaveBeenLastCalledWith({ + headers: { + auth: { + credentials: { + accessKeyId: sigV4AuthContent.accessKey, + secretAccessKey: sigV4AuthContent.secretKey, + sessionToken: '', + }, + region: sigV4AuthContent.region, + service: 'aoss', + }, + }, + }); + }); + + describe('Client Pool', () => { + let opensearchClientPoolSetup: OpenSearchClientPoolSetup; + let openSearchClientPool: OpenSearchClientPool; + beforeEach(() => { + openSearchClientPool = new OpenSearchClientPool(logger); + opensearchClientPoolSetup = openSearchClientPool.setup(config); + }); + + describe('NoAuth', () => { + beforeEach(() => { + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...dataSourceAttr, + auth: { + type: AuthType.NoAuth, + }, + }, + references: [], + }); + }); + + test('For same endpoint only one client object should be created', async () => { + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('For different endpoints multiple client objects should be created', async () => { + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + const mockDataSourceAttr = { ...dataSourceAttr, endpoint: 'http://test.com' }; + + savedObjectsMock.get.mockReset().mockResolvedValueOnce({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.NoAuth, + }, + }, + references: [], + }); + + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); + }); + + describe('UserNamePassword', () => { + beforeEach(() => { + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: dataSourceAttr, + references: [], + }); + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'password', + encryptionContext: { endpoint: 'http://localhost' }, + }); + }); + + test('For same endpoint only one client object should be created', async () => { + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('For different endpoints multiple client objects should be created', async () => { + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + const mockDataSourceAttr = { ...dataSourceAttr, endpoint: 'http://test.com' }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: mockDataSourceAttr, + references: [], + }); + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'password', + encryptionContext: { endpoint: 'http://test.com' }, + }); + + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); + }); + + describe('AWSSigV4', () => { + beforeEach(() => { + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...dataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: sigV4AuthContent, + }, + }, + references: [], + }); + + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'accessKey', + encryptionContext: { endpoint: 'http://localhost' }, + }); + }); + test('For same endpoint only one client object should be created', async () => { + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('For different endpoints multiple client objects should be created', async () => { + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + const mockDataSourceAttr = { ...dataSourceAttr, endpoint: 'http://test.com' }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: sigV4AuthContent, + }, + }, + references: [], + }); + + jest.spyOn(cryptographyMock, 'decodeAndDecrypt').mockResolvedValue({ + decryptedText: 'accessKey', + encryptionContext: { endpoint: 'http://test.com' }, + }); + await configureLegacyClient( + dataSourceClientParams, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); + }); + + describe('Auth Method from Registry', () => { + beforeEach(() => { + const authMethodWithClientPool: AuthenticationMethod = { + name: 'clientPoolTest', + credentialProvider: jest.fn(), + }; + authenticationMethodRegistery.getAuthenticationMethod + .mockReset() + .mockImplementation(() => authMethodWithClientPool); + const mockDataSourceAttr = { ...dataSourceAttr, name: 'custom_auth' }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: customAuthContent, + }, + }, + references: [], + }); + }); + test(' If endpoint is same for multiple requests client pool size should be 1', async () => { + await configureLegacyClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + await configureLegacyClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(1); + }); + + test('If endpoint is different for two requests client pool size should be 2', async () => { + await configureLegacyClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + const mockDataSourceAttr = { + ...dataSourceAttr, + endpoint: 'http://test.com', + name: 'custom_auth', + }; + savedObjectsMock.get.mockReset().mockResolvedValue({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { + ...mockDataSourceAttr, + auth: { + type: AuthType.SigV4, + credentials: customAuthContent, + }, + }, + references: [], + }); + authRegistryCredentialProviderMock.mockReturnValue({ + ...clientParameters, + endpoint: 'http://test.com', + cacheKeySuffix: 'test', + }); + + await configureLegacyClient( + { ...dataSourceClientParams, authRegistry: authenticationMethodRegistery }, + callApiParams, + opensearchClientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(2); + }); }); }); }); diff --git a/src/plugins/data_source/server/legacy/configure_legacy_client.ts b/src/plugins/data_source/server/legacy/configure_legacy_client.ts index 05bd5dbc1d27..fe3db6848380 100644 --- a/src/plugins/data_source/server/legacy/configure_legacy_client.ts +++ b/src/plugins/data_source/server/legacy/configure_legacy_client.ts @@ -7,7 +7,7 @@ import { Client } from '@opensearch-project/opensearch-next'; import { Client as LegacyClient, ConfigOptions } from 'elasticsearch'; import { Config } from 'aws-sdk'; import { get } from 'lodash'; -import HttpAmazonESConnector from 'http-aws-es'; +import HttpAmazonESConnector from './http_aws_es/connector'; import { Headers, LegacyAPICaller, @@ -21,10 +21,11 @@ import { DataSourceAttributes, SigV4Content, UsernamePasswordTypedContent, + SigV4ServiceName, } from '../../common/data_sources'; import { DataSourcePluginConfigType } from '../../config'; import { CryptographyServiceSetup } from '../cryptography_service'; -import { DataSourceClientParams, LegacyClientCallAPIParams } from '../types'; +import { DataSourceClientParams, LegacyClientCallAPIParams, ClientParameters } from '../types'; import { OpenSearchClientPoolSetup } from '../client'; import { parseClientOptions } from './client_config'; import { createDataSourceError } from '../lib/error'; @@ -33,10 +34,9 @@ import { getAWSCredential, getCredential, getDataSource, + getAuthenticationMethod, generateCacheKey, - getSigV4Credentials, } from '../client/configure_client_utils'; -import { IAuthenticationMethodRegistery } from '../auth_registry'; import { authRegistryCredentialProvider } from '../util/credential_provider'; export const configureLegacyClient = async ( @@ -55,10 +55,20 @@ export const configureLegacyClient = async ( ) => { try { const dataSourceAttr = await getDataSource(dataSourceId!, savedObjects); + let clientParams; + + const authenticationMethod = getAuthenticationMethod(dataSourceAttr, authRegistry); + if (authenticationMethod !== undefined) { + clientParams = await authRegistryCredentialProvider(authenticationMethod, { + dataSourceAttr, + request, + cryptography, + }); + } const rootClient = getRootClient( dataSourceAttr, openSearchClientPoolSetup.getClientFromPool, - dataSourceId + clientParams ) as LegacyClient; const registeredSchema = (await customApiSchemaRegistryPromise).getAll(); @@ -73,7 +83,7 @@ export const configureLegacyClient = async ( rootClient, dataSourceId, request, - authRegistry + clientParams ); } catch (error: any) { logger.debug( @@ -106,29 +116,29 @@ const getQueryClient = async ( rootClient?: LegacyClient, dataSourceId?: string, request?: OpenSearchDashboardsRequest, - authRegistry?: IAuthenticationMethodRegistery + authClientParams?: ClientParameters ) => { let credential; + let cacheKeySuffix; let { auth: { type }, - name, + endpoint: nodeUrl, } = dataSourceAttr; - const { endpoint: nodeUrl } = dataSourceAttr; - name = name ?? type; const clientOptions = parseClientOptions(config, nodeUrl, registeredSchema); - const cacheKey = generateCacheKey(dataSourceAttr, dataSourceId); - const authenticationMethod = authRegistry?.getAuthenticationMethod(name); - if (authenticationMethod !== undefined) { - const credentialProvider = await authRegistryCredentialProvider(authenticationMethod, { - dataSourceAttr, - request, - cryptography, - }); - credential = credentialProvider.credential; - type = credentialProvider.type; + if (authClientParams !== undefined) { + credential = authClientParams.credentials; + type = authClientParams.authType; + cacheKeySuffix = authClientParams.cacheKeySuffix; + nodeUrl = authClientParams.endpoint; + + if (credential.service === undefined) { + credential = { ...credential, service: dataSourceAttr.auth.credentials?.service }; + } } + const cacheKey = generateCacheKey(nodeUrl, cacheKeySuffix); + switch (type) { case AuthType.NoAuth: if (!rootClient) rootClient = new LegacyClient(clientOptions); @@ -154,14 +164,12 @@ const getQueryClient = async ( credential = (credential as SigV4Content) ?? (await getAWSCredential(dataSourceAttr, cryptography)); - const awsClient = rootClient ? rootClient : getAWSClient(credential, clientOptions); - addClientToPool(cacheKey, type, awsClient); + if (!rootClient) { + rootClient = getAWSClient(credential, clientOptions); + } + addClientToPool(cacheKey, type, rootClient); - return await (callAPI.bind(null, awsClient) as LegacyAPICaller)( - endpoint, - clientParams, - options - ); + return await getAWSChildClient(rootClient, { endpoint, clientParams, options }, credential); default: throw Error(`${type} is not a supported auth type for data source`); @@ -231,16 +239,34 @@ const getBasicAuthClient = async ( }; const getAWSClient = (credential: SigV4Content, clientOptions: ConfigOptions): LegacyClient => { - const { accessKey, secretKey, region, service, sessionToken } = credential; - const credentials = getSigV4Credentials(accessKey, secretKey, sessionToken); + const { region } = credential; const client = new LegacyClient({ connectionClass: HttpAmazonESConnector, awsConfig: new Config({ region, - credentials, }), - service, ...clientOptions, }); return client; }; + +const getAWSChildClient = async ( + rootClient: LegacyClient, + { endpoint, clientParams = {}, options }: LegacyClientCallAPIParams, + credential: SigV4Content +): Promise => { + const { accessKey, secretKey, region, service, sessionToken } = credential; + const authHeaders = { + auth: { + credentials: { + accessKeyId: accessKey, + secretAccessKey: secretKey, + sessionToken: sessionToken ?? '', + }, + region, + service: service ?? SigV4ServiceName.OpenSearch, + }, + }; + clientParams.headers = Object.assign({}, clientParams.headers, authHeaders); + return await (callAPI.bind(null, rootClient) as LegacyAPICaller)(endpoint, clientParams, options); +}; diff --git a/src/plugins/data_source/server/legacy/http_aws_es/connector.js b/src/plugins/data_source/server/legacy/http_aws_es/connector.js new file mode 100644 index 000000000000..09833209fba4 --- /dev/null +++ b/src/plugins/data_source/server/legacy/http_aws_es/connector.js @@ -0,0 +1,134 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * A connection handler for Amazon ES. + * + * Uses the aws-sdk to make signed requests to an Amazon ES endpoint. + * + * @param client {Client} - The Client that this class belongs to + * @param config {Object} - Configuration options + * @param [config.protocol=http:] {String} - The HTTP protocol that this connection will use, can be set to https: + * @class HttpConnector + */ +import { Config, Credentials } from 'aws-sdk'; +const AWS = require('aws-sdk'); +const HttpConnector = require('elasticsearch/src/lib/connectors/http'); +const HttpClient = require('http-aws-es/src/node'); +const crypto = require('crypto'); + +class HttpAmazonESConnector extends HttpConnector { + constructor(host, config) { + super(host, config); + + const protocol = host.protocol; + const port = host.port; + const endpoint = new AWS.Endpoint(host.host); + + if (protocol) endpoint.protocol = protocol.replace(/:?$/, ':'); + if (port) endpoint.port = port; + + this.awsConfig = config.awsConfig || AWS.config; + this.endpoint = endpoint; + this.httpOptions = config.httpOptions || this.awsConfig.httpOptions; + this.httpClient = new HttpClient(); + this.service = config.service || 'es'; + } + + request(params, cb) { + const reqParams = this.makeReqParams(params); + + let req; + let cancelled; + + const cancel = () => { + cancelled = true; + req && req.abort(); + }; + + const done = (err, response, status, headers) => { + this.log.trace(params.method, reqParams, params.body, response, status); + cb(err, response, status, headers); + }; + + // load creds + this.getAWSCredentials(reqParams) + .catch((e) => { + if (e && e.message) e.message = `AWS Credentials error: ${e.message}`; + throw e; + }) + .then((creds) => { + if (cancelled) { + return; + } + + const request = this.createRequest(params, reqParams); + // Sign the request (Sigv4) + this.signRequest(request, creds); + + request.headers['x-amz-content-sha256'] = crypto + .createHash('sha256') + .update(request.body || '', 'utf8') + .digest('hex'); + + req = this.httpClient.handleRequest(request, this.httpOptions, done); + }) + .catch(done); + + return cancel; + } + + getAWSCredentials(reqParams) { + if (reqParams.headers && reqParams.headers.auth) { + const awssigv4Cred = reqParams.headers.auth; + const accessKeyId = awssigv4Cred.credentials.accessKeyId || null; + const secretAccessKey = awssigv4Cred.credentials.secretAccessKey; + const sessionToken = awssigv4Cred.credentials.sessionToken; + const region = awssigv4Cred.region; + this.service = awssigv4Cred.service; + delete reqParams.headers.auth; + + this.awsConfig = new Config({ + region, + credentials: sessionToken + ? new Credentials({ accessKeyId, secretAccessKey, sessionToken }) + : new Credentials({ accessKeyId, secretAccessKey }), + }); + } + return new Promise((resolve, reject) => { + this.awsConfig.getCredentials((err, creds) => { + if (err) return reject(err); + return resolve(creds); + }); + }); + } + + createRequest(params, reqParams) { + const request = new AWS.HttpRequest(this.endpoint); + + // copy across params + Object.assign(request, reqParams); + + request.region = this.awsConfig.region; + if (!request.headers) request.headers = {}; + const body = params.body; + + if (body) { + const contentLength = Buffer.isBuffer(body) ? body.length : Buffer.byteLength(body); + request.headers['Content-Length'] = contentLength; + request.body = body; + } + request.headers.Host = this.endpoint.host; + + return request; + } + + signRequest(request, creds) { + const signer = new AWS.Signers.V4(request, this.service); + signer.addAuthorization(creds, new Date()); + } +} + +module.exports = HttpAmazonESConnector; diff --git a/src/plugins/data_source/server/legacy/http_aws_es/connector.test.js b/src/plugins/data_source/server/legacy/http_aws_es/connector.test.js new file mode 100644 index 000000000000..65999bd2254c --- /dev/null +++ b/src/plugins/data_source/server/legacy/http_aws_es/connector.test.js @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +const EventEmitter = require('events').EventEmitter; + +const AWS = require('aws-sdk'); +const expect = require('chai').expect; +const Host = require('elasticsearch/src/lib/host'); +const sinon = require('sinon'); + +const Connector = require('./connector'); + +describe('constructor', function () { + it('throws when no host is provided', function () { + expect(() => new Connector()).to.throw(); + }); + + it('assigns httpOptions', function () { + const httpOptions = { foo: 'bar' }; + const host = new Host(); + const connector = new Connector(host, { httpOptions }); + + expect(connector.httpOptions).to.deep.equal(httpOptions); + }); +}); + +describe('request', function () { + let connector; + beforeEach(function () { + AWS.config.update({ + region: 'us-east-1', + }); + + const host = new Host(); + connector = new Connector(host, {}); + + sinon.stub(connector, 'getAWSCredentials').resolves({ + secretAccessKey: 'abc', + accessKeyId: 'abc', + }); + + this.signRequest = sinon.stub(connector, 'signRequest'); + }); + + it('returns a cancel function that aborts the request', function (done) { + const fakeReq = new EventEmitter(); + + fakeReq.setNoDelay = sinon.stub(); + fakeReq.setSocketKeepAlive = sinon.stub(); + fakeReq.abort = sinon.stub(); + + sinon.stub(connector.httpClient, 'handleRequest').returns(fakeReq); + + const cancel = connector.request({}, () => {}); + + // since getCredentials is async, we have to let the event loop tick + setTimeout(() => { + try { + expect(cancel).to.be.a('function'); + + cancel(); + + expect(fakeReq.abort.called).to.be.true; + + done(); + } catch (e) { + done(e); + } + }); + }); + + it('calls callback with error', function (done) { + const error = new Error(); + + const fakeReq = new EventEmitter(); + + fakeReq.setNoDelay = sinon.stub(); + fakeReq.setSocketKeepAlive = sinon.stub(); + + sinon + .stub(connector.httpClient, 'handleRequest') + .callsFake(function (request, options, callback) { + callback(error); + return fakeReq; + }); + + connector.request({}, function (err) { + try { + expect(err).to.deep.equal(error); + done(); + } catch (e) { + done(e); + } + }); + }); +}); diff --git a/src/plugins/data_source/server/types.ts b/src/plugins/data_source/server/types.ts index 12b975881e7e..847e2f72ff68 100644 --- a/src/plugins/data_source/server/types.ts +++ b/src/plugins/data_source/server/types.ts @@ -51,11 +51,17 @@ export interface DataSourceCredentialsProviderOptions { export type DataSourceCredentialsProvider = ( options: DataSourceCredentialsProviderOptions -) => Promise; +) => Promise; + +export interface ClientParameters { + authType: AuthType; + endpoint: string; + cacheKeySuffix: string; + credentials: UsernamePasswordTypedContent | SigV4Content; +} export interface AuthenticationMethod { name: string; - authType: AuthType; credentialProvider: DataSourceCredentialsProvider; } diff --git a/src/plugins/data_source/server/util/credential_provider.ts b/src/plugins/data_source/server/util/credential_provider.ts index d737c932fd95..df3eeb60e5df 100644 --- a/src/plugins/data_source/server/util/credential_provider.ts +++ b/src/plugins/data_source/server/util/credential_provider.ts @@ -3,12 +3,16 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { DataSourceCredentialsProviderOptions, AuthenticationMethod } from '../types'; +import { + DataSourceCredentialsProviderOptions, + AuthenticationMethod, + ClientParameters, +} from '../types'; export const authRegistryCredentialProvider = async ( authenticationMethod: AuthenticationMethod, options: DataSourceCredentialsProviderOptions -) => ({ - credential: await authenticationMethod.credentialProvider(options), - type: authenticationMethod.authType, -}); +): Promise => { + const clientParameters = await authenticationMethod.credentialProvider(options); + return clientParameters as ClientParameters; +}; diff --git a/yarn.lock b/yarn.lock index 006887dca258..b69c779c621b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2535,10 +2535,10 @@ mkdirp "^1.0.4" rimraf "^3.0.2" -"@opensearch-project/opensearch-next@npm:@opensearch-project/opensearch@^2.3.1": - version "2.3.1" - resolved "https://registry.yarnpkg.com/@opensearch-project/opensearch/-/opensearch-2.3.1.tgz#3596e2f1f0615a7555102f6f941f0e0ec645c2cd" - integrity sha512-Kg8tddAx6sinStnNi6IeGilfvLWlonIxaRdVNiJcNPr1yMqd0c9TSegn18zKr0Pb0IM9xBIGBSkRPuh67ZN6Hw== +"@opensearch-project/opensearch-next@npm:@opensearch-project/opensearch@^2.6.0": + version "2.6.0" + resolved "https://registry.yarnpkg.com/@opensearch-project/opensearch/-/opensearch-2.6.0.tgz#cbacb34f92aed04e98cabcdc0dc65eb495023880" + integrity sha512-zgDSa/qUpoEwA+Nxjtv0qtln63M+hS4SVO94R9XjwzJAoqsUiNMjjzF6D6Djq/xJMgCzIYjvBZ5vUlB8/kXwjQ== dependencies: aws4 "^1.11.0" debug "^4.3.1"