From b79cb869a61ecb9762a4f47ff97e2acfa54c4acf Mon Sep 17 00:00:00 2001 From: Anup Rout Date: Tue, 9 Feb 2021 18:13:16 -0500 Subject: [PATCH] Allow to overwrite storage policy for Iflux write API --- src/query/api/v1/handler/influxdb/write.go | 42 ++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/query/api/v1/handler/influxdb/write.go b/src/query/api/v1/handler/influxdb/write.go index dbced1317b..da1f77073c 100644 --- a/src/query/api/v1/handler/influxdb/write.go +++ b/src/query/api/v1/handler/influxdb/write.go @@ -26,18 +26,22 @@ import ( "fmt" "io/ioutil" "net/http" + "strings" "time" "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage/m3/storagemetadata" "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/query/util/logging" imodels "github.com/influxdata/influxdb/models" xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/headers" xhttp "github.com/m3db/m3/src/x/net/http" xtime "github.com/m3db/m3/src/x/time" "go.uber.org/zap" @@ -293,6 +297,44 @@ func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) return } opts := ingest.WriteOptions{} + + // **** changes to overwrite the storage policy based on request headers ***** + + if v := strings.TrimSpace(r.Header.Get(headers.MetricsTypeHeader)); v != "" { + metricsType, err := storagemetadata.ParseMetricsType(v) + if err != nil { + xhttp.WriteError(w, err) + return + } + + opts.DownsampleOverride = true + opts.DownsampleMappingRules = nil + + strPolicy := strings.TrimSpace(r.Header.Get(headers.MetricsStoragePolicyHeader)) + switch metricsType { + case storagemetadata.UnaggregatedMetricsType: + if strPolicy != "" { + xhttp.WriteError(w, errors.New("storage policy should not be set for unaggregated metrics")) + return + } + default: + parsed, err := policy.ParseStoragePolicy(strPolicy) + if err != nil { + err = fmt.Errorf("could not parse storage policy: %v", err) + xhttp.WriteError(w, err) + return + } + + // Make sure this specific storage policy is used for the writes. + opts.WriteOverride = true + opts.WriteStoragePolicies = policy.StoragePolicies{ + parsed, + } + } + } + + // ******************************************************************************** + iter := &ingestIterator{points: points, tagOpts: iwh.tagOpts, promRewriter: iwh.promRewriter} batchErr := iwh.handlerOpts.DownsamplerAndWriter().WriteBatch(r.Context(), iter, opts) if batchErr == nil {