Skip to content

Commit

Permalink
feat(data): Make MaxEventSize a service configuration setting (#3891)
Browse files Browse the repository at this point in the history
25000KB (25MB) by default
close #3237

Signed-off-by: Ginny Guan <[email protected]>
  • Loading branch information
jinlinGuan authored Mar 16, 2022
1 parent 1cc235d commit de3e46c
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 35 deletions.
2 changes: 2 additions & 0 deletions cmd/core-data/res/configuration.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
MaxEventSize = 25000 # Defines the maximum event size in kilobytes

[Writable]
PersistData = true
LogLevel = "INFO"
Expand Down
1 change: 1 addition & 0 deletions internal/core/data/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ConfigurationStruct struct {
Registry bootstrapConfig.RegistryInfo
Service bootstrapConfig.ServiceInfo
SecretStore bootstrapConfig.SecretStoreInfo
MaxEventSize int64
}

type WritableInfo struct {
Expand Down
28 changes: 22 additions & 6 deletions internal/core/data/controller/http/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package http

import (
"bytes"
"fmt"
"io"
"math"
"net/http"
"strconv"
Expand All @@ -10,7 +12,7 @@ import (

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/io"
edgexIO "github.com/edgexfoundry/edgex-go/internal/io"
"github.com/edgexfoundry/edgex-go/internal/pkg"
"github.com/edgexfoundry/edgex-go/internal/pkg/utils"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
Expand All @@ -25,20 +27,20 @@ import (
)

type EventController struct {
readers map[string]io.DtoReader
readers map[string]edgexIO.DtoReader
mux sync.RWMutex
dic *di.Container
}

// NewEventController creates and initializes an EventController
func NewEventController(dic *di.Container) *EventController {
return &EventController{
readers: make(map[string]io.DtoReader),
readers: make(map[string]edgexIO.DtoReader),
dic: dic,
}
}

func (ec *EventController) getReader(r *http.Request) io.DtoReader {
func (ec *EventController) getReader(r *http.Request) edgexIO.DtoReader {
contentType := strings.ToLower(r.Header.Get(common.ContentType))
ec.mux.RLock()
reader, ok := ec.readers[contentType]
Expand All @@ -49,7 +51,7 @@ func (ec *EventController) getReader(r *http.Request) io.DtoReader {

ec.mux.Lock()
defer ec.mux.Unlock()
reader = io.NewDtoReader(contentType)
reader = edgexIO.NewDtoReader(contentType)
ec.readers[contentType] = reader
return reader
}
Expand All @@ -63,6 +65,7 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) {
lc := container.LoggingClientFrom(ec.dic.Get)

ctx := r.Context()
config := dataContainer.ConfigurationFrom(ec.dic.Get)

// URL parameters
vars := mux.Vars(r)
Expand All @@ -71,8 +74,21 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) {
sourceName := vars[common.SourceName]

var addEventReqDTO requestDTO.AddEventRequest
var err errors.EdgeX

if config.MaxEventSize > 0 && r.ContentLength > config.MaxEventSize*1000 {
err = errors.NewCommonEdgeX(errors.KindLimitExceeded, fmt.Sprintf("request size exceed %d KB", config.MaxEventSize), nil)
utils.WriteErrorResponse(w, ctx, lc, err, "")
return
}

dataBytes, readErr := io.ReadAll(r.Body)
if readErr != nil {
err = errors.NewCommonEdgeX(errors.KindIOError, "AddEventRequest I/O reading failed", nil)
} else if r.ContentLength == -1 { // only check the payload byte array size when the Content-Length of Request is unknown
err = utils.CheckPayloadSize(dataBytes, config.MaxEventSize*1000)
}

dataBytes, err := io.ReadAddEventRequestInBytes(r.Body)
if err == nil {
// Per https://github.com/edgexfoundry/edgex-go/pull/3202#discussion_r587618347
// V2 shall asynchronously publish initially encoded payload (not re-encoding) to message bus
Expand Down
80 changes: 79 additions & 1 deletion internal/core/data/controller/http/event_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2020-2021 IOTech Ltd
// Copyright (C) 2020-2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -266,6 +266,84 @@ func TestAddEvent(t *testing.T) {
}
}

func TestAddEventSize(t *testing.T) {

dbClientMock := &dbMock.DBClient{}

validRequest := testAddEvent
TestReadingLargeBinaryValue := make([]byte, 26000000)
largeBinaryRequest := validRequest
largeBinaryRequest.Event.Readings = []dtos.BaseReading{{
DeviceName: TestDeviceName,
ResourceName: TestDeviceResourceName,
ProfileName: TestDeviceProfileName,
Origin: TestOriginTime,
ValueType: common.ValueTypeBinary,
BinaryReading: dtos.BinaryReading{
BinaryValue: []byte(TestReadingLargeBinaryValue),
MediaType: TestBinaryReadingMediaType,
},
}}

tests := []struct {
Name string
Request requests.AddEventRequest
MaxEventSize int64
RequestContentType string
ErrorExpected bool
ExpectedStatusCode int
}{
{"Valid - AddEventRequest CBOR with default MaxEventSize", validRequest, 25000, common.ContentTypeCBOR, false, http.StatusCreated},
{"Valid - AddEventRequest CBOR with unlimit MaxEventSize", validRequest, 0, common.ContentTypeCBOR, false, http.StatusCreated},
{"Valid - AddEventRequest CBOR with higher MaxEventSize", largeBinaryRequest, 50000, common.ContentTypeCBOR, false, http.StatusCreated},
{"Invalid - AddEventRequest CBOR with invalid event size", largeBinaryRequest, 25000, common.ContentTypeCBOR, true, http.StatusRequestEntityTooLarge},
}

for _, testCase := range tests {
t.Run(testCase.Name, func(t *testing.T) {
dic := mocks.NewMockDIC()
dic.Update(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
MaxEventSize: testCase.MaxEventSize,
Writable: config.WritableInfo{
PersistData: false,
},
}
},
container.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
},
})
ec := NewEventController(dic)
byteData, err := toByteArray(testCase.RequestContentType, testCase.Request)
require.NoError(t, err)

reader := strings.NewReader(string(byteData))
req, err := http.NewRequest(http.MethodPost, common.ApiEventProfileNameDeviceNameSourceNameRoute, reader)
req.Header.Set(common.ContentType, testCase.RequestContentType)
req = mux.SetURLVars(req, map[string]string{common.ProfileName: validRequest.Event.ProfileName, common.DeviceName: validRequest.Event.DeviceName, common.SourceName: validRequest.Event.SourceName})
require.NoError(t, err)
recorder := httptest.NewRecorder()
handler := http.HandlerFunc(ec.AddEvent)
handler.ServeHTTP(recorder, req)

var actualResponse commonDTO.BaseWithIdResponse
err = json.Unmarshal(recorder.Body.Bytes(), &actualResponse)

if testCase.ErrorExpected {
assert.Equal(t, testCase.ExpectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected")
return
}
require.NoError(t, err)
assert.Equal(t, common.ApiVersion, actualResponse.ApiVersion, "API Version not as expected")
assert.Equal(t, testCase.ExpectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected")
assert.Equal(t, testCase.ExpectedStatusCode, int(actualResponse.StatusCode), "BaseResponse status code not as expected")
assert.Empty(t, actualResponse.Message, "Message should be empty when it is successful")
})
}
}

func TestEventById(t *testing.T) {
validEventId := expectedEventId
emptyEventId := ""
Expand Down
8 changes: 8 additions & 0 deletions internal/core/data/controller/messaging/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/pkg/utils"

"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
Expand Down Expand Up @@ -58,6 +59,13 @@ func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX {
case msgEnvelope := <-messages:
lc.Debugf("Event received on message queue. Topic: %s, Correlation-id: %s ", messageBusInfo.SubscribeTopic, msgEnvelope.CorrelationID)
event := &requests.AddEventRequest{}
// decoding the large payload may cause memory issues so checking before decoding
maxEventSize := dataContainer.ConfigurationFrom(dic.Get).MaxEventSize
edgeXerr := utils.CheckPayloadSize(msgEnvelope.Payload, maxEventSize*1000)
if edgeXerr != nil {
lc.Errorf("event size exceed MaxEventSize(%d KB)", maxEventSize)
break
}
err = unmarshalPayload(msgEnvelope, event)
if err != nil {
lc.Errorf("fail to unmarshal event, %v", err)
Expand Down
28 changes: 0 additions & 28 deletions internal/io/event.go

This file was deleted.

22 changes: 22 additions & 0 deletions internal/pkg/utils/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//
// Copyright (C) 2021-2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package utils

import (
"fmt"

"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
)

func CheckPayloadSize(payload []byte, sizeLimit int64) errors.EdgeX {
// Treat 0 as unlimit size
if sizeLimit < 0 {
return errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("sizeLimit cannot be lower than 0, current sizeLimit: %d", sizeLimit), nil)
} else if sizeLimit != 0 && int64(len(payload)) > sizeLimit {
return errors.NewCommonEdgeX(errors.KindLimitExceeded, fmt.Sprintf("request size exceed %d KB", sizeLimit), nil)
}
return nil
}
39 changes: 39 additions & 0 deletions internal/pkg/utils/payload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package utils

import (
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCheckPayloadSize(t *testing.T) {
smallpayload := make([]byte, 10)
largePayload := make([]byte, 25000000)
tests := []struct {
name string
payload []byte
sizeLimit int64
errorExpected bool
expectedErrKind errors.ErrKind
}{
{"Valid small size", smallpayload, int64(len(smallpayload)), false, ""},
{"Valid large size", largePayload, int64(len(largePayload)), false, ""},
{"Invalid small size", smallpayload, int64(len(smallpayload) - 1), true, errors.KindLimitExceeded},
{"Invalid large size", largePayload, int64(len(largePayload) - 1), true, errors.KindLimitExceeded},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
err := CheckPayloadSize(testCase.payload, testCase.sizeLimit)
if testCase.errorExpected {
require.Error(t, err)
assert.NotEmpty(t, err.Error(), "Error message is empty")
assert.Equal(t, testCase.expectedErrKind, errors.Kind(err), "Error kind not as expected")
} else {
require.NoError(t, err)
}
})
}
}

0 comments on commit de3e46c

Please sign in to comment.