Skip to content

Commit

Permalink
updated
Browse files Browse the repository at this point in the history
Signed-off-by: Kawika Avilla <[email protected]>
  • Loading branch information
kavilla committed Aug 29, 2024
1 parent c40b6be commit 7d40c09
Showing 1 changed file with 74 additions and 88 deletions.
162 changes: 74 additions & 88 deletions src/plugins/query_enhancements/public/datasets/s3_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { HttpSetup, SavedObjectsClientContract } from 'opensearch-dashboards/public';
import { Observable, Subscription, timer } from 'rxjs';
import { timer } from 'rxjs';
import { filter, map, mergeMap, takeWhile } from 'rxjs/operators';
import {
DATA_STRUCTURE_META_TYPES,
DEFAULT_DATA,
DataSourceMeta,
DataStructure,
DataStructureCustomMeta,
Dataset,
Expand All @@ -26,27 +22,30 @@ const S3_ICON = 'visTable';

export const s3TypeConfig: DatasetTypeConfig = {
id: DATASET.S3,
title: DATASET.S3,
title: 'S3 Connections',
meta: {
icon: { type: S3_ICON },
tooltip: 'S3 Data Source',
tooltip: 'Amazon S3 Connections',
},

toDataset: (path: DataStructure[]): Dataset => {
const s3 = path[path.length - 1];
const dataSource = path.find((ds) => ds.type === DATASET.S3);
const dataSource = path.find((ds) => ds.type === 'DATA_SOURCE');
const connection = path.find((ds) => ds.type === 'CONNECTION');
const database = path.find((ds) => ds.type === 'DATABASE');
const table = path[path.length - 1];

return {
id: s3.id,
title: s3.title,
id: table.id,
title: `${connection?.title}.${database?.title}.${table.title}`,
type: DATASET.S3,
dataSource: dataSource
? {
id: dataSource.id,
title: dataSource.title,
type: dataSource.type,
meta: table.meta as DataSourceMeta,
}
: undefined,
: DEFAULT_DATA.STRUCTURES.LOCAL_DATASOURCE,
};
},

Expand Down Expand Up @@ -111,38 +110,63 @@ const fetch = (
http: HttpSetup,
path: DataStructure[],
type: 'DATABASE' | 'TABLE'
): Observable<DataStructure[]> => {
const dataSource = path.find((ds) => ds.type === 'DATA_SOURCE');
const parent = path[path.length - 1];
const meta = parent.meta as DataStructureCustomMeta;

return timer(0, 5000).pipe(
mergeMap(() =>
http.fetch('../../api/enhancements/datasource/jobs', {
query: {
id: dataSource?.id,
queryId: meta.query.id,
): Promise<DataStructure[]> => {
return new Promise((resolve, reject) => {
const dataSource = path.find((ds) => ds.type === 'DATA_SOURCE');
const parent = path[path.length - 1];
const meta = parent.meta as DataStructureCustomMeta;

timer(0, 5000)
.pipe(
mergeMap(() =>
http.fetch('../../api/enhancements/datasource/jobs', {
query: {
id: dataSource?.id,
queryId: meta.query.id,
},
})
),
takeWhile(
(response) => response.status !== 'SUCCESS' && response.status !== 'FAILED',
true
),
filter((response) => response.status === 'SUCCESS'),
map((response) => {
if (response.status === 'FAILED') {
throw new Error('Job failed');
}
return response.datarows.map((item: string[]) => ({
id: `${parent.id}.${item[type === 'DATABASE' ? 0 : 1]}`,
title: item[type === 'DATABASE' ? 0 : 1],
type,
meta: {
type: DATA_STRUCTURE_META_TYPES.CUSTOM,
query: meta.query,
session: meta.session,
} as DataStructureCustomMeta,
}));
})
)
.subscribe({
next: (dataStructures) => {
resolve(dataStructures);
},
})
),
takeWhile((response) => response.status !== 'SUCCESS' && response.status !== 'FAILED', true),
filter((response) => response.status === 'SUCCESS'),
map((response) => {
if (response.status === 'FAILED') {
throw new Error('Job failed');
}
return response.datarows.map((item: string[]) => ({
id: `${parent.id}.${item[type === 'DATABASE' ? 0 : 1]}`,
title: item[type === 'DATABASE' ? 0 : 1],
type,
meta: {
type: DATA_STRUCTURE_META_TYPES.CUSTOM,
query: meta.query,
session: meta.session,
} as DataStructureCustomMeta,
}));
})
);
error: (error) => {
reject(error);
},
complete: () => {
reject(new Error('No response'));
},
});
});
};

const setMeta = (dataStructure: DataStructure, response: any) => {
return {
...dataStructure.meta,
query: { id: response.queryId },
session: { id: response.sessionId },
} as DataStructureCustomMeta;
};

const fetchDataSources = async (client: SavedObjectsClientContract): Promise<DataStructure[]> => {
Expand Down Expand Up @@ -192,7 +216,7 @@ const fetchDatabases = async (http: HttpSetup, path: DataStructure[]): Promise<D
const dataSource = path.find((ds) => ds.type === 'DATA_SOURCE');
const connection = path[path.length - 1];
const query = (connection.meta as DataStructureCustomMeta).query;
const jobResponse = await http.post(`../../api/enhancements/datasource/jobs`, {
const response = await http.post(`../../api/enhancements/datasource/jobs`, {
body: JSON.stringify({
lang: 'sql',
query: `SHOW DATABASES in ${connection.title}`,
Expand All @@ -201,34 +225,15 @@ const fetchDatabases = async (http: HttpSetup, path: DataStructure[]): Promise<D
query,
});

connection.meta = {
...connection.meta,
query: {
id: jobResponse.queryId,
},
session: {
id: jobResponse.sessionId,
},
} as DataStructureCustomMeta;
connection.meta = setMeta(connection, response);

return new Promise((resolve, reject) => {
const subscription: Subscription = fetch(http, path, 'DATABASE').subscribe({
next: (dataStructures) => {
subscription.unsubscribe();
resolve(dataStructures);
},
error: (error) => {
subscription.unsubscribe();
reject(error);
},
});
});
return fetch(http, path, 'DATABASE');
};

const fetchTables = async (http: HttpSetup, path: DataStructure[]): Promise<DataStructure[]> => {
const dataSource = path.find((ds) => ds.type === 'DATA_SOURCE');
const database = path[path.length - 1];
const jobResponse = await http.post(`../../api/enhancements/datasource/jobs`, {
const response = await http.post(`../../api/enhancements/datasource/jobs`, {
body: JSON.stringify({
lang: 'sql',
query: `SHOW TABLES in ${database.title}`,
Expand All @@ -239,26 +244,7 @@ const fetchTables = async (http: HttpSetup, path: DataStructure[]): Promise<Data
},
});

database.meta = {
...database.meta,
query: {
id: jobResponse.queryId,
},
session: {
id: jobResponse.sessionId,
},
} as DataStructureCustomMeta;
database.meta = setMeta(database, response);

return new Promise((resolve, reject) => {
const subscription: Subscription = fetch(http, path, 'TABLE').subscribe({
next: (dataStructures) => {
subscription.unsubscribe();
resolve(dataStructures);
},
error: (error) => {
subscription.unsubscribe();
reject(error);
},
});
});
return fetch(http, path, 'TABLE');
};

0 comments on commit 7d40c09

Please sign in to comment.