-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Multiple Data Source] Support SigV4 as a new auth type of datasource with aliased client #3470
Changes from all commits
837cd21
af8815f
bcdc5c8
7989b81
19d7d9c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,15 +3,16 @@ | |
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
import { Client } from '@opensearch-project/opensearch'; | ||
import { Client } from '@opensearch-project/opensearch-next'; | ||
import { Client as LegacyClient } from 'elasticsearch'; | ||
import LRUCache from 'lru-cache'; | ||
import { Logger } from 'src/core/server'; | ||
import { AuthType } from '../../common/data_sources'; | ||
import { DataSourcePluginConfigType } from '../../config'; | ||
|
||
export interface OpenSearchClientPoolSetup { | ||
getClientFromPool: (id: string) => Client | LegacyClient | undefined; | ||
addClientToPool: (endpoint: string, client: Client | LegacyClient) => void; | ||
getClientFromPool: (endpoint: string, authType: AuthType) => Client | LegacyClient | undefined; | ||
addClientToPool: (endpoint: string, authType: AuthType, client: Client | LegacyClient) => void; | ||
} | ||
|
||
/** | ||
|
@@ -21,23 +22,28 @@ export interface OpenSearchClientPoolSetup { | |
* It reuse TPC connections for each OpenSearch endpoint. | ||
*/ | ||
export class OpenSearchClientPool { | ||
// LRU cache | ||
// LRU cache of client | ||
// key: data source endpoint | ||
// value: OpenSearch client object | Legacy client object | ||
private cache?: LRUCache<string, Client | LegacyClient>; | ||
// value: OpenSearch client | Legacy client | ||
private clientCache?: LRUCache<string, Client | LegacyClient>; | ||
// LRU cache of aws clients | ||
// key: endpoint + dataSourceId + lastUpdatedTime together to support update case. | ||
// value: OpenSearch client | Legacy client | ||
private awsClientCache?: LRUCache<string, Client | LegacyClient>; | ||
private isClosed = false; | ||
|
||
constructor(private logger: Logger) {} | ||
|
||
public setup(config: DataSourcePluginConfigType): OpenSearchClientPoolSetup { | ||
const logger = this.logger; | ||
const { size } = config.clientPool; | ||
const MAX_AGE = 15 * 60 * 1000; // by default, TCP connection times out in 15 minutes | ||
|
||
this.cache = new LRUCache({ | ||
this.clientCache = new LRUCache({ | ||
max: size, | ||
maxAge: 15 * 60 * 1000, // by default, TCP connection times out in 15 minutes | ||
maxAge: MAX_AGE, | ||
|
||
async dispose(endpoint, client) { | ||
async dispose(key, client) { | ||
try { | ||
await client.close(); | ||
} catch (error: any) { | ||
|
@@ -50,12 +56,34 @@ export class OpenSearchClientPool { | |
}); | ||
this.logger.info(`Created data source client pool of size ${size}`); | ||
|
||
const getClientFromPool = (endpoint: string) => { | ||
return this.cache!.get(endpoint); | ||
// aws client specific pool | ||
this.awsClientCache = new LRUCache({ | ||
max: size, | ||
maxAge: MAX_AGE, | ||
|
||
async dispose(key, client) { | ||
try { | ||
await client.close(); | ||
} catch (error: any) { | ||
logger.warn( | ||
`Error closing OpenSearch client when removing from aws client pool: ${error.message}` | ||
); | ||
} | ||
}, | ||
}); | ||
this.logger.info(`Created data source aws client pool of size ${size}`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This message needs rewording. |
||
|
||
const getClientFromPool = (key: string, authType: AuthType) => { | ||
const selectedCache = authType === AuthType.SigV4 ? this.awsClientCache : this.clientCache; | ||
|
||
return selectedCache!.get(key); | ||
}; | ||
|
||
const addClientToPool = (endpoint: string, client: Client | LegacyClient) => { | ||
this.cache!.set(endpoint, client); | ||
const addClientToPool = (key: string, authType: string, client: Client | LegacyClient) => { | ||
const selectedCache = authType === AuthType.SigV4 ? this.awsClientCache : this.clientCache; | ||
if (!selectedCache?.has(key)) { | ||
return selectedCache!.set(key, client); | ||
} | ||
}; | ||
|
||
return { | ||
|
@@ -71,7 +99,15 @@ export class OpenSearchClientPool { | |
if (this.isClosed) { | ||
return; | ||
} | ||
await Promise.all(this.cache!.values().map((client) => client.close())); | ||
this.isClosed = true; | ||
|
||
try { | ||
await Promise.all([ | ||
...this.clientCache!.values().map((client) => client.close()), | ||
...this.awsClientCache!.values().map((client) => client.close()), | ||
]); | ||
this.isClosed = true; | ||
} catch (error) { | ||
this.logger.error(`Error closing clients in pool. ${error}`); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,7 @@ import { DataSourcePluginConfigType } from '../../config'; | |
import { ClientMock, parseClientOptionsMock } from './configure_client.test.mocks'; | ||
import { OpenSearchClientPoolSetup } from './client_pool'; | ||
import { configureClient } from './configure_client'; | ||
import { ClientOptions } from '@opensearch-project/opensearch'; | ||
import { ClientOptions } from '@opensearch-project/opensearch-next'; | ||
// eslint-disable-next-line @osd/eslint/no-restricted-paths | ||
import { opensearchClientMock } from '../../../../core/server/opensearch/client/mocks'; | ||
import { cryptographyServiceSetupMock } from '../cryptography_service.mocks'; | ||
|
@@ -137,7 +137,7 @@ describe('configureClient', () => { | |
configureClient(dataSourceClientParams, clientPoolSetup, config, logger) | ||
).rejects.toThrowError(); | ||
|
||
expect(ClientMock).toHaveBeenCalledTimes(1); | ||
expect(ClientMock).not.toHaveBeenCalled(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This previously made sure it was called but also that it was called only once; is that no longer true? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no longer true. The logic was imporved. If password determined to be contaminated, there's no need to call the |
||
expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); | ||
expect(decodeAndDecryptSpy).toHaveBeenCalledTimes(1); | ||
}); | ||
|
@@ -152,7 +152,7 @@ describe('configureClient', () => { | |
configureClient(dataSourceClientParams, clientPoolSetup, config, logger) | ||
).rejects.toThrowError(); | ||
|
||
expect(ClientMock).toHaveBeenCalledTimes(1); | ||
expect(ClientMock).not.toHaveBeenCalled(); | ||
expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); | ||
expect(decodeAndDecryptSpy).toHaveBeenCalledTimes(1); | ||
}); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a common practice in OSD to issue warnings with the term "Error"? If not, we should change this to
Also, why "AWS", that too lowercase?