Skip to content

Commit

Permalink
init commit to add datasource client lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongnansu committed Jul 20, 2022
1 parent db8c74e commit 282e578
Show file tree
Hide file tree
Showing 14 changed files with 656 additions and 7 deletions.
14 changes: 14 additions & 0 deletions src/core/server/core_route_handler_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
} from './opensearch';
import { Auditor } from './audit_trail';
import { InternalUiSettingsServiceStart, IUiSettingsClient } from './ui_settings';
import { InternalOpenSearchDataServiceStart } from './opensearch_data/types';

class CoreOpenSearchRouteHandlerContext {
#client?: IScopedClusterClient;
Expand Down Expand Up @@ -69,6 +70,15 @@ class CoreOpenSearchRouteHandlerContext {
}
}

class CoreOpenSearchDataSourceRouteHandlerContext {
constructor(private readonly opensearchDataStart: InternalOpenSearchDataServiceStart) {}

public async getClient(dataSourceId: string) {
const client = await this.opensearchDataStart.client.asDataSource(dataSourceId);
return client;
}
}

class CoreSavedObjectsRouteHandlerContext {
constructor(
private readonly savedObjectsStart: InternalSavedObjectsServiceStart,
Expand Down Expand Up @@ -115,6 +125,7 @@ export class CoreRouteHandlerContext {
readonly opensearch: CoreOpenSearchRouteHandlerContext;
readonly savedObjects: CoreSavedObjectsRouteHandlerContext;
readonly uiSettings: CoreUiSettingsRouteHandlerContext;
readonly opensearchData: CoreOpenSearchDataSourceRouteHandlerContext;

constructor(
private readonly coreStart: InternalCoreStart,
Expand All @@ -124,6 +135,9 @@ export class CoreRouteHandlerContext {
this.coreStart.opensearch,
this.request
);
this.opensearchData = new CoreOpenSearchDataSourceRouteHandlerContext(
this.coreStart.opensearchData
);
this.savedObjects = new CoreSavedObjectsRouteHandlerContext(
this.coreStart.savedObjects,
this.request
Expand Down
4 changes: 4 additions & 0 deletions src/core/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import {
configSchema as opensearchsearchConfigSchema,
OpenSearchServiceStart,
IScopedClusterClient,
OpenSearchClient,
} from './opensearch';
import { HttpServiceSetup, HttpServiceStart } from './http';
import { HttpResources } from './http_resources';
Expand Down Expand Up @@ -403,6 +404,9 @@ export interface RequestHandlerContext {
client: ILegacyScopedClusterClient;
};
};
opensearchData: {
getClient(dataSourceId: string): Promise<OpenSearchClient>;
};
uiSettings: {
client: IUiSettingsClient;
};
Expand Down
6 changes: 6 additions & 0 deletions src/core/server/internal_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,18 @@ import { InternalStatusServiceSetup } from './status';
import { AuditTrailSetup, AuditTrailStart } from './audit_trail';
import { InternalLoggingServiceSetup } from './logging';
import { CoreUsageDataStart } from './core_usage_data';
import {
InternalOpenSearchDataServiceSetup,
InternalOpenSearchDataServiceStart,
} from './opensearch_data/types';

/** @internal */
export interface InternalCoreSetup {
capabilities: CapabilitiesSetup;
context: ContextSetup;
http: InternalHttpServiceSetup;
opensearch: InternalOpenSearchServiceSetup;
opensearchData: InternalOpenSearchDataServiceSetup;
savedObjects: InternalSavedObjectsServiceSetup;
status: InternalStatusServiceSetup;
uiSettings: InternalUiSettingsServiceSetup;
Expand All @@ -72,6 +77,7 @@ export interface InternalCoreSetup {
export interface InternalCoreStart {
capabilities: CapabilitiesStart;
opensearch: InternalOpenSearchServiceStart;
opensearchData: InternalOpenSearchDataServiceStart;
http: InternalHttpServiceStart;
metrics: InternalMetricsServiceStart;
savedObjects: InternalSavedObjectsServiceStart;
Expand Down
103 changes: 103 additions & 0 deletions src/core/server/opensearch_data/client/data_source_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

import { Client } from '@opensearch-project/opensearch';
import { Logger } from '../../logging';
import { OpenSearchClient, OpenSearchClientConfig } from '../../opensearch/client';
import { SavedObjectsClientContract } from '../../saved_objects/types';

/**
* TODO: update doc
* Represents an OpenSearch cluster API client created by the platform.
* It allows to call API on behalf of the internal OpenSearch Dashboards user and
* the actual user that is derived from the request headers (via `asScoped(...)`).
*
* @public
**/
export interface IDdataSourceClient {
/**
* TODO: update doc. Creates a {@link IScopedClusterClient | scoped cluster client} bound to given {@link ScopeableRequest | request}
*/
asDataSource: (dataSourceId: string) => Promise<OpenSearchClient>;
}

/**
* See {@link IClusterClient}
*
* @public
*/
export interface ICustomDataSourceClient extends IDdataSourceClient {
/**
* Closes the data source client. After that client cannot be used and one should
* create a new client instance to be able to interact with OpenSearch API.
*/
close: () => Promise<void>;
}

export class DataSourceClient implements ICustomDataSourceClient {
public dataSourceClientsPool: Map<string, Client>;
private savedObjectClient: SavedObjectsClientContract;
private isDataSourceFeautureEnabled = true;
private isClosed = false;

constructor(
private readonly config: OpenSearchClientConfig,
savedObjectClient: SavedObjectsClientContract,
logger: Logger
) {
// init pool as empty
this.dataSourceClientsPool = new Map<string, Client>();
this.savedObjectClient = savedObjectClient;
// TODO: 1.read config and determine isDataSourceEnabled Flag
// 2. throw error if isDataSourceEnabled == false, while API is called
}
async asDataSource(dataSourceId: string) {
// 1. fetch meta info of data source using saved_object client
const dataSource = await this.savedObjectClient.get('data-source', dataSourceId);

// 2. TODO: parse to DataSource object, need update once dataSource type is in place
const dataSourceObj = dataSource!.attributes as any;
const url = dataSourceObj.endpoint.url;
/**
* TODO:
* credential manager will provide "decrypt(authId: string)" to return auth
* Example code: cosnt {username, password} = credentialManager.decrpt(dataSourceObj.authId)
*/
const username = dataSourceObj.endpoint.credentials.username;
const password = dataSourceObj.endpoint.credentials.password;

// 2. build/find client and return
let dataSourceClient = this.dataSourceClientsPool.get(dataSourceId);
if (!dataSourceClient) {
// TODO: make use of existing default clientConfig to build client
dataSourceClient = new Client({
node: url,
auth: {
username,
password,
},
});
// update pool
this.dataSourceClientsPool.set(dataSourceId, dataSourceClient);
}
return dataSourceClient;
}

// close anything in pool
public async close() {
if (this.isClosed) {
return;
}
this.isClosed = true;
await Promise.all([
this.dataSourceClientsPool.forEach((v, k) => {
v.close();
}),
]);
}
}
98 changes: 98 additions & 0 deletions src/core/server/opensearch_data/opensearch_data_service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

import { Observable, Subject } from 'rxjs';
import { first, map, shareReplay, takeUntil } from 'rxjs/operators';
import { CoreService } from 'src/core/types';
import { AuditorFactory, AuditTrailStart } from '../audit_trail';
import { CoreContext } from '../core_context';
import { Logger } from '../logging';
import { OpenSearchClientConfig } from '../opensearch/client';
import { OpenSearchConfig, OpenSearchConfigType } from '../opensearch/opensearch_config';
import { OpenSearchService } from '../opensearch/opensearch_service';
import {
InternalOpenSearchServiceSetup,
InternalOpenSearchServiceStart,
} from '../opensearch/types';
import { pollOpenSearchNodesVersion } from '../opensearch/version_check/ensure_opensearch_version';
import {
InternalSavedObjectsServiceSetup,
InternalSavedObjectsServiceStart,
SavedObjectsClient,
} from '../saved_objects';
import { SavedObjectsClientContract } from '../types';
import { DataSourceClient } from './client/data_source_client';

interface StartDeps {
savedObjects: InternalSavedObjectsServiceStart;
auditTrail: AuditTrailStart;
}

export class OpenSearchDataService implements CoreService<any, any> {
private readonly log: Logger;
private readonly config$: Observable<OpenSearchConfig>;
private auditorFactory?: AuditorFactory;
private stop$ = new Subject();
private opensearchDashboardsVersion: string;
private savedObjectClient?: SavedObjectsClientContract;
private dataSourceClient?: DataSourceClient;

constructor(private readonly coreContext: CoreContext) {
this.opensearchDashboardsVersion = coreContext.env.packageInfo.version;
this.log = coreContext.logger.get('opensearch-data-service');
this.config$ = coreContext.configService
.atPath<OpenSearchConfigType>('opensearch') // TODO: update if we'll add data source specific configs
.pipe(map((rawConfig) => new OpenSearchConfig(rawConfig)));
}

public async setup(): Promise<any> {
this.log.debug('Setting up opensearch data service');

const config = await this.config$.pipe(first()).toPromise();

// TODO: update accordingly when we decide how to check node/version compatibility in setup stage

// const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({
// internalClient: this.client.asInternalUser,
// optimizedHealthcheckId: config.optimizedHealthcheckId,
// log: this.log,
// ignoreVersionMismatch: config.ignoreVersionMismatch,
// opensearchVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
// opensearchDashboardsVersion: this.opensearchDashboardsVersion,
// }).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 }));

return {};
}

public async start({ savedObjects, auditTrail }: StartDeps): Promise<any> {
this.auditorFactory = auditTrail;
const config = await this.config$.pipe(first()).toPromise();
this.savedObjectClient = new SavedObjectsClient(savedObjects.createInternalRepository());
this.dataSourceClient = this.createDataSourceClient('data-source', config);

return {
dataSourceClient: this.dataSourceClient,
};
}

public async stop() {
this.log.debug('Stopping opensearch data service');
this.stop$.next();
if (this.dataSourceClient) {
await this.dataSourceClient.close();
}
}

private createDataSourceClient(type: string, config: OpenSearchClientConfig) {
return new DataSourceClient(
config,
this.savedObjectClient!,
this.coreContext.logger.get('opensearch', type)
);
}
}
44 changes: 44 additions & 0 deletions src/core/server/opensearch_data/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

import { Observable } from 'rxjs';
import { OpenSearchStatusMeta } from '../opensearch/types';
import { NodesVersionCompatibility } from '../opensearch/version_check/ensure_opensearch_version';
import { ServiceStatus } from '../status';
import { IDdataSourceClient } from './client/data_source_client';

/**
* @public
*/
export interface OpenSearchDataServiceSetup {
opensearchNodesCompatibility$: Observable<NodesVersionCompatibility>;
status$: Observable<ServiceStatus<OpenSearchStatusMeta>>;
}

/** @internal */
export type InternalOpenSearchDataServiceSetup = OpenSearchDataServiceSetup;

/**
* @public
*/
export interface OpenSearchDataServiceStart {
/**
* A pre-configured {@link IDdataSourceClient | OpenSearch client}
*
* @example
* ```js
* const client = core.opensearchDataSource.client;
* ```
*/
readonly client: IDdataSourceClient;
}

/**
* @internal
*/
export type InternalOpenSearchDataServiceStart = OpenSearchDataServiceStart;
Loading

0 comments on commit 282e578

Please sign in to comment.