From 8099421eaa363bc7b8d86790a5de5010e16e51da Mon Sep 17 00:00:00 2001 From: Ginny Guan Date: Thu, 3 Mar 2022 16:26:15 +0800 Subject: [PATCH] feat(data): Make MaxEventSize a service configuration setting 25000KB (25MB) by default close #3237 Signed-off-by: Ginny Guan --- cmd/core-data/res/configuration.toml | 2 + internal/core/data/config/config.go | 1 + internal/core/data/controller/http/event.go | 22 +++++-- .../core/data/controller/http/event_test.go | 66 ++++++++++++++++++- .../messaging}/subscriber.go | 14 +++- internal/core/data/init.go | 4 +- internal/io/event.go | 28 -------- internal/pkg/utils/payload.go | 22 +++++++ internal/pkg/utils/payload_test.go | 35 ++++++++++ 9 files changed, 154 insertions(+), 40 deletions(-) rename internal/core/data/{application => controller/messaging}/subscriber.go (87%) delete mode 100644 internal/io/event.go create mode 100644 internal/pkg/utils/payload.go create mode 100644 internal/pkg/utils/payload_test.go diff --git a/cmd/core-data/res/configuration.toml b/cmd/core-data/res/configuration.toml index 4d809a031a..7e7f7bf91f 100644 --- a/cmd/core-data/res/configuration.toml +++ b/cmd/core-data/res/configuration.toml @@ -1,3 +1,5 @@ +MaxEventSize = 25000 # Defines the maximum event size in kilobytes + [Writable] PersistData = true LogLevel = "INFO" diff --git a/internal/core/data/config/config.go b/internal/core/data/config/config.go index 1694ca2467..453eec7934 100644 --- a/internal/core/data/config/config.go +++ b/internal/core/data/config/config.go @@ -26,6 +26,7 @@ type ConfigurationStruct struct { Registry bootstrapConfig.RegistryInfo Service bootstrapConfig.ServiceInfo SecretStore bootstrapConfig.SecretStoreInfo + MaxEventSize int64 } type WritableInfo struct { diff --git a/internal/core/data/controller/http/event.go b/internal/core/data/controller/http/event.go index 9394dff1e6..a41c912cc9 100644 --- a/internal/core/data/controller/http/event.go +++ b/internal/core/data/controller/http/event.go @@ -2,6 +2,7 @@ package http import ( "bytes" + "io" "math" "net/http" "strconv" @@ -10,7 +11,7 @@ import ( "github.com/edgexfoundry/edgex-go/internal/core/data/application" dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" - "github.com/edgexfoundry/edgex-go/internal/io" + edgexIO "github.com/edgexfoundry/edgex-go/internal/io" "github.com/edgexfoundry/edgex-go/internal/pkg" "github.com/edgexfoundry/edgex-go/internal/pkg/utils" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" @@ -25,7 +26,7 @@ import ( ) type EventController struct { - readers map[string]io.DtoReader + readers map[string]edgexIO.DtoReader mux sync.RWMutex dic *di.Container } @@ -33,12 +34,12 @@ type EventController struct { // NewEventController creates and initializes an EventController func NewEventController(dic *di.Container) *EventController { return &EventController{ - readers: make(map[string]io.DtoReader), + readers: make(map[string]edgexIO.DtoReader), dic: dic, } } -func (ec *EventController) getReader(r *http.Request) io.DtoReader { +func (ec *EventController) getReader(r *http.Request) edgexIO.DtoReader { contentType := strings.ToLower(r.Header.Get(common.ContentType)) ec.mux.RLock() reader, ok := ec.readers[contentType] @@ -49,7 +50,7 @@ func (ec *EventController) getReader(r *http.Request) io.DtoReader { ec.mux.Lock() defer ec.mux.Unlock() - reader = io.NewDtoReader(contentType) + reader = edgexIO.NewDtoReader(contentType) ec.readers[contentType] = reader return reader } @@ -63,6 +64,7 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) { lc := container.LoggingClientFrom(ec.dic.Get) ctx := r.Context() + config := dataContainer.ConfigurationFrom(ec.dic.Get) // URL parameters vars := mux.Vars(r) @@ -71,8 +73,16 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) { sourceName := vars[common.SourceName] var addEventReqDTO requestDTO.AddEventRequest + var err errors.EdgeX + + dataBytes, readErr := io.ReadAll(r.Body) + if readErr == nil { + err = utils.CheckPayloadSize(dataBytes, config.MaxEventSize*1000) + } + if readErr != nil { + err = errors.NewCommonEdgeX(errors.KindIOError, "AddEventRequest I/O reading failed", nil) + } - dataBytes, err := io.ReadAddEventRequestInBytes(r.Body) if err == nil { // Per https://github.com/edgexfoundry/edgex-go/pull/3202#discussion_r587618347 // V2 shall asynchronously publish initially encoded payload (not re-encoding) to message bus diff --git a/internal/core/data/controller/http/event_test.go b/internal/core/data/controller/http/event_test.go index 97875c03ba..e9f3187309 100644 --- a/internal/core/data/controller/http/event_test.go +++ b/internal/core/data/controller/http/event_test.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2020-2021 IOTech Ltd +// Copyright (C) 2020-2022 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -266,6 +266,70 @@ func TestAddEvent(t *testing.T) { } } +func TestAddEventSize(t *testing.T) { + + dbClientMock := &dbMock.DBClient{} + + validRequest := testAddEvent + + tests := []struct { + Name string + MaxEventSize int64 + RequestContentType string + errorExpected bool + ExpectedStatusCode int + }{ + {"Valid - AddEventRequest JSON with valid event size", 25000, common.ContentTypeJSON, false, http.StatusCreated}, + {"Valid - AddEventRequest JSON with valid event size", 0, common.ContentTypeJSON, false, http.StatusCreated}, + {"Invalid - AddEventRequest JSON with negative evnet size", -1, common.ContentTypeJSON, true, http.StatusBadRequest}, + } + + for _, testCase := range tests { + t.Run(testCase.Name, func(t *testing.T) { + dic := mocks.NewMockDIC() + dic.Update(di.ServiceConstructorMap{ + container.ConfigurationName: func(get di.Get) interface{} { + return &config.ConfigurationStruct{ + MaxEventSize: testCase.MaxEventSize, + Writable: config.WritableInfo{ + PersistData: false, + }, + } + }, + container.DBClientInterfaceName: func(get di.Get) interface{} { + return dbClientMock + }, + }) + ec := NewEventController(dic) + byteData, err := toByteArray(testCase.RequestContentType, validRequest) + require.NoError(t, err) + + reader := strings.NewReader(string(byteData)) + req, err := http.NewRequest(http.MethodPost, common.ApiEventProfileNameDeviceNameSourceNameRoute, reader) + req.Header.Set(common.ContentType, testCase.RequestContentType) + req = mux.SetURLVars(req, map[string]string{common.ProfileName: validRequest.Event.ProfileName, common.DeviceName: validRequest.Event.DeviceName, common.SourceName: validRequest.Event.SourceName}) + require.NoError(t, err) + + recorder := httptest.NewRecorder() + handler := http.HandlerFunc(ec.AddEvent) + handler.ServeHTTP(recorder, req) + + var actualResponse commonDTO.BaseWithIdResponse + err = json.Unmarshal(recorder.Body.Bytes(), &actualResponse) + + if testCase.errorExpected { + assert.Equal(t, testCase.ExpectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + return + } + require.NoError(t, err) + assert.Equal(t, common.ApiVersion, actualResponse.ApiVersion, "API Version not as expected") + assert.Equal(t, testCase.ExpectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") + assert.Equal(t, testCase.ExpectedStatusCode, int(actualResponse.StatusCode), "BaseResponse status code not as expected") + assert.Empty(t, actualResponse.Message, "Message should be empty when it is successful") + }) + } +} + func TestEventById(t *testing.T) { validEventId := expectedEventId emptyEventId := "" diff --git a/internal/core/data/application/subscriber.go b/internal/core/data/controller/messaging/subscriber.go similarity index 87% rename from internal/core/data/application/subscriber.go rename to internal/core/data/controller/messaging/subscriber.go index dfdf52f903..01973f993a 100644 --- a/internal/core/data/application/subscriber.go +++ b/internal/core/data/controller/messaging/subscriber.go @@ -1,9 +1,9 @@ // -// Copyright (C) 2021 IOTech Ltd +// Copyright (C) 2021-2022 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 -package application +package messaging import ( "context" @@ -11,7 +11,9 @@ import ( "fmt" "strings" + "github.com/edgexfoundry/edgex-go/internal/core/data/application" dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" + "github.com/edgexfoundry/edgex-go/internal/pkg/utils" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/di" @@ -57,6 +59,12 @@ func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX { case msgEnvelope := <-messages: lc.Debugf("Event received on message queue. Topic: %s, Correlation-id: %s ", messageBusInfo.SubscribeTopic, msgEnvelope.CorrelationID) event := &requests.AddEventRequest{} + maxEventSize := dataContainer.ConfigurationFrom(dic.Get).MaxEventSize + edgeXerr := utils.CheckPayloadSize(msgEnvelope.Payload, maxEventSize*1000) + if edgeXerr != nil { + lc.Errorf("event size exceed MaxEventSize(%d KB)", maxEventSize) + break + } err = unmarshalPayload(msgEnvelope, event) if err != nil { lc.Errorf("fail to unmarshal event, %v", err) @@ -67,7 +75,7 @@ func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX { lc.Error(err.Error()) break } - err = AddEvent(requests.AddEventReqToEventModel(*event), ctx, dic) + err = application.AddEvent(requests.AddEventReqToEventModel(*event), ctx, dic) if err != nil { lc.Errorf("fail to persist the event, %v", err) } diff --git a/internal/core/data/init.go b/internal/core/data/init.go index 3f84479fd6..6e34a429fe 100644 --- a/internal/core/data/init.go +++ b/internal/core/data/init.go @@ -19,8 +19,8 @@ import ( "context" "sync" - "github.com/edgexfoundry/edgex-go/internal/core/data/application" dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" + "github.com/edgexfoundry/edgex-go/internal/core/data/controller/messaging" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup" @@ -51,7 +51,7 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, st lc := container.LoggingClientFrom(dic.Get) if configuration.MessageQueue.SubscribeEnabled { - err := application.SubscribeEvents(ctx, dic) + err := messaging.SubscribeEvents(ctx, dic) if err != nil { lc.Errorf("Failed to subscribe events from message bus, %v", err) return false diff --git a/internal/io/event.go b/internal/io/event.go deleted file mode 100644 index 2c40b9c192..0000000000 --- a/internal/io/event.go +++ /dev/null @@ -1,28 +0,0 @@ -// -// Copyright (C) 2021 IOTech Ltd -// -// SPDX-License-Identifier: Apache-2.0 - -package io - -import ( - "io" - - "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" -) - -// To avoid large data causing unexpected memory exhaustion when decoding CBOR payload, defaultMaxEventSize was introduced as -// a reasonable limit appropriate for handling CBOR payload in edgex-go. More details could be found at -// https://github.com/edgexfoundry/edgex-go/issues/2439 -// TODO Make MaxEventSize a service configuration setting, so that users could adjust the limit per systems requirements -// https://github.com/edgexfoundry/edgex-go/issues/3237 -const defaultMaxEventSize = int64(25 * 1e6) // 25 MB - -func ReadAddEventRequestInBytes(reader io.Reader) ([]byte, errors.EdgeX) { - // use LimitReader with defaultMaxEventSize to avoid unexpected memory exhaustion - bytes, err := io.ReadAll(io.LimitReader(reader, defaultMaxEventSize)) - if err != nil { - return nil, errors.NewCommonEdgeX(errors.KindIOError, "AddEventRequest I/O reading failed", err) - } - return bytes, nil -} diff --git a/internal/pkg/utils/payload.go b/internal/pkg/utils/payload.go new file mode 100644 index 0000000000..653071baca --- /dev/null +++ b/internal/pkg/utils/payload.go @@ -0,0 +1,22 @@ +// +// Copyright (C) 2021-2022 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package utils + +import ( + "fmt" + + "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" +) + +func CheckPayloadSize(payload []byte, sizeLimit int64) errors.EdgeX { + // Treat 0 as unlimit size + if sizeLimit < 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("sizeLimit cannot be lower than 0, current sizeLimit: %d", sizeLimit), nil) + } else if sizeLimit != 0 && int64(len(payload)) > sizeLimit { + return errors.NewCommonEdgeX(errors.KindLimitExceeded, fmt.Sprintf("request size exceed %d KB", sizeLimit), nil) + } + return nil +} diff --git a/internal/pkg/utils/payload_test.go b/internal/pkg/utils/payload_test.go new file mode 100644 index 0000000000..a2c08b9c7d --- /dev/null +++ b/internal/pkg/utils/payload_test.go @@ -0,0 +1,35 @@ +package utils + +import ( + "testing" + + "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCheckPayloadSize(t *testing.T) { + payload := make([]byte, 10) + tests := []struct { + name string + sizeLimit int64 + errorExpected bool + expectedErrKind errors.ErrKind + }{ + {"Valid Size", int64(len(payload)), false, ""}, + {"Invalid Size", int64(len(payload) - 1), true, errors.KindLimitExceeded}, + } + + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + err := CheckPayloadSize(payload, testCase.sizeLimit) + if testCase.errorExpected { + require.Error(t, err) + assert.NotEmpty(t, err.Error(), "Error message is empty") + assert.Equal(t, testCase.expectedErrKind, errors.Kind(err), "Error kind not as expected") + } else { + require.NoError(t, err) + } + }) + } +}