Skip to content

Commit

Permalink
feat(data): Make MaxEventSize a service configuration setting
Browse files Browse the repository at this point in the history
25000KB (25MB) by default
close #3237

Signed-off-by: Ginny Guan <[email protected]>
  • Loading branch information
jinlinGuan committed Mar 14, 2022
1 parent fbf009a commit 8099421
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 40 deletions.
2 changes: 2 additions & 0 deletions cmd/core-data/res/configuration.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
MaxEventSize = 25000 # Defines the maximum event size in kilobytes

[Writable]
PersistData = true
LogLevel = "INFO"
Expand Down
1 change: 1 addition & 0 deletions internal/core/data/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ConfigurationStruct struct {
Registry bootstrapConfig.RegistryInfo
Service bootstrapConfig.ServiceInfo
SecretStore bootstrapConfig.SecretStoreInfo
MaxEventSize int64
}

type WritableInfo struct {
Expand Down
22 changes: 16 additions & 6 deletions internal/core/data/controller/http/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bytes"
"io"
"math"
"net/http"
"strconv"
Expand All @@ -10,7 +11,7 @@ import (

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/io"
edgexIO "github.com/edgexfoundry/edgex-go/internal/io"
"github.com/edgexfoundry/edgex-go/internal/pkg"
"github.com/edgexfoundry/edgex-go/internal/pkg/utils"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
Expand All @@ -25,20 +26,20 @@ import (
)

type EventController struct {
readers map[string]io.DtoReader
readers map[string]edgexIO.DtoReader
mux sync.RWMutex
dic *di.Container
}

// NewEventController creates and initializes an EventController
func NewEventController(dic *di.Container) *EventController {
return &EventController{
readers: make(map[string]io.DtoReader),
readers: make(map[string]edgexIO.DtoReader),
dic: dic,
}
}

func (ec *EventController) getReader(r *http.Request) io.DtoReader {
func (ec *EventController) getReader(r *http.Request) edgexIO.DtoReader {
contentType := strings.ToLower(r.Header.Get(common.ContentType))
ec.mux.RLock()
reader, ok := ec.readers[contentType]
Expand All @@ -49,7 +50,7 @@ func (ec *EventController) getReader(r *http.Request) io.DtoReader {

ec.mux.Lock()
defer ec.mux.Unlock()
reader = io.NewDtoReader(contentType)
reader = edgexIO.NewDtoReader(contentType)
ec.readers[contentType] = reader
return reader
}
Expand All @@ -63,6 +64,7 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) {
lc := container.LoggingClientFrom(ec.dic.Get)

ctx := r.Context()
config := dataContainer.ConfigurationFrom(ec.dic.Get)

// URL parameters
vars := mux.Vars(r)
Expand All @@ -71,8 +73,16 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) {
sourceName := vars[common.SourceName]

var addEventReqDTO requestDTO.AddEventRequest
var err errors.EdgeX

dataBytes, readErr := io.ReadAll(r.Body)
if readErr == nil {
err = utils.CheckPayloadSize(dataBytes, config.MaxEventSize*1000)
}
if readErr != nil {
err = errors.NewCommonEdgeX(errors.KindIOError, "AddEventRequest I/O reading failed", nil)
}

dataBytes, err := io.ReadAddEventRequestInBytes(r.Body)
if err == nil {
// Per https://github.com/edgexfoundry/edgex-go/pull/3202#discussion_r587618347
// V2 shall asynchronously publish initially encoded payload (not re-encoding) to message bus
Expand Down
66 changes: 65 additions & 1 deletion internal/core/data/controller/http/event_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2020-2021 IOTech Ltd
// Copyright (C) 2020-2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -266,6 +266,70 @@ func TestAddEvent(t *testing.T) {
}
}

func TestAddEventSize(t *testing.T) {

dbClientMock := &dbMock.DBClient{}

validRequest := testAddEvent

tests := []struct {
Name string
MaxEventSize int64
RequestContentType string
errorExpected bool
ExpectedStatusCode int
}{
{"Valid - AddEventRequest JSON with valid event size", 25000, common.ContentTypeJSON, false, http.StatusCreated},
{"Valid - AddEventRequest JSON with valid event size", 0, common.ContentTypeJSON, false, http.StatusCreated},
{"Invalid - AddEventRequest JSON with negative evnet size", -1, common.ContentTypeJSON, true, http.StatusBadRequest},
}

for _, testCase := range tests {
t.Run(testCase.Name, func(t *testing.T) {
dic := mocks.NewMockDIC()
dic.Update(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
MaxEventSize: testCase.MaxEventSize,
Writable: config.WritableInfo{
PersistData: false,
},
}
},
container.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
},
})
ec := NewEventController(dic)
byteData, err := toByteArray(testCase.RequestContentType, validRequest)
require.NoError(t, err)

reader := strings.NewReader(string(byteData))
req, err := http.NewRequest(http.MethodPost, common.ApiEventProfileNameDeviceNameSourceNameRoute, reader)
req.Header.Set(common.ContentType, testCase.RequestContentType)
req = mux.SetURLVars(req, map[string]string{common.ProfileName: validRequest.Event.ProfileName, common.DeviceName: validRequest.Event.DeviceName, common.SourceName: validRequest.Event.SourceName})
require.NoError(t, err)

recorder := httptest.NewRecorder()
handler := http.HandlerFunc(ec.AddEvent)
handler.ServeHTTP(recorder, req)

var actualResponse commonDTO.BaseWithIdResponse
err = json.Unmarshal(recorder.Body.Bytes(), &actualResponse)

if testCase.errorExpected {
assert.Equal(t, testCase.ExpectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected")
return
}
require.NoError(t, err)
assert.Equal(t, common.ApiVersion, actualResponse.ApiVersion, "API Version not as expected")
assert.Equal(t, testCase.ExpectedStatusCode, recorder.Result().StatusCode, "HTTP status code not as expected")
assert.Equal(t, testCase.ExpectedStatusCode, int(actualResponse.StatusCode), "BaseResponse status code not as expected")
assert.Empty(t, actualResponse.Message, "Message should be empty when it is successful")
})
}
}

