Skip to content

Commit

Permalink
[Backport 2.x] Implement saved query substitution for S3 integrations (
Browse files Browse the repository at this point in the history
…#1711)

* Add example queries for cloudfront



* Add Flint S3 label to haproxy



* Add query substitution method to build process



* Fix tests



* Update build API with new data source fields



* Update frontend to use new API term



* Use new fields in original integration creation request



* Fix broken data source usage ref



---------


(cherry picked from commit 0b58e64)

Signed-off-by: Simeon Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Simeon Widdis <[email protected]>
  • Loading branch information
3 people authored Apr 17, 2024
1 parent 8e4bbd8 commit b1405f5
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 38 deletions.
2 changes: 2 additions & 0 deletions docs/integrations/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ the author in hindsight.
If working on S3-based integrations, it's worth noting that queries have some values
[substituted](https://github.com/opensearch-project/dashboards-observability/blob/4e1e0e585/public/components/integrations/components/setup_integration.tsx#L438) when installing. They are:

- `{table_name}` is the fully qualified name of the Flint table, typically `datasource.database.object_name`.
This is also substituted in any linked Saved Queries when using S3-based integrations.
- `{s3_bucket_location}` to locate data.
- `{s3_checkpoint_location}` to store intermediate results, which is required by Spark.
- `{object_name}` used for giving tables a unique name per-integration to avoid collisions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,14 @@ const createIndexMapping = async (
});
};

