Skip to content

Commit

Permalink
Fix resources (#2288)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kostas Papageorgiou authored Dec 16, 2020
1 parent 4b07f5c commit 65ed283
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 118 deletions.
143 changes: 143 additions & 0 deletions internal/compliance/datalake_forwarder/forwarder/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/service/lambda"
jsoniter "github.com/json-iterator/go"
Expand All @@ -32,6 +33,7 @@ import (
"go.uber.org/zap"

"github.com/panther-labs/panther/api/lambda/source/models"
"github.com/panther-labs/panther/internal/compliance/datalake_forwarder/forwarder/diff"
"github.com/panther-labs/panther/internal/compliance/datalake_forwarder/forwarder/events"
"github.com/panther-labs/panther/pkg/testutils"
)
Expand Down Expand Up @@ -188,3 +190,144 @@ func TestComplianceEventStatusChange(t *testing.T) {
lambdaMock.AssertExpectations(t)
firehoseMock.AssertExpectations(t)
}

func TestResourceEvent(t *testing.T) {
t.Parallel()
lambdaMock := &testutils.LambdaMock{}
firehoseMock := &testutils.FirehoseMock{}

sh := StreamHandler{
LambdaClient: lambdaMock,
FirehoseClient: firehoseMock,
StreamName: "stream-name",
}

record := events.DynamoDBEventRecord{
AWSRegion: "eu-west-1",
EventSource: "aws:dynamodb",
EventName: "MODIFY",
EventID: "a04e16d70e2520ff8a3569354b55b3f5",
EventVersion: "1.1",
EventSourceArn: "arn:aws:dynamodb:eu-west-1:123456789012:table/panther-resources/stream/2020-12-09T13:15:55.703",
Change: events.DynamoDBStreamRecord{
Keys: map[string]*dynamodb.AttributeValue{
"id": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
},
NewImage: map[string]*dynamodb.AttributeValue{
"id": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"lowerId": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"expiresAt": {N: aws.String("1607707952")},
"integrationId": {S: aws.String("8349b647-f731-48c4-9d6b-eefff4010c14")},
"deleted": {BOOL: aws.Bool(false)},
"integrationType": {S: aws.String("aws")},
"type": {S: aws.String("AWS.Lambda.Function")},
"lastModified": {S: aws.String("2020-12-09T15:32:32.362503673Z")},
"attributes": {M: map[string]*dynamodb.AttributeValue{
"Policy": {NULL: aws.Bool(true)},
"RevisionId": {S: aws.String("433968bb-c360-4411-8f38-0ac65767f230")},
"LastModified": {S: aws.String("2020-12-15T11:10:32.883+0000")},
"MemorySize": {N: aws.String("128")},
"ResourceId": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"TimeCreated": {NULL: aws.Bool(true)},
"Region": {S: aws.String("eu-west-1")},
"Arn": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"ResourceType": {S: aws.String("AWS.Lambda.Function")},
"AccountId": {S: aws.String("123456789012")},
"Name": {S: aws.String("panther-cloudsecurity-datalake-forwarder")},
"Tags": {M: map[string]*dynamodb.AttributeValue{
"key": {S: aws.String("value")},
}}},
}},
OldImage: map[string]*dynamodb.AttributeValue{
"id": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"lowerId": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"expiresAt": {N: aws.String("1607707952")},
"integrationId": {S: aws.String("8349b647-f731-48c4-9d6b-eefff4010c14")},
"deleted": {BOOL: aws.Bool(false)},
"integrationType": {S: aws.String("aws")},
"type": {S: aws.String("AWS.Lambda.Function")},
"lastModified": {S: aws.String("2020-12-09T15:32:32.362503673Z")},
"attributes": {M: map[string]*dynamodb.AttributeValue{
"Policy": {NULL: aws.Bool(true)},
"RevisionId": {S: aws.String("433968bb-c360-4411-8f38-0ac65767f230")},
"LastModified": {S: aws.String("2020-12-15T11:10:32.883+0000")},
"MemorySize": {N: aws.String("256")},
"ResourceId": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"TimeCreated": {NULL: aws.Bool(true)},
"Region": {S: aws.String("eu-west-1")},
"Arn": {S: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder")},
"ResourceType": {S: aws.String("AWS.Lambda.Function")},
"AccountId": {S: aws.String("123456789012")},
"Name": {S: aws.String("panther-cloudsecurity-datalake-forwarder")},
"Tags": {M: map[string]*dynamodb.AttributeValue{
"key": {S: aws.String("value")},
}}},
},
},
},
}

// Mock fetching of integration label
integrations := []*models.SourceIntegrationMetadata{
{
IntegrationID: "8349b647-f731-48c4-9d6b-eefff4010c14",
IntegrationLabel: "test-label",
},
}
marshaledIntegrations, err := jsoniter.Marshal(integrations)
assert.NoError(t, err)
lambdaMock.On("Invoke", mock.Anything).Return(&lambda.InvokeOutput{Payload: marshaledIntegrations}, nil).Once()

// Mock firehose invocation
firehoseMock.On("PutRecordBatchWithContext", mock.Anything, mock.Anything, mock.Anything).
Return(&firehose.PutRecordBatchOutput{}, nil).
Once()

assert.NoError(t, sh.Run(context.Background(), zap.L(), &events.DynamoDBEvent{Records: []events.DynamoDBEventRecord{record}}))
lambdaMock.AssertExpectations(t)
firehoseMock.AssertExpectations(t)

// Verify firehose payload

// First
expectedChange := ResourceChange{
ChangeType: "MODIFIED",
IntegrationID: "8349b647-f731-48c4-9d6b-eefff4010c14",
IntegrationLabel: "test-label",
LastUpdated: "2020-12-09T15:32:32.362503673Z",
ID: "arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder",
Changes: map[string]diff.Diff{
"MemorySize": {
From: float64(256),
To: float64(128),
},
},
ResourceAttributes: ResourceAttributes{
TimeCreated: nil,
Name: aws.String("panther-cloudsecurity-datalake-forwarder"),
ResourceType: aws.String("AWS.Lambda.Function"),
ResourceID: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder"),
Region: aws.String("eu-west-1"),
AccountID: aws.String("123456789012"),
ARN: aws.String("arn:aws:lambda:eu-west-1:123456789012:function:panther-cloudsecurity-datalake-forwarder"),
Tags: map[string]string{
"key": "value",
},
},
}

var expectedResource map[string]interface{}
if err = dynamodbattribute.Unmarshal(record.Change.NewImage["attributes"], &expectedResource); err != nil {
t.Error("failed to marshal attributes")
}
expectedChange.Resource = expectedResource

request := firehoseMock.Calls[0].Arguments[1].(*firehose.PutRecordBatchInput)
assert.Equal(t, 1, len(request.Records))
assert.Equal(t, "stream-name", *request.DeliveryStreamName)
var change ResourceChange
if err := jsoniter.Unmarshal(request.Records[0].Data, &change); err != nil {
t.Error("failed to unmarshal change")
}
assert.Equal(t, expectedChange, change)
}
103 changes: 73 additions & 30 deletions internal/compliance/datalake_forwarder/forwarder/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,32 @@ import (
)

type ResourceChange struct {
ChangeType string `json:"changeType"`
Changes diff.Changelog `json:"changes"`
IntegrationID string `json:"integrationID"`
IntegrationLabel string `json:"integrationLabel"`
LastUpdated string `json:"lastUpdated"`
Resource jsoniter.RawMessage `json:"resource"`
ChangeType string `json:"changeType"`
Changes diff.Changelog `json:"changes"`
IntegrationID string `json:"integrationId"`
IntegrationLabel string `json:"integrationLabel"`
LastUpdated string `json:"lastUpdated"`
ID string `json:"id"`
Resource map[string]interface{} `json:"resource"`
ResourceAttributes
}

type ResourceAttributes struct {
ResourceID *string `json:"resourceId,omitempty"`
ResourceType *string `json:"resourceType,omitempty"`
TimeCreated *string `json:"timeCreated,omitempty"`
AccountID *string `json:"accountId,omitempty"`
Region *string `json:"region,omitempty"`
ARN *string `json:"arn,omitempty"`
Name *string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
}

type resourceSnapshot struct {
LastModified string `json:"lastModified"`
IntegrationID string `json:"integrationId"`
ID string `json:"id"`
Deleted bool `json:"deleted"`
Attributes map[string]interface{} `json:"attributes"`
}

Expand All @@ -53,8 +68,11 @@ func (sh *StreamHandler) processResourceChanges(record *events.DynamoDBEventReco
resource, err = sh.processResourceSnapshot(ChangeTypeCreate, record.Change.NewImage)
case lambdaevents.DynamoDBOperationTypeRemove:
resource, err = sh.processResourceSnapshot(ChangeTypeDelete, record.Change.OldImage)
case lambdaevents.DynamoDBOperationTypeModify:
resource, err = sh.processResourceSnapshotDiff(record.Change.OldImage, record.Change.NewImage)
default:
resource, err = sh.processResourceSnapshotDiff(record.EventName, record.Change.OldImage, record.Change.NewImage)
zap.L().Error("Unknown Event Type", zap.String("record.EventName", record.EventName))
return nil, nil
}

if err != nil {
Expand All @@ -71,16 +89,21 @@ func (sh *StreamHandler) processResourceChanges(record *events.DynamoDBEventReco
return resource, nil
}

func (sh *StreamHandler) processResourceSnapshotDiff(eventName string,
oldImage, newImage map[string]*dynamodb.AttributeValue) (*ResourceChange, error) {
func (sh *StreamHandler) processResourceSnapshotDiff(oldImage, newImage map[string]*dynamodb.AttributeValue) (*ResourceChange, error) {
var newSnapshot resourceSnapshot
if err := dynamodbattribute.UnmarshalMap(newImage, &newSnapshot); err != nil {
return nil, errors.Wrapf(err, "could not unmarshal new image %#v", newImage)
}
if newSnapshot.Attributes == nil {
return nil, errors.Errorf("resources-table new image did include top level key attributes: %#v", newImage)
}

oldSnapshot := resourceSnapshot{}
if err := dynamodbattribute.UnmarshalMap(oldImage, &oldSnapshot); err != nil || oldSnapshot.Attributes == nil {
return nil, errors.New("resources-table record old image did include top level key attributes")
var oldSnapshot resourceSnapshot
if err := dynamodbattribute.UnmarshalMap(oldImage, &oldSnapshot); err != nil {
return nil, errors.Wrapf(err, "could not unmarshal old image %#v", oldImage)
}
newSnapshot := resourceSnapshot{}
if err := dynamodbattribute.UnmarshalMap(newImage, &newSnapshot); err != nil || newSnapshot.Attributes == nil {
return nil, errors.New("resources-table record new image did include top level key attributes")
if oldSnapshot.Attributes == nil {
return nil, errors.Errorf("resources-table old image did include top level key attributes: %#v", oldImage)
}

// First convert the old & new image from the useless dynamodb stream format into a JSON string
Expand All @@ -100,7 +123,6 @@ func (sh *StreamHandler) processResourceSnapshotDiff(eventName string,
}
zap.L().Debug(
"processing resource record",
zap.Any("record.EventName", eventName),
zap.Any("newImage", newImageJSON),
zap.Any("changes", changes),
zap.Error(err),
Expand All @@ -116,12 +138,19 @@ func (sh *StreamHandler) processResourceSnapshotDiff(eventName string,
return nil, nil
}

var attributes ResourceAttributes
if err := jsoniter.Unmarshal(newImageJSON, &attributes); err != nil {
return nil, errors.Wrapf(err, "failed to populate attributes: %s", string(newImageJSON))
}

out := &ResourceChange{
LastUpdated: newSnapshot.LastModified,
IntegrationID: newSnapshot.IntegrationID,
IntegrationLabel: integrationLabel,
Resource: newImageJSON,
Changes: changes,
LastUpdated: newSnapshot.LastModified,
IntegrationID: newSnapshot.IntegrationID,
ID: newSnapshot.ID,
IntegrationLabel: integrationLabel,
Resource: newSnapshot.Attributes,
Changes: changes,
ResourceAttributes: attributes,
}

// If nothing changed, report it as a sync
Expand All @@ -137,10 +166,15 @@ func (sh *StreamHandler) processResourceSnapshotDiff(eventName string,
func (sh *StreamHandler) processResourceSnapshot(changeType string,
image map[string]*dynamodb.AttributeValue) (*ResourceChange, error) {

change := resourceSnapshot{}
if err := dynamodbattribute.UnmarshalMap(image, &change); err != nil || change.Attributes == nil {
return nil, errors.New("resources-table record image did include top level key attributes")
var change resourceSnapshot
if err := dynamodbattribute.UnmarshalMap(image, &change); err != nil {
return nil, errors.Wrapf(err, "could not unmarshal image %#v", image)
}

if change.Attributes == nil {
return nil, errors.Errorf("resources-table image did include top level key attributes: %#v", image)
}

integrationLabel, err := sh.getIntegrationLabel(change.IntegrationID)
if err != nil {
return nil, errors.Wrapf(err, "failed to retrieve integration label for %q", change.IntegrationID)
Expand All @@ -150,15 +184,24 @@ func (sh *StreamHandler) processResourceSnapshot(changeType string,
if len(integrationLabel) == 0 {
return nil, nil
}

rawResource, err := jsoniter.Marshal(change.Attributes)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal resource")
return nil, errors.Wrapf(err, "failed to marshal attributes: %#v", rawResource)
}

var attributes ResourceAttributes
if err := jsoniter.Unmarshal(rawResource, &attributes); err != nil {
return nil, errors.Wrapf(err, "failed to populate attributes: %s", string(rawResource))
}

return &ResourceChange{
IntegrationID: change.IntegrationID,
IntegrationLabel: integrationLabel,
LastUpdated: change.LastModified,
Resource: rawResource,
ChangeType: changeType,
ID: change.ID,
IntegrationID: change.IntegrationID,
IntegrationLabel: integrationLabel,
LastUpdated: change.LastModified,
Resource: change.Attributes,
ChangeType: changeType,
ResourceAttributes: attributes,
}, nil
}
Loading

0 comments on commit 65ed283

Please sign in to comment.