func TestEventById(t *testing.T) {
validEventId := expectedEventId
emptyEventId := ""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
//
// Copyright (C) 2021 IOTech Ltd
// Copyright (C) 2021-2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application
package messaging

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/pkg/utils"

"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
Expand Down Expand Up @@ -57,6 +59,12 @@ func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX {
case msgEnvelope := <-messages:
lc.Debugf("Event received on message queue. Topic: %s, Correlation-id: %s ", messageBusInfo.SubscribeTopic, msgEnvelope.CorrelationID)
event := &requests.AddEventRequest{}
maxEventSize := dataContainer.ConfigurationFrom(dic.Get).MaxEventSize
edgeXerr := utils.CheckPayloadSize(msgEnvelope.Payload, maxEventSize*1000)
if edgeXerr != nil {
lc.Errorf("event size exceed MaxEventSize(%d KB)", maxEventSize)
break
}
err = unmarshalPayload(msgEnvelope, event)
if err != nil {
lc.Errorf("fail to unmarshal event, %v", err)
Expand All @@ -67,7 +75,7 @@ func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX {
lc.Error(err.Error())
break
}
err = AddEvent(requests.AddEventReqToEventModel(*event), ctx, dic)
err = application.AddEvent(requests.AddEventReqToEventModel(*event), ctx, dic)
if err != nil {
lc.Errorf("fail to persist the event, %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/core/data/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"context"
"sync"

"github.com/edgexfoundry/edgex-go/internal/core/data/application"
dataContainer "github.com/edgexfoundry/edgex-go/internal/core/data/container"
"github.com/edgexfoundry/edgex-go/internal/core/data/controller/messaging"

"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup"
Expand Down Expand Up @@ -51,7 +51,7 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, st
lc := container.LoggingClientFrom(dic.Get)

if configuration.MessageQueue.SubscribeEnabled {
err := application.SubscribeEvents(ctx, dic)
err := messaging.SubscribeEvents(ctx, dic)
if err != nil {
lc.Errorf("Failed to subscribe events from message bus, %v", err)
return false
Expand Down
28 changes: 0 additions & 28 deletions internal/io/event.go

This file was deleted.

22 changes: 22 additions & 0 deletions internal/pkg/utils/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//
// Copyright (C) 2021-2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package utils

import (
"fmt"

"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
)

func CheckPayloadSize(payload []byte, sizeLimit int64) errors.EdgeX {
// Treat 0 as unlimit size
if sizeLimit < 0 {
return errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("sizeLimit cannot be lower than 0, current sizeLimit: %d", sizeLimit), nil)
} else if sizeLimit != 0 && int64(len(payload)) > sizeLimit {
return errors.NewCommonEdgeX(errors.KindLimitExceeded, fmt.Sprintf("request size exceed %d KB", sizeLimit), nil)
}
return nil
}
35 changes: 35 additions & 0 deletions internal/pkg/utils/payload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package utils

import (
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCheckPayloadSize(t *testing.T) {
payload := make([]byte, 10)
tests := []struct {
name string
sizeLimit int64
errorExpected bool
expectedErrKind errors.ErrKind
}{
{"Valid Size", int64(len(payload)), false, ""},
{"Invalid Size", int64(len(payload) - 1), true, errors.KindLimitExceeded},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
err := CheckPayloadSize(payload, testCase.sizeLimit)
if testCase.errorExpected {
require.Error(t, err)
assert.NotEmpty(t, err.Error(), "Error message is empty")
assert.Equal(t, testCase.expectedErrKind, errors.Kind(err), "Error kind not as expected")
} else {
require.NoError(t, err)
}
})
}
}

0 comments on commit 8099421

Please sign in to comment.