Skip to content

Commit

Permalink
Allow to overwrite storage policy for Iflux write API
Browse files Browse the repository at this point in the history
  • Loading branch information
anuprout committed Feb 9, 2021
1 parent a3853ee commit b79cb86
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/query/api/v1/handler/influxdb/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage/m3/storagemetadata"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/query/util/logging"

imodels "github.com/influxdata/influxdb/models"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/headers"
xhttp "github.com/m3db/m3/src/x/net/http"
xtime "github.com/m3db/m3/src/x/time"
"go.uber.org/zap"
Expand Down Expand Up @@ -293,6 +297,44 @@ func (iwh *ingestWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
return
}
opts := ingest.WriteOptions{}

// **** changes to overwrite the storage policy based on request headers *****

if v := strings.TrimSpace(r.Header.Get(headers.MetricsTypeHeader)); v != "" {
metricsType, err := storagemetadata.ParseMetricsType(v)
if err != nil {
xhttp.WriteError(w, err)
return
}

opts.DownsampleOverride = true
opts.DownsampleMappingRules = nil

strPolicy := strings.TrimSpace(r.Header.Get(headers.MetricsStoragePolicyHeader))
switch metricsType {
case storagemetadata.UnaggregatedMetricsType:
if strPolicy != "" {
xhttp.WriteError(w, errors.New("storage policy should not be set for unaggregated metrics"))
return
}
default:
parsed, err := policy.ParseStoragePolicy(strPolicy)
if err != nil {
err = fmt.Errorf("could not parse storage policy: %v", err)
xhttp.WriteError(w, err)
return
}

// Make sure this specific storage policy is used for the writes.
opts.WriteOverride = true
opts.WriteStoragePolicies = policy.StoragePolicies{
parsed,
}
}
}

// ********************************************************************************

iter := &ingestIterator{points: points, tagOpts: iwh.tagOpts, promRewriter: iwh.promRewriter}
batchErr := iwh.handlerOpts.DownsamplerAndWriter().WriteBatch(r.Context(), iter, opts)
if batchErr == nil {
Expand Down

0 comments on commit b79cb86

Please sign in to comment.