diff --git a/internal/core/data/controller/messaging/subscriber.go b/internal/core/data/application/subscriber.go similarity index 95% rename from internal/core/data/controller/messaging/subscriber.go rename to internal/core/data/application/subscriber.go index 01973f993a..9ebd3c0152 100644 --- a/internal/core/data/controller/messaging/subscriber.go +++ b/internal/core/data/application/subscriber.go @@ -3,7 +3,7 @@ // // SPDX-License-Identifier: Apache-2.0 -package messaging +package application import ( "context" @@ -11,7 +11,6 @@ import ( "fmt" "strings" - "github.com/edgexfoundry/edgex-go/internal/core/data/application" dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" "github.com/edgexfoundry/edgex-go/internal/pkg/utils" @@ -59,6 +58,7 @@ func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX { case msgEnvelope := <-messages: lc.Debugf("Event received on message queue. Topic: %s, Correlation-id: %s ", messageBusInfo.SubscribeTopic, msgEnvelope.CorrelationID) event := &requests.AddEventRequest{} + // decoding the large payload may cause memory issues so checking before decoding maxEventSize := dataContainer.ConfigurationFrom(dic.Get).MaxEventSize edgeXerr := utils.CheckPayloadSize(msgEnvelope.Payload, maxEventSize*1000) if edgeXerr != nil { @@ -75,7 +75,7 @@ func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX { lc.Error(err.Error()) break } - err = application.AddEvent(requests.AddEventReqToEventModel(*event), ctx, dic) + err = AddEvent(requests.AddEventReqToEventModel(*event), ctx, dic) if err != nil { lc.Errorf("fail to persist the event, %v", err) } diff --git a/internal/core/data/controller/http/event.go b/internal/core/data/controller/http/event.go index c629ceca11..f8f4acde94 100644 --- a/internal/core/data/controller/http/event.go +++ b/internal/core/data/controller/http/event.go @@ -2,6 +2,7 @@ package http import ( "bytes" + "fmt" "io" "math" "net/http" @@ -75,6 +76,12 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) { var addEventReqDTO requestDTO.AddEventRequest var err errors.EdgeX + if config.MaxEventSize > 0 && r.ContentLength > config.MaxEventSize*1000 { + err = errors.NewCommonEdgeX(errors.KindLimitExceeded, fmt.Sprintf("request size exceed %d KB", config.MaxEventSize), nil) + utils.WriteErrorResponse(w, ctx, lc, err, "") + return + } + dataBytes, readErr := io.ReadAll(r.Body) if readErr != nil { err = errors.NewCommonEdgeX(errors.KindIOError, "AddEventRequest I/O reading failed", nil) diff --git a/internal/core/data/controller/http/event_test.go b/internal/core/data/controller/http/event_test.go index e9f3187309..59a6758d55 100644 --- a/internal/core/data/controller/http/event_test.go +++ b/internal/core/data/controller/http/event_test.go @@ -271,17 +271,33 @@ func TestAddEventSize(t *testing.T) { dbClientMock := &dbMock.DBClient{} validRequest := testAddEvent + TestReadingLargeBinaryValue := make([]byte, 26000000) + largeBinaryRequest := validRequest + largeBinaryRequest.Event.Readings = []dtos.BaseReading{{ + DeviceName: TestDeviceName, + ResourceName: TestDeviceResourceName, + ProfileName: TestDeviceProfileName, + Origin: TestOriginTime, + ValueType: common.ValueTypeBinary, + BinaryReading: dtos.BinaryReading{ + BinaryValue: []byte(TestReadingLargeBinaryValue), + MediaType: TestBinaryReadingMediaType, + }, + }} tests := []struct { Name string + Request requests.AddEventRequest MaxEventSize int64 RequestContentType string - errorExpected bool + ErrorExpected bool ExpectedStatusCode int }{ - {"Valid - AddEventRequest JSON with valid event size", 25000, common.ContentTypeJSON, false, http.StatusCreated}, - {"Valid - AddEventRequest JSON with valid event size", 0, common.ContentTypeJSON, false, http.StatusCreated}, - {"Invalid - AddEventRequest JSON with negative evnet size", -1, common.ContentTypeJSON, true, http.StatusBadRequest}, + {"Valid - AddEventRequest CBOR with default MaxEventSize", validRequest, 25000, common.ContentTypeCBOR, false, http.StatusCreated}, + {"Valid - AddEventRequest CBOR with unlimit MaxEventSize", validRequest, 0, common.ContentTypeCBOR, false, http.StatusCreated}, + {"Valid - AddEventRequest CBOR with higher MaxEventSize", largeBinaryRequest, 50000, common.ContentTypeCBOR, false, http.StatusCreated}, + {"Invalid - AddEventRequest CBOR with negative MaxEventSize", validRequest, -1, common.ContentTypeCBOR, true, http.StatusBadRequest}, + {"Invalid - AddEventRequest CBOR with invalid event size", largeBinaryRequest, 25000, common.ContentTypeCBOR, true, http.StatusRequestEntityTooLarge}, } for _, testCase := range tests { @@ -301,7 +317,7 @@ func TestAddEventSize(t *testing.T) { }, }) ec := NewEventController(dic) - byteData, err := toByteArray(testCase.RequestContentType, validRequest) + byteData, err := toByteArray(testCase.RequestContentType, testCase.Request) require.NoError(t, err) reader := strings.NewReader(string(byteData)) @@ -317,7 +333,7 @@ func TestAddEventSize(t *testing.T) { var actualResponse commonDTO.BaseWithIdResponse err = json.Unmarshal(recorder.Body.Bytes(), &actualResponse) - if testCase.errorExpected { + if testCase.ErrorExpected { assert.Equal(t, testCase.ExpectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected") return } diff --git a/internal/core/data/init.go b/internal/core/data/init.go index 6e34a429fe..3f84479fd6 100644 --- a/internal/core/data/init.go +++ b/internal/core/data/init.go @@ -19,8 +19,8 @@ import ( "context" "sync" + "github.com/edgexfoundry/edgex-go/internal/core/data/application" dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container" - "github.com/edgexfoundry/edgex-go/internal/core/data/controller/messaging" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup" @@ -51,7 +51,7 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, st lc := container.LoggingClientFrom(dic.Get) if configuration.MessageQueue.SubscribeEnabled { - err := messaging.SubscribeEvents(ctx, dic) + err := application.SubscribeEvents(ctx, dic) if err != nil { lc.Errorf("Failed to subscribe events from message bus, %v", err) return false diff --git a/internal/pkg/utils/payload_test.go b/internal/pkg/utils/payload_test.go index a2c08b9c7d..48f24bca9c 100644 --- a/internal/pkg/utils/payload_test.go +++ b/internal/pkg/utils/payload_test.go @@ -9,20 +9,24 @@ import ( ) func TestCheckPayloadSize(t *testing.T) { - payload := make([]byte, 10) + smallpayload := make([]byte, 10) + largePayload := make([]byte, 25000000) tests := []struct { name string + payload []byte sizeLimit int64 errorExpected bool expectedErrKind errors.ErrKind }{ - {"Valid Size", int64(len(payload)), false, ""}, - {"Invalid Size", int64(len(payload) - 1), true, errors.KindLimitExceeded}, + {"Valid small size", smallpayload, int64(len(smallpayload)), false, ""}, + {"Valid large size", largePayload, int64(len(largePayload)), false, ""}, + {"Invalid small size", smallpayload, int64(len(smallpayload) - 1), true, errors.KindLimitExceeded}, + {"Invalid large size", largePayload, int64(len(largePayload) - 1), true, errors.KindLimitExceeded}, } for _, testCase := range tests { t.Run(testCase.name, func(t *testing.T) { - err := CheckPayloadSize(payload, testCase.sizeLimit) + err := CheckPayloadSize(testCase.payload, testCase.sizeLimit) if testCase.errorExpected { require.Error(t, err) assert.NotEmpty(t, err.Error(), "Error message is empty")