diff --git a/common/utils.ts b/common/utils.ts index 166f703b0925..61df3020063a 100644 --- a/common/utils.ts +++ b/common/utils.ts @@ -48,7 +48,7 @@ export class DataFramePolling { private interval: number = 5000, private onPollingSuccess?: (data: T) => boolean, private onPollingError?: (error: Error) => boolean - ) { } + ) {} fetchData(params?: P) { this.loading = true; @@ -126,17 +126,14 @@ export const fetchDataFrame = ( ); }; -export const fetchDataFramePolling = ( - context: FetchDataFrameContext, - df: IDataFrame -) => { +export const fetchDataFramePolling = (context: FetchDataFrameContext, df: IDataFrame) => { const { http, path, signal } = context; const queryId = df.meta?.queryId; return from( http.fetch({ method: 'GET', path: `${path}/${queryId}`, - signal + signal, }) ); -} +}; diff --git a/public/plugin.tsx b/public/plugin.tsx index a46c5e2fb3ea..b6fa35fb960a 100644 --- a/public/plugin.tsx +++ b/public/plugin.tsx @@ -8,7 +8,7 @@ import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from '../../.. import { IStorageWrapper, Storage } from '../../../src/plugins/opensearch_dashboards_utils/public'; import { ConfigSchema } from '../common/config'; import { createQueryAssistExtension } from './query_assist'; -import { PPLSearchInterceptor, SQLSearchInterceptor } from './search'; +import { PPLSearchInterceptor, SQLSearchInterceptor, SQLAsyncSearchInterceptor } from './search'; import { setData, setStorage } from './services'; import { QueryEnhancementsPluginSetup, @@ -16,7 +16,6 @@ import { QueryEnhancementsPluginStart, QueryEnhancementsPluginStartDependencies, } from './types'; -import { SQLAsyncQlSearchInterceptor } from './search/sql_async_search_interceptor'; export type PublicConfig = Pick; @@ -50,7 +49,7 @@ export class QueryEnhancementsPlugin usageCollector: data.search.usageCollector, }); - const sqlAsyncSearchInterceptor = new SQLAsyncQlSearchInterceptor({ + const sqlAsyncSearchInterceptor = new SQLAsyncSearchInterceptor({ toasts: core.notifications.toasts, http: core.http, uiSettings: core.uiSettings, @@ -58,7 +57,6 @@ export class QueryEnhancementsPlugin usageCollector: data.search.usageCollector, }); - data.__enhance({ ui: { query: { diff --git a/public/search/index.ts b/public/search/index.ts index 624e7cf6e7b5..9835c1345f02 100644 --- a/public/search/index.ts +++ b/public/search/index.ts @@ -5,3 +5,4 @@ export { PPLSearchInterceptor } from './ppl_search_interceptor'; export { SQLSearchInterceptor } from './sql_search_interceptor'; +export { SQLAsyncSearchInterceptor } from './sql_async_search_interceptor'; diff --git a/public/search/sql_async_search_interceptor.ts b/public/search/sql_async_search_interceptor.ts index a30dc05021fb..b9be68cd126b 100644 --- a/public/search/sql_async_search_interceptor.ts +++ b/public/search/sql_async_search_interceptor.ts @@ -1,6 +1,7 @@ import { trimEnd } from 'lodash'; -import { BehaviorSubject, Observable, from, throwError } from 'rxjs'; +import { BehaviorSubject, Observable, throwError } from 'rxjs'; import { i18n } from '@osd/i18n'; +import { concatMap } from 'rxjs/operators'; import { DataPublicPluginStart, IOpenSearchDashboardsSearchRequest, @@ -9,17 +10,26 @@ import { SearchInterceptor, SearchInterceptorDeps, } from '../../../../src/plugins/data/public'; -import { getRawDataFrame, getRawQueryString, IDataFrameResponse } from '../../../../src/plugins/data/common'; -import { API, DataFramePolling, FetchDataFrameContext, SEARCH_STRATEGY, fetchDataFrame, fetchDataFramePolling } from '../../common'; +import { + getRawDataFrame, + getRawQueryString, + IDataFrameResponse, +} from '../../../../src/plugins/data/common'; +import { + API, + DataFramePolling, + FetchDataFrameContext, + SEARCH_STRATEGY, + fetchDataFrame, + fetchDataFramePolling, +} from '../../common'; import { QueryEnhancementsPluginStartDependencies } from '../types'; -import { concatMap } from 'rxjs/operators'; -export class SQLAsyncQlSearchInterceptor extends SearchInterceptor { +export class SQLAsyncSearchInterceptor extends SearchInterceptor { protected queryService!: DataPublicPluginStart['query']; protected aggsService!: DataPublicPluginStart['search']['aggs']; protected dataFrame$ = new BehaviorSubject(undefined); - constructor(deps: SearchInterceptorDeps) { super(deps); @@ -47,8 +57,8 @@ export class SQLAsyncQlSearchInterceptor extends SearchInterceptor { return throwError(this.handleSearchError('DataFrame is not defined', request, signal!)); } - const queryString = dataFrame.meta?.queryConfig?.formattedQs() ?? getRawQueryString(searchRequest) ?? ''; - + const queryString = + dataFrame.meta?.queryConfig?.formattedQs() ?? getRawQueryString(searchRequest) ?? ''; const onPollingSuccess = (pollingResult: any) => { if (pollingResult && pollingResult.body.meta.status === 'SUCCESS') { @@ -65,12 +75,11 @@ export class SQLAsyncQlSearchInterceptor extends SearchInterceptor { return true; } return false; - } + }; const onPollingError = (error: Error) => { - console.error('Polling error:', error); throw new Error(); - } + }; return fetchDataFrame(dfContext, queryString, dataFrame).pipe( concatMap((jobResponse) => { diff --git a/server/index.ts b/server/index.ts index b8437d8a3397..342d3d222643 100644 --- a/server/index.ts +++ b/server/index.ts @@ -20,6 +20,7 @@ export function plugin(initializerContext: PluginInitializerContext) { export { Facet, + JobsFacet, OpenSearchPPLPlugin, OpenSearchObservabilityPlugin, shimStats, diff --git a/server/routes/index.ts b/server/routes/index.ts index b0d70ee04400..a0bdd39e812a 100644 --- a/server/routes/index.ts +++ b/server/routes/index.ts @@ -104,42 +104,4 @@ export function defineRoutes( defineRoute(logger, router, searchStrategies, SEARCH_STRATEGY.SQL); defineRoute(logger, router, searchStrategies, SEARCH_STRATEGY.SQLAsync); registerQueryAssistRoutes(router); - // defineRoute(logger, router, searchStrategies, SEARCH_STRATEGY.SQLAsync); - // sql async jobs - router.post( - { - path: `/api/sqlasyncql/jobs`, - validate: { - body: schema.object({ - query: schema.object({ - qs: schema.string(), - format: schema.string(), - }), - df: schema.any(), - dataSource: schema.nullable(schema.string()), - }), - }, - }, - async (context, req, res): Promise => { - try { - const queryRes: IDataFrameResponse = await searchStrategies.sqlasync.search( - context, - req as any, - {} - ); - const result: any = { - body: { - ...queryRes, - }, - }; - return res.ok(result); - } catch (err) { - logger.error(err); - return res.custom({ - statusCode: 500, - body: err, - }); - } - } - ); } diff --git a/server/search/sql_async_search_strategy.ts b/server/search/sql_async_search_strategy.ts index 84b06acd1c61..36c6f1c99f4c 100644 --- a/server/search/sql_async_search_strategy.ts +++ b/server/search/sql_async_search_strategy.ts @@ -7,13 +7,14 @@ import { SharedGlobalConfig, Logger, ILegacyClusterClient } from 'opensearch-das import { Observable } from 'rxjs'; import { ISearchStrategy, SearchUsage } from '../../../../src/plugins/data/server'; import { + DATA_FRAME_TYPES, + IDataFrameError, IDataFrameResponse, IOpenSearchDashboardsSearchRequest, PartialDataFrame, createDataFrame, } from '../../../../src/plugins/data/common'; -import { Facet } from '../utils'; -import { JobsFacet } from '../utils'; +import { Facet, JobsFacet } from '../utils'; export const sqlAsyncSearchStrategyProvider = ( config$: Observable, @@ -35,15 +36,15 @@ export const sqlAsyncSearchStrategyProvider = ( datasource: df?.meta?.queryConfig?.dataSource, lang: 'sql', sessionId: df?.meta?.sessionId, - } + }; const rawResponse = await sqlAsyncFacet.describeQuery(context, request); // handles failure if (!rawResponse.success) { return { - type: 'data_frame_polling', + type: DATA_FRAME_TYPES.POLLING, body: { error: rawResponse.data }, took: rawResponse.took, - }; + } as IDataFrameError; } const queryId = rawResponse.data?.queryId; const sessionId = rawResponse.data?.sessionId; @@ -60,13 +61,13 @@ export const sqlAsyncSearchStrategyProvider = ( }; dataFrame.name = request.body?.datasource; return { - type: 'data_frame_polling', + type: DATA_FRAME_TYPES.POLLING, body: dataFrame, took: rawResponse.took, }; } else { const queryId = request.params.queryId; - request.params = { queryId } + request.params = { queryId }; const asyncResponse = await sqlAsyncJobsFacet.describeQuery(request); const status = asyncResponse.data.status; const partial: PartialDataFrame = { @@ -77,21 +78,21 @@ export const sqlAsyncSearchStrategyProvider = ( dataFrame.fields.forEach((field, index) => { field.values = asyncResponse?.data.datarows.map((row: any) => row[index]); }); - + dataFrame.size = asyncResponse?.data?.datarows?.length || 0; - + dataFrame.meta = { status, queryId, - error: status === 'FAILED' && asyncResponse.data?.error + error: status === 'FAILED' && asyncResponse.data?.error, }; dataFrame.name = request.body?.datasource; - + // TODO: MQL should this be the time for polling or the time for job creation? if (usage) usage.trackSuccess(asyncResponse.took); - + return { - type: 'data_frame_polling', + type: DATA_FRAME_TYPES.POLLING, body: dataFrame, took: asyncResponse.took, }; @@ -103,4 +104,4 @@ export const sqlAsyncSearchStrategyProvider = ( } }, }; -}; \ No newline at end of file +};