const createDataSourceMappings = async (
const createIndexPatternMappings = async (
targetDataSource: string,
integrationTemplateId: string,
integration: IntegrationConfig,
setToast: (title: string, color?: Color, text?: string | undefined) => void
): Promise<void> => {
// TODO the nested methods still need the dataSource -> indexPattern rename applied, sub-methods
// here still have old naming convention
const http = coreRefs.http!;
const data = await http.get(`${INTEGRATIONS_BASE}/repository/${integrationTemplateId}/schema`);
let error: string | null = null;
Expand Down Expand Up @@ -282,25 +284,42 @@ export async function addIntegrationRequest(
integration: IntegrationConfig,
setToast: (title: string, color?: Color, text?: string | undefined) => void,
name?: string,
dataSource?: string,
indexPattern?: string,
workflows?: string[],
skipRedirect?: boolean
skipRedirect?: boolean,
dataSourceInfo?: { dataSource: string; tableName: string }
): Promise<boolean> {
const http = coreRefs.http!;
if (addSample) {
createDataSourceMappings(
createIndexPatternMappings(
`ss4o_${integration.type}-${integrationTemplateId}-*-sample`,
integrationTemplateId,
integration,
setToast
);
name = `${integrationTemplateId}-sample`;
dataSource = `ss4o_${integration.type}-${integrationTemplateId}-sample-sample`;
indexPattern = `ss4o_${integration.type}-${integrationTemplateId}-sample-sample`;
}

const createReqBody: {
name?: string;
indexPattern?: string;
workflows?: string[];
dataSource?: string;
tableName?: string;
} = {
name,
indexPattern,
workflows,
};
if (dataSourceInfo) {
createReqBody.dataSource = dataSourceInfo.dataSource;
createReqBody.tableName = dataSourceInfo.tableName;
}

let response: boolean = await http
.post(`${INTEGRATIONS_BASE}/store/${templateName}`, {
body: JSON.stringify({ name, dataSource, workflows }),
body: JSON.stringify(createReqBody),
})
.then((res) => {
setToast(`${name} integration successfully added!`, 'success');
Expand All @@ -326,13 +345,13 @@ export async function addIntegrationRequest(
});
const requestBody =
data.sampleData
.map((record) => `{"create": { "_index": "${dataSource}" } }\n${JSON.stringify(record)}`)
.map((record) => `{"create": { "_index": "${indexPattern}" } }\n${JSON.stringify(record)}`)
.join('\n') + '\n';
response = await http
.post(CONSOLE_PROXY, {
body: requestBody,
query: {
path: `${dataSource}/_bulk?refresh=wait_for`,
path: `${indexPattern}/_bulk?refresh=wait_for`,
method: 'POST',
},
})
Expand Down
12 changes: 7 additions & 5 deletions public/components/integrations/components/setup_integration.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,12 @@ export function SetupIntegrationFormInputs({
);
}

const makeTableName = (config: IntegrationSetupInputs): string => {
return `${config.connectionDataSource}.default.${config.connectionTableName}`;
};

const prepareQuery = (query: string, config: IntegrationSetupInputs): string => {
let queryStr = query.replaceAll(
'{table_name}',
`${config.connectionDataSource}.default.${config.connectionTableName}`
);
let queryStr = query.replaceAll('{table_name}', makeTableName(config));
queryStr = queryStr.replaceAll('{s3_bucket_location}', config.connectionLocation);
queryStr = queryStr.replaceAll('{s3_checkpoint_location}', config.checkpointLocation);
queryStr = queryStr.replaceAll('{object_name}', config.connectionTableName);
Expand Down Expand Up @@ -516,7 +517,8 @@ const addIntegration = async ({
config.displayName,
`flint_${config.connectionDataSource}_default_${config.connectionTableName}__*`,
config.enabledWorkflows,
setIsInstalling ? true : false
setIsInstalling ? true : false,
{ dataSource: config.connectionDataSource, tableName: makeTableName(config) }
);
if (setIsInstalling) {
setIsInstalling(false, res);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"attributes":{"createdTimeMs":1713289099101,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Top IPs by Request Count","query":"SELECT c_ip, COUNT(*) AS request_count FROM {table_name} GROUP BY c_ip ORDER BY request_count DESC LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Top IPs by Request Count","version":1},"id":"1d07d010-fc18-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:52:30.414Z","version":"WzI3NTEsMV0="}
{"attributes":{"createdTimeMs":1713293044079,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Top Status by Count","query":"SELECT sc_status, COUNT(*) AS status_count FROM {table_name} GROUP BY sc_status ORDER BY status_count DESC LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Top Status by Count","version":1},"id":"4c6b8820-fc21-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:44:47.956Z","version":"WzI4MzAsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Number of Requests","query":"SELECT COUNT(*) AS request_count FROM {table_name};","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Number of Requests","version":1},"id":"9e6a9b40-fc1a-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"attributes":{"createdTimeMs":1713293161193,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Total Bytes Served","query":"SELECT SUM(sc_bytes) AS total_bytes_served FROM {table_name};","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Total Bytes Served","version":1},"id":"92398eb0-fc21-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:46:01.242Z","version":"WzI4MzEsMV0="}
{"attributes":{"createdTimeMs":1713293269224,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Average Time Taken","query":"SELECT AVG(time_taken) AS average_time_taken FROM {table_name};","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Average Time Taken","version":1},"id":"d2a038a0-fc21-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:47:49.290Z","version":"WzI4MzIsMV0="}
{"attributes":{"createdTimeMs":1713293425335,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Slow Requests from Average Time threshold","query":"WITH avg_time AS (SELECT AVG(time_to_first_byte) AS avg_time FROM {table_name}) SELECT * FROM {table_name} CROSS JOIN avg_time WHERE time_to_first_byte > 1 * avg_time LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Slow Requests from Average Time threshold","version":1},"id":"2fac4250-fc22-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:59:34.785Z","version":"WzI4MzQsMV0="}
{"attributes":{"createdTimeMs":1713294061574,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"Requests by User Agent","query":"SELECT * FROM {table_name} WHERE cs_user_agent LIKE '%Chrome%' LIMIT 10;","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Requests by User Agent","version":1},"id":"aae73c80-fc23-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T19:01:01.640Z","version":"WzI4MzUsMV0="}
{"exportedCount":7,"missingRefCount":0,"missingReferences":[]}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
"extension": "sql",
"type": "query",
"workflows": ["dashboards"]
},
{
"name": "example_queries",
"version": "1.0.0",
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["queries"]
}
],
"sampleData": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"description": "Analyze HAProxy access logs.",
"license": "Apache-2.0",
"type": "logs",
"labels": ["Observability", "Logs"],
"labels": ["Observability", "Logs", "Flint S3"],
"author": "OpenSearch",
"sourceUrl": "https://github.com/opensearch-project/dashboards-observability/tree/main/server/adaptors/integrations/__data__/repository/haproxy/info",
"workflows": [
Expand Down
12 changes: 6 additions & 6 deletions server/adaptors/integrations/__test__/builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('IntegrationInstanceBuilder', () => {
describe('build', () => {
it('should build an integration instance', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};

Expand Down Expand Up @@ -131,7 +131,7 @@ describe('IntegrationInstanceBuilder', () => {

it('should reject with an error if integration is not valid', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};
jest
Expand All @@ -143,7 +143,7 @@ describe('IntegrationInstanceBuilder', () => {

it('should reject with an error if getAssets rejects', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};

Expand All @@ -160,7 +160,7 @@ describe('IntegrationInstanceBuilder', () => {

it('should reject with an error if postAssets throws an error', async () => {
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};
const remappedAssets = [
Expand Down Expand Up @@ -297,7 +297,7 @@ describe('IntegrationInstanceBuilder', () => {
},
];
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};
const expectedInstance = {
Expand Down Expand Up @@ -333,7 +333,7 @@ describe('IntegrationInstanceBuilder', () => {
},
];
const options = {
dataSource: 'instance-datasource',
indexPattern: 'instance-datasource',
name: 'instance-name',
};

Expand Down
2 changes: 1 addition & 1 deletion server/adaptors/integrations/__test__/manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ describe('IntegrationsKibanaBackend', () => {
expect(mockRepository.getIntegration).toHaveBeenCalledWith(templateName);
expect(instanceBuilder.build).toHaveBeenCalledWith(template, {
name,
dataSource: 'datasource',
indexPattern: 'datasource',
});
expect(mockSavedObjectsClient.create).toHaveBeenCalledWith(
'integration-instance',
Expand Down
6 changes: 4 additions & 2 deletions server/adaptors/integrations/integrations_adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ export interface IntegrationsAdaptor {
loadIntegrationInstance: (
templateName: string,
name: string,
dataSource: string,
workflows?: string[]
indexPattern: string,
workflows?: string[],
dataSource?: string,
tableName?: string
) => Promise<IntegrationInstance>;

deleteIntegrationInstance: (id: string) => Promise<unknown>;
Expand Down
76 changes: 66 additions & 10 deletions server/adaptors/integrations/integrations_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import { deepCheck } from './repository/utils';

interface BuilderOptions {
name: string;
dataSource: string;
indexPattern: string;
workflows?: string[];
dataSource?: string;
tableName?: string;
}

interface SavedObject {
Expand Down Expand Up @@ -42,12 +44,67 @@ export class IntegrationInstanceBuilder {
return Promise.reject(assets.error);
}
const remapped = this.remapIDs(this.getSavedObjectBundles(assets.value, options.workflows));
const withDataSource = this.remapDataSource(remapped, options.dataSource);
const refs = await this.postAssets(withDataSource);
const withDataSource = this.remapDataSource(remapped, options.indexPattern);
const withSubstitutedQueries = this.substituteQueries(
withDataSource,
options.dataSource,
options.tableName
);
const refs = await this.postAssets(withSubstitutedQueries as SavedObjectsBulkCreateObject[]);
const builtInstance = await this.buildInstance(integration, refs, options);
return builtInstance;
}

// If we have a data source or table specified, hunt for saved queries and update them with the
// new DS/table.
substituteQueries(assets: SavedObject[], dataSource?: string, tableName?: string): SavedObject[] {
if (!dataSource) {
return assets;
}

assets = assets.map((asset) => {
if (asset.type === 'observability-search') {
const savedQuery = ((asset.attributes as unknown) as {
savedQuery: {
// The actual SavedSearchAttributes type uses "dataSources", but when exporting it's
// "data_sources". I'm not sure why the discrepancy exists but since that's the exported
// format we need to define our own type here.
data_sources: string;
query: string;
query_lang: string;
};
}).savedQuery;
if (!savedQuery.data_sources) {
return asset;
}
const dataSources = JSON.parse(savedQuery.data_sources) as Array<{
name: string;
type: string;
label: string;
value: string;
}>;
for (const ds of dataSources) {
if (ds.type !== 's3glue') {
continue; // Nothing to do
}
// TODO is there a distinction between these where we should only set one? They're all
// equivalent in every export I've seen.
ds.name = dataSource;
ds.label = dataSource;
ds.value = dataSource;
}
savedQuery.data_sources = JSON.stringify(dataSources);

if (savedQuery.query_lang === 'SQL' && tableName) {
savedQuery.query = savedQuery.query.replaceAll('{table_name}', tableName);
}
}
return asset;
});

return assets;
}

getSavedObjectBundles(
assets: ParsedIntegrationAsset[],
includeWorkflows?: string[]
Expand All @@ -69,18 +126,14 @@ export class IntegrationInstanceBuilder {
.flat() as SavedObject[];
}

remapDataSource(
assets: SavedObject[],
dataSource: string | undefined
): Array<{ type: string; attributes: { title: string } }> {
remapDataSource(assets: SavedObject[], dataSource: string | undefined): SavedObject[] {
if (!dataSource) return assets;
assets = assets.map((asset) => {
return assets.map((asset) => {
if (asset.type === 'index-pattern') {
asset.attributes.title = dataSource;
}
return asset;
});
return assets;
}

remapIDs(assets: SavedObject[]): SavedObject[] {
Expand Down Expand Up @@ -136,7 +189,10 @@ export class IntegrationInstanceBuilder {
return Promise.resolve({
name: options.name,
templateName: config.value.name,
dataSource: options.dataSource,
// Before data sources existed we called the index pattern a data source. Now we need the old
// name for BWC but still use the new data sources in building, so we map the variable only
// for returned output here
dataSource: options.indexPattern,
creationDate: new Date().toISOString(),
assets: refs,
});
Expand Down
10 changes: 7 additions & 3 deletions server/adaptors/integrations/integrations_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ export class IntegrationsManager implements IntegrationsAdaptor {
loadIntegrationInstance = async (
templateName: string,
name: string,
dataSource: string,
workflows?: string[]
indexPattern: string,
workflows?: string[],
dataSource?: string,
tableName?: string
): Promise<IntegrationInstance> => {
const template = await this.repository.getIntegration(templateName);
if (template === null) {
Expand All @@ -171,8 +173,10 @@ export class IntegrationsManager implements IntegrationsAdaptor {
addRequestToMetric('integrations', 'create', 'count');
const result = await this.instanceBuilder.build(template, {
name,
dataSource,
indexPattern,
workflows,
dataSource,
tableName,
});
const test = await this.client.create('integration-instance', result);
return Promise.resolve({ ...result, id: test.id });
Expand Down
8 changes: 6 additions & 2 deletions server/routes/integrations/integrations_router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ export function registerIntegrationsRoute(router: IRouter) {
}),
body: schema.object({
name: schema.string(),
dataSource: schema.string(),
indexPattern: schema.string(),
workflows: schema.maybe(schema.arrayOf(schema.string())),
dataSource: schema.maybe(schema.string()),
tableName: schema.maybe(schema.string()),
}),
},
},
Expand All @@ -92,8 +94,10 @@ export function registerIntegrationsRoute(router: IRouter) {
return a.loadIntegrationInstance(
request.params.templateName,
request.body.name,
request.body.indexPattern,
request.body.workflows,
request.body.dataSource,
request.body.workflows
request.body.tableName
);
});
}
Expand Down

0 comments on commit b1405f5

Please sign in to comment.