From 3ebc33cd32d74e4d8b12a452a06e08803b020fac Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Tue, 17 Nov 2020 16:04:50 -0500 Subject: [PATCH] fix(http/legacy): filter on Default=true when no RP given in V1 write (#20074) --- CHANGELOG.md | 1 + http/legacy/write_handler.go | 3 + http/legacy/write_handler_test.go | 95 +++++++++++++++++++++++++++++-- 3 files changed, 95 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87d4ef9ffca..2e029e656c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ 1. [20047](https://github.com/influxdata/influxdb/pull/20047): Allow scraper to ignore insecure certificates on a target. Thanks @cmackenzie1! 1. [20064](https://github.com/influxdata/influxdb/pull/20064): Ensure Flux reads across all shards. 1. [20076](https://github.com/influxdata/influxdb/pull/20076): Remove internal `influxd upgrade` subcommands from help text. +1. [20074](https://github.com/influxdata/influxdb/pull/20074): Use default DBRP mapping on V1 write when no RP is specified. ## v2.0.1 [2020-11-10] diff --git a/http/legacy/write_handler.go b/http/legacy/write_handler.go index 77270d18eb7..7951c96e02e 100644 --- a/http/legacy/write_handler.go +++ b/http/legacy/write_handler.go @@ -196,6 +196,9 @@ func (h *WriteHandler) findMapping(ctx context.Context, orgID influxdb.ID, db, r } if rp != "" { filter.RetentionPolicy = &rp + } else { + b := true // Can't get a direct pointer to `true`... + filter.Default = &b } mappings, count, err := h.DBRPMappingService.FindMany(ctx, filter) diff --git a/http/legacy/write_handler_test.go b/http/legacy/write_handler_test.go index fb11b8fffd9..51c73d89aa1 100644 --- a/http/legacy/write_handler_test.go +++ b/http/legacy/write_handler_test.go @@ -25,7 +25,7 @@ import ( var generator = snowflake.NewDefaultIDGenerator() -func TestWriteHandler_BucketAndMappingExists(t *testing.T) { +func TestWriteHandler_BucketAndMappingExistsDefaultRP(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -50,6 +50,7 @@ func TestWriteHandler_BucketAndMappingExists(t *testing.T) { BucketID: bucket.ID, Database: "mydb", RetentionPolicy: "autogen", + Default: true, } lineProtocolBody = "m,t1=v1 f1=2 100" @@ -60,6 +61,7 @@ func TestWriteHandler_BucketAndMappingExists(t *testing.T) { FindMany(gomock.Any(), influxdb.DBRPMappingFilterV2{ OrgID: &mapping.OrganizationID, Database: &mapping.Database, + Default: &mapping.Default, }).Return([]*influxdb.DBRPMappingV2{mapping}, 1, nil) findBucketByID := bucketService. @@ -104,6 +106,87 @@ func TestWriteHandler_BucketAndMappingExists(t *testing.T) { assert.Equal(t, "", w.Body.String()) } +func TestWriteHandler_BucketAndMappingExistsSpecificRP(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + // Mocked Services + eventRecorder = mocks.NewMockEventRecorder(ctrl) + dbrpMappingSvc = mocks.NewMockDBRPMappingServiceV2(ctrl) + bucketService = mocks.NewMockBucketService(ctrl) + pointsWriter = mocks.NewMockPointsWriter(ctrl) + + // Found Resources + orgID = generator.ID() + bucket = &influxdb.Bucket{ + ID: generator.ID(), + OrgID: orgID, + Name: "mydb/autogen", + RetentionPolicyName: "autogen", + RetentionPeriod: 72 * time.Hour, + } + mapping = &influxdb.DBRPMappingV2{ + OrganizationID: orgID, + BucketID: bucket.ID, + Database: "mydb", + RetentionPolicy: "autogen", + Default: true, + } + + lineProtocolBody = "m,t1=v1 f1=2 100" + ) + + findAutogenMapping := dbrpMappingSvc. + EXPECT(). + FindMany(gomock.Any(), influxdb.DBRPMappingFilterV2{ + OrgID: &mapping.OrganizationID, + Database: &mapping.Database, + RetentionPolicy: &mapping.RetentionPolicy, + }).Return([]*influxdb.DBRPMappingV2{mapping}, 1, nil) + + findBucketByID := bucketService. + EXPECT(). + FindBucketByID(gomock.Any(), bucket.ID).Return(bucket, nil) + + points := parseLineProtocol(t, lineProtocolBody) + writePoints := pointsWriter. + EXPECT(). + WritePoints(gomock.Any(), orgID, bucket.ID, pointsMatcher{points}).Return(nil) + + recordWriteEvent := eventRecorder.EXPECT(). + Record(gomock.Any(), gomock.Any()) + + gomock.InOrder( + findAutogenMapping, + findBucketByID, + writePoints, + recordWriteEvent, + ) + + perms := newPermissions(influxdb.WriteAction, influxdb.BucketsResourceType, &orgID, nil) + auth := newAuthorization(orgID, perms...) + ctx := pcontext.SetAuthorizer(context.Background(), auth) + r := newWriteRequest(ctx, lineProtocolBody) + params := r.URL.Query() + params.Set("db", "mydb") + params.Set("rp", "autogen") + r.URL.RawQuery = params.Encode() + + handler := NewWriterHandler(&PointsWriterBackend{ + HTTPErrorHandler: DefaultErrorHandler, + Logger: zaptest.NewLogger(t), + BucketService: bucketService, + DBRPMappingService: dbrp.NewAuthorizedService(dbrpMappingSvc), + PointsWriter: pointsWriter, + EventRecorder: eventRecorder, + }) + w := httptest.NewRecorder() + handler.ServeHTTP(w, r) + assert.Equal(t, http.StatusNoContent, w.Code) + assert.Equal(t, "", w.Body.String()) +} + func TestWriteHandler_BucketAndMappingExistsNoPermissions(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -129,6 +212,7 @@ func TestWriteHandler_BucketAndMappingExistsNoPermissions(t *testing.T) { BucketID: bucket.ID, Database: "mydb", RetentionPolicy: "autogen", + Default: true, } lineProtocolBody = "m,t1=v1 f1=2 100" @@ -139,6 +223,7 @@ func TestWriteHandler_BucketAndMappingExistsNoPermissions(t *testing.T) { FindMany(gomock.Any(), influxdb.DBRPMappingFilterV2{ OrgID: &mapping.OrganizationID, Database: &mapping.Database, + Default: &mapping.Default, }).Return([]*influxdb.DBRPMappingV2{mapping}, 1, nil) findBucketByID := bucketService. @@ -205,13 +290,15 @@ func TestWriteHandler_MappingNotExists(t *testing.T) { } lineProtocolBody = "m,t1=v1 f1=2 100" + badRp = "foo" ) findAutogenMapping := dbrpMappingSvc. EXPECT(). FindMany(gomock.Any(), influxdb.DBRPMappingFilterV2{ - OrgID: &mapping.OrganizationID, - Database: &mapping.Database, + OrgID: &mapping.OrganizationID, + Database: &mapping.Database, + RetentionPolicy: &badRp, }).Return(nil, 0, dbrp.ErrDBRPNotFound) recordWriteEvent := eventRecorder.EXPECT(). @@ -228,7 +315,7 @@ func TestWriteHandler_MappingNotExists(t *testing.T) { r := newWriteRequest(ctx, lineProtocolBody) params := r.URL.Query() params.Set("db", "mydb") - params.Set("rp", "") + params.Set("rp", badRp) r.URL.RawQuery = params.Encode() handler := NewWriterHandler(&PointsWriterBackend{