From fb55cfe748b8442beeb62b026ba0d0a7fddb1320 Mon Sep 17 00:00:00 2001 From: Christopher Henn Date: Wed, 31 Jul 2019 13:31:20 -0700 Subject: [PATCH] refactor(ui): support receiving limited Flux responses Adds support for fetching a Flux response with a limit, and returning a partial response instead of an error if that limit is exceeded. The logic is implemented here instead of in influxdb-client-js as part of #14482. --- ui/src/dashboards/resources.ts | 43 ------- ui/src/dashboards/utils/sources.ts | 12 -- .../components/verifyStep/DataListening.tsx | 5 +- ui/src/shared/apis/query.ts | 112 ++++++++++++++++-- ui/src/shared/components/TimeSeries.tsx | 8 +- ui/src/shared/constants/index.ts | 4 + ui/src/timeMachine/actions/queries.ts | 8 +- ui/src/timeMachine/apis/queryBuilder.ts | 6 +- ui/src/types/queries.ts | 95 +-------------- ui/src/variables/utils/ValueFetcher.ts | 2 +- 10 files changed, 128 insertions(+), 167 deletions(-) delete mode 100644 ui/src/dashboards/utils/sources.ts diff --git a/ui/src/dashboards/resources.ts b/ui/src/dashboards/resources.ts index d0655e9a244..862470b6926 100644 --- a/ui/src/dashboards/resources.ts +++ b/ui/src/dashboards/resources.ts @@ -10,7 +10,6 @@ import { SourceAuthenticationMethod, SourceLinks, TimeRange, - QueryConfig, TableOptions, } from 'src/types' @@ -83,48 +82,6 @@ export const service: Service = { }, } -export const queryConfig: QueryConfig = { - database: 'telegraf', - measurement: 'cpu', - retentionPolicy: 'autogen', - fields: [ - { - value: 'mean', - type: 'func', - alias: 'mean_usage_idle', - args: [ - { - value: 'usage_idle', - type: 'field', - alias: '', - }, - ], - }, - { - value: 'mean', - type: 'func', - alias: 'mean_usage_user', - args: [ - { - value: 'usage_user', - type: 'field', - alias: '', - }, - ], - }, - ], - tags: {}, - groupBy: { - time: 'auto', - tags: [], - }, - areTagsAccepted: false, - fill: 'null', - rawText: null, - range: null, - shifts: null, -} - export const axes: Axes = { x: { bounds: ['', ''], diff --git a/ui/src/dashboards/utils/sources.ts b/ui/src/dashboards/utils/sources.ts deleted file mode 100644 index 3b5053680a3..00000000000 --- a/ui/src/dashboards/utils/sources.ts +++ /dev/null @@ -1,12 +0,0 @@ -import {QueryConfig, Source} from 'src/types' - -export const nextSource = ( - prevQuery: QueryConfig, - nextQuery: QueryConfig -): Source => { - if (nextQuery.source) { - return nextQuery.source - } - - return prevQuery.source -} diff --git a/ui/src/dataLoaders/components/verifyStep/DataListening.tsx b/ui/src/dataLoaders/components/verifyStep/DataListening.tsx index 925effd0c8b..191cce04014 100644 --- a/ui/src/dataLoaders/components/verifyStep/DataListening.tsx +++ b/ui/src/dataLoaders/components/verifyStep/DataListening.tsx @@ -122,7 +122,10 @@ class DataListening extends PureComponent { let timePassed: number try { - const response = await runQuery(orgID, script).promise + const response = await runQuery(orgID, script).promise.then( + ({csv}) => csv + ) + responseLength = response.length timePassed = Number(new Date()) - this.startTime } catch (err) { diff --git a/ui/src/shared/apis/query.ts b/ui/src/shared/apis/query.ts index 97bf7f636d1..ec88bf66d8c 100644 --- a/ui/src/shared/apis/query.ts +++ b/ui/src/shared/apis/query.ts @@ -1,19 +1,117 @@ -// APIs -import {client} from 'src/utils/api' +// Constants +import {FLUX_RESPONSE_BYTES_LIMIT} from 'src/shared/constants' // Types import {CancelBox} from 'src/types/promises' -import {File} from '@influxdata/influx' +import {File, Query, CancellationError} from 'src/types' -const MAX_RESPONSE_CHARS = 50000 * 160 +export interface RunQueryResult { + csv: string + didTruncate: boolean + bytesRead: number +} export const runQuery = ( orgID: string, query: string, extern?: File -): CancelBox => { - return client.queries.execute(orgID, query, { +): CancelBox => { + const url = `/api/v2/query?${new URLSearchParams({orgID})}` + + const headers = { + 'Content-Type': 'application/json', + 'Accept-Encoding': 'gzip', + } + + const body: Query = { + query, extern, - limitChars: MAX_RESPONSE_CHARS, + dialect: {annotations: ['group', 'datatype', 'default']}, + } + + const controller = new AbortController() + + const request = fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(body), + signal: controller.signal, }) + + const promise = request + .then(processResponse) + .catch(e => + e.name === 'AbortError' + ? Promise.reject(new CancellationError()) + : Promise.reject(e) + ) + + return { + promise, + cancel: () => controller.abort(), + } +} + +const processResponse = async (response: Response): Promise => { + const reader = response.body.getReader() + const decoder = new TextDecoder() + + let csv = '' + let bytesRead = 0 + let didTruncate = false + + let read = await reader.read() + + while (!read.done) { + const text = decoder.decode(read.value) + + bytesRead += read.value.byteLength + + if (bytesRead > FLUX_RESPONSE_BYTES_LIMIT) { + csv += trimPartialLines(text) + didTruncate = true + break + } else { + csv += text + read = await reader.read() + } + } + + reader.cancel() + + return { + csv, + bytesRead, + didTruncate, + } +} + +/* + Given an arbitrary text chunk of a Flux CSV, trim partial lines off of the end + of the text. + + For example, given the following partial Flux response, + + r,baz,3 + foo,bar,baz,2 + foo,bar,b + + we want to trim the last incomplete line, so that the result is + + r,baz,3 + foo,bar,baz,2 + +*/ +const trimPartialLines = (partialResp: string): string => { + let i = partialResp.length - 1 + + while (partialResp[i] !== '\n') { + if (i <= 0) { + return partialResp + } + + i -= 1 + } + + return partialResp.slice(0, i + 1) } diff --git a/ui/src/shared/components/TimeSeries.tsx b/ui/src/shared/components/TimeSeries.tsx index 3cbec982be7..3f65749f8b2 100644 --- a/ui/src/shared/components/TimeSeries.tsx +++ b/ui/src/shared/components/TimeSeries.tsx @@ -6,7 +6,7 @@ import {withRouter, WithRouterProps} from 'react-router' import {fromFlux, FromFluxResult} from '@influxdata/giraffe' // API -import {runQuery} from 'src/shared/apis/query' +import {runQuery, RunQueryResult} from 'src/shared/apis/query' // Utils import {checkQueryResult} from 'src/shared/utils/checkQueryResult' @@ -80,7 +80,7 @@ class TimeSeries extends Component { public state: State = defaultState() - private pendingResults: Array> = [] + private pendingResults: Array> = [] public async componentDidMount() { this.reload() @@ -143,7 +143,9 @@ class TimeSeries extends Component { }) // Wait for new queries to complete - const files = await Promise.all(this.pendingResults.map(r => r.promise)) + const files = await Promise.all( + this.pendingResults.map(r => r.promise.then(({csv}) => csv)) + ) const duration = Date.now() - startTime const giraffeResult = fromFlux(files.join('\n\n')) diff --git a/ui/src/shared/constants/index.ts b/ui/src/shared/constants/index.ts index 1bee8eaaf2d..b96e8e5885f 100644 --- a/ui/src/shared/constants/index.ts +++ b/ui/src/shared/constants/index.ts @@ -50,6 +50,10 @@ export const CLOUD_URL = process.env.CLOUD_URL export const CLOUD_CHECKOUT_PATH = process.env.CLOUD_CHECKOUT_PATH export const CLOUD_BILLING_PATH = process.env.CLOUD_BILLING_PATH +export const FLUX_RESPONSE_BYTES_LIMIT = CLOUD + ? 10 * 1024 * 1024 + : 100 * 1024 * 1024 + export const VIS_SIG_DIGITS = 4 export const VIS_THEME: Partial = { diff --git a/ui/src/timeMachine/actions/queries.ts b/ui/src/timeMachine/actions/queries.ts index 1c862406eb9..3fca55e8d1b 100644 --- a/ui/src/timeMachine/actions/queries.ts +++ b/ui/src/timeMachine/actions/queries.ts @@ -1,7 +1,7 @@ import {get} from 'lodash' // API -import {runQuery} from 'src/shared/apis/query' +import {runQuery, RunQueryResult} from 'src/shared/apis/query' // Actions import {refreshVariableValues, selectValue} from 'src/variables/actions' @@ -84,7 +84,7 @@ export const refreshTimeMachineVariableValues = () => async ( await dispatch(refreshVariableValues(contextID, variablesToRefresh)) } -let pendingResults: Array> = [] +let pendingResults: Array> = [] export const executeQueries = () => async (dispatch, getState: GetState) => { const {view, timeRange} = getActiveTimeMachine(getState()) @@ -116,7 +116,9 @@ export const executeQueries = () => async (dispatch, getState: GetState) => { return runQuery(orgID, text, extern) }) - const files = await Promise.all(pendingResults.map(r => r.promise)) + const files = await Promise.all( + pendingResults.map(r => r.promise.then(({csv}) => csv)) + ) const duration = Date.now() - startTime diff --git a/ui/src/timeMachine/apis/queryBuilder.ts b/ui/src/timeMachine/apis/queryBuilder.ts index dff0b174ee0..595c49258ab 100644 --- a/ui/src/timeMachine/apis/queryBuilder.ts +++ b/ui/src/timeMachine/apis/queryBuilder.ts @@ -31,7 +31,7 @@ export function findBuckets({orgID}: FindBucketsOptions): CancelableQuery { const {promise, cancel} = runQuery(orgID, query) return { - promise: promise.then(resp => extractCol(resp, 'name')), + promise: promise.then(({csv}) => extractCol(csv, 'name')), cancel, } } @@ -74,7 +74,7 @@ export function findKeys({ const {promise, cancel} = runQuery(orgID, query) return { - promise: promise.then(resp => extractCol(resp, '_value')), + promise: promise.then(({csv}) => extractCol(csv, '_value')), cancel, } } @@ -117,7 +117,7 @@ export function findValues({ const {promise, cancel} = runQuery(orgID, query) return { - promise: promise.then(resp => extractCol(resp, '_value')), + promise: promise.then(({csv}) => extractCol(csv, '_value')), cancel, } } diff --git a/ui/src/types/queries.ts b/ui/src/types/queries.ts index 1c5bb70965d..815d45a2e6b 100644 --- a/ui/src/types/queries.ts +++ b/ui/src/types/queries.ts @@ -1,86 +1,4 @@ -import {Source} from 'src/types' - -export interface Query { - text: string - id: string - queryConfig: QueryConfig -} - -export interface QueryConfig { - id?: string - database?: string - measurement?: string - retentionPolicy?: string - fields?: Field[] - tags: Tags - groupBy?: GroupBy - areTagsAccepted: boolean - rawText?: string - range?: DurationRange | null - source?: Source | null // doesn't come from server -- is set in CellEditorOverlay - fill?: string - status?: Status - shifts?: TimeShift[] - lower?: string - upper?: string - isQuerySupportedByExplorer?: boolean // doesn't come from server -- is set in CellEditorOverlay -} - -export interface Field { - value: string - type: string - alias?: string - args?: FieldArg[] -} - -export interface FieldArg { - value: string - type: string - alias?: string - args?: FieldArg[] -} - -export interface FieldFunc extends Field { - args: FuncArg[] -} -export interface FuncArg { - type: string - value: string - alias?: string -} - -export interface ApplyFuncsToFieldArgs { - field: Field - funcs: FuncArg[] -} - -export interface Tag { - key: string - value: string -} - -export type TagValues = string[] - -export interface Tags { - [key: string]: TagValues -} - -export interface GroupBy { - time?: string | null - tags?: string[] -} - -export interface Namespace { - database: string - retentionPolicy: string -} - -export interface Status { - loading?: boolean - error?: string - warn?: string - success?: string -} +export {Query, Dialect} from 'src/client' export interface TimeRange { lower: string @@ -90,14 +8,3 @@ export interface TimeRange { label?: string duration?: string } - -export interface DurationRange { - lower: string - upper?: string -} - -export interface TimeShift { - label: string - unit: string - quantity: string -} diff --git a/ui/src/variables/utils/ValueFetcher.ts b/ui/src/variables/utils/ValueFetcher.ts index b44947be155..73a1bc14607 100644 --- a/ui/src/variables/utils/ValueFetcher.ts +++ b/ui/src/variables/utils/ValueFetcher.ts @@ -89,7 +89,7 @@ export class DefaultValueFetcher implements ValueFetcher { const extern = buildVarsOption(variables) const request = runQuery(orgID, query, extern) - const promise = request.promise.then(csv => { + const promise = request.promise.then(({csv}) => { const values = extractValues(csv, prevSelection, defaultSelection) this.cache[key] = values