Skip to content

Commit

Permalink
Refactor datacatalog updater to only accept events from SQS (#1985)
Browse files Browse the repository at this point in the history
* Refactor datacatalog updater to only accept events from SQS

* Remove enterprise-only stuff

* fix PR feedback

* add comments

* Make sure S3 events are processed before sync events

* Fix test and linter
  • Loading branch information
alxarch authored Nov 18, 2020
1 parent f7b38ff commit 6c93eb6
Show file tree
Hide file tree
Showing 25 changed files with 769 additions and 673 deletions.
12 changes: 11 additions & 1 deletion deployments/log_analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ Resources:
Variables:
ATHENA_WORKGROUP: !Ref AthenaWorkGroup
DEBUG: !Ref Debug
QUEUE_URL: !Ref UpdaterQueue
PROCESSED_DATA_BUCKET: !Ref ProcessedDataBucket
Events:
Queue:
Expand All @@ -710,6 +711,7 @@ Resources:
- glue:GetTable
- glue:GetTables
- glue:CreateTable
- glue:CreateDatabase
- glue:UpdateTable
- glue:CreatePartition
- glue:GetPartition
Expand All @@ -719,6 +721,13 @@ Resources:
- !Sub arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog
- !Sub arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:database/panther*
- !Sub arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:table/panther*
- Id: SQSPermissions # allow writing
Version: 2012-10-17
Statement:
- Effect: Allow
Action: sqs:SendMessage
Resource:
- !GetAtt UpdaterQueue.Arn # send messages to updater queue (defer tasks)
- Id: AthenaPermissions # to create views over Glue tables
Version: 2012-10-17
Statement:
Expand Down Expand Up @@ -749,7 +758,8 @@ Resources:
Statement:
- Effect: Allow
Action: lambda:InvokeFunction
Resource: !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:panther-datacatalog-updater
Resource:
- !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:panther-logtypes-api

UpdaterAlarms:
Type: Custom::LambdaAlarms
Expand Down
37 changes: 10 additions & 27 deletions internal/core/custom_resources/resources/log_processor_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

"github.com/panther-labs/panther/internal/core/source_api/apifunctions"
"github.com/panther-labs/panther/internal/log_analysis/awsglue"
"github.com/panther-labs/panther/internal/log_analysis/datacatalog_updater/process"
"github.com/panther-labs/panther/internal/log_analysis/datacatalog_updater/datacatalog"
"github.com/panther-labs/panther/pkg/awsutils"
)

Expand All @@ -49,7 +49,15 @@ func customUpdateLogProcessorTables(ctx context.Context, event cfn.Event) (strin
zap.L().Error("failed to parse resource properties", zap.Error(err))
return physicalResourceID, nil, err
}
if err := updateLogProcessorTables(ctx, &props); err != nil {
requiredLogTypes, err := apifunctions.ListLogTypes(ctx, lambdaClient)
if err != nil {
return physicalResourceID, nil, errors.Wrap(err, "failed to fetch required log types from Sources API")
}
client := datacatalog.Client{
SQSAPI: sqsClient,
QueueURL: props.DataCatalogUpdaterQueueURL,
}
if err := client.SendSyncDatabase(ctx, "", requiredLogTypes); err != nil {
zap.L().Error("failed to update glue tables", zap.Error(err))
return physicalResourceID, nil, err
}
Expand All @@ -70,28 +78,3 @@ func customUpdateLogProcessorTables(ctx context.Context, event cfn.Event) (strin
return "", nil, fmt.Errorf("unknown request type %s", event.RequestType)
}
}

func updateLogProcessorTables(ctx context.Context, props *UpdateLogProcessorTablesProperties) error {
// ensure databases are all there
for pantherDatabase, pantherDatabaseDescription := range awsglue.PantherDatabases {
zap.L().Info("creating database", zap.String("database", pantherDatabase))
if _, err := awsglue.CreateDatabase(glueClient, pantherDatabase, pantherDatabaseDescription); err != nil {
if awsutils.IsAnyError(err, glue.ErrCodeAlreadyExistsException) {
zap.L().Info("database exists", zap.String("database", pantherDatabase))
} else {
return errors.Wrapf(err, "failed creating database %s", pantherDatabase)
}
}
}

// update schemas and views for tables that are deployed
logTypes, err := apifunctions.ListLogTypes(ctx, lambdaClient)
if err != nil {
return err
}
m := process.CreateTablesMessage{
LogTypes: logTypes,
Sync: true, // force a partition sync
}
return m.Send(sqsClient, props.DataCatalogUpdaterQueueURL)
}
2 changes: 2 additions & 0 deletions internal/core/logtypesapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
)

const LambdaName = "panther-logtypes-api"

// Generate a lambda client using apigen
// nolint:lll
//go:generate go run github.com/panther-labs/panther/pkg/x/apigen -target LogTypesAPI -type lambdaclient -out ./lambdaclient_gen.go
Expand Down
15 changes: 11 additions & 4 deletions internal/core/source_api/api/put_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
*/

import (
"context"
"fmt"
"strings"
"time"
Expand All @@ -33,7 +34,7 @@ import (
"github.com/panther-labs/panther/api/lambda/source/models"
pollermodels "github.com/panther-labs/panther/internal/compliance/snapshot_poller/models/poller"
awspoller "github.com/panther-labs/panther/internal/compliance/snapshot_poller/pollers/aws"
"github.com/panther-labs/panther/internal/log_analysis/datacatalog_updater/process"
"github.com/panther-labs/panther/internal/log_analysis/datacatalog_updater/datacatalog"
"github.com/panther-labs/panther/pkg/awsbatch/sqsbatch"
"github.com/panther-labs/panther/pkg/genericapi"
)
Expand Down Expand Up @@ -282,8 +283,14 @@ func createTables(integration *models.SourceIntegration) error {
return nil
}

m := process.CreateTablesMessage{
LogTypes: integration.RequiredLogTypes(),
client := datacatalog.Client{
SQSAPI: sqsClient,
QueueURL: env.DataCatalogUpdaterQueueURL,
}
return m.Send(sqsClient, env.DataCatalogUpdaterQueueURL)
logTypes := integration.RequiredLogTypes()
err := client.SendCreateTablesForLogTypes(context.TODO(), logTypes...)
if err != nil {
return errors.Wrap(err, "failed to create Glue tables")
}
return nil
}
18 changes: 3 additions & 15 deletions internal/core/source_api/api/put_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
awspoller "github.com/panther-labs/panther/internal/compliance/snapshot_poller/pollers/aws"
"github.com/panther-labs/panther/internal/core/source_api/ddb"
"github.com/panther-labs/panther/internal/core/source_api/ddb/modelstest"
"github.com/panther-labs/panther/internal/log_analysis/datacatalog_updater/process"
"github.com/panther-labs/panther/pkg/testutils"
)

Expand Down Expand Up @@ -274,7 +273,7 @@ func TestPutLogIntegrationUpdateSqsQueuePermissions(t *testing.T) {
QueueUrl: &env.LogProcessorQueueURL,
}
mockSQS.On("SetQueueAttributes", expectedSetAttributes).Return(&sqs.SetQueueAttributesOutput{}, nil).Once()
mockSQS.On("SendMessage", mock.Anything).Return(&sqs.SendMessageOutput{}, nil)
mockSQS.On("SendMessageWithContext", mock.Anything, mock.Anything).Return(&sqs.SendMessageOutput{}, nil)

out, err := apiTest.PutIntegration(&models.PutIntegrationInput{
PutIntegrationSettings: models.PutIntegrationSettings{
Expand All @@ -300,7 +299,7 @@ func TestPutLogIntegrationUpdateSqsQueuePermissionsFailure(t *testing.T) {
evaluateIntegrationFunc = func(_ API, _ *models.CheckIntegrationInput) (string, bool, error) { return "", true, nil }

mockSQS.On("GetQueueAttributes", mock.Anything).Return(&sqs.GetQueueAttributesOutput{}, errors.New("error")).Once()
mockSQS.On("SendMessage", mock.Anything).Return(&sqs.SendMessageOutput{}, nil)
mockSQS.On("SendMessageWithContext", mock.Anything, mock.Anything).Return(&sqs.SendMessageOutput{}, nil)

out, err := apiTest.PutIntegration(&models.PutIntegrationInput{
PutIntegrationSettings: models.PutIntegrationSettings{
Expand Down Expand Up @@ -344,18 +343,7 @@ func TestPutSqsIntegration(t *testing.T) {
// Create a new SQS queue - we are verifying the parameters below
mockSQS.On("CreateQueue", mock.Anything).Return(&sqs.CreateQueueOutput{}, nil).Once()

marshalled, err := jsoniter.MarshalToString(process.CreateTablesMessage{
LogTypes: []string{"AWS.CloudTrail"},
})
require.NoError(t, err)
msgInput := &sqs.SendMessageInput{
MessageBody: &marshalled,
QueueUrl: aws.String(""),
MessageAttributes: map[string]*sqs.MessageAttributeValue{
process.PantherMessageType: &process.CreateTableMessageAttribute,
},
}
mockSQS.On("SendMessage", msgInput).Return(&sqs.SendMessageOutput{}, nil)
mockSQS.On("SendMessageWithContext", mock.Anything, mock.Anything).Return(&sqs.SendMessageOutput{}, nil)

mockLambda.On("CreateEventSourceMapping", mock.Anything).Return(&lambda.EventSourceMappingConfiguration{}, nil)

Expand Down
10 changes: 6 additions & 4 deletions internal/core/source_api/api/update_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package api
*/

import (
"context"
"fmt"

"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/panther-labs/panther/api/lambda/source/models"
"github.com/panther-labs/panther/internal/core/source_api/ddb"
"github.com/panther-labs/panther/internal/log_analysis/datacatalog_updater/process"
"github.com/panther-labs/panther/internal/log_analysis/datacatalog_updater/datacatalog"
"github.com/panther-labs/panther/pkg/genericapi"
)

Expand Down Expand Up @@ -226,10 +227,11 @@ func updateTables(item *ddb.Integration, input *models.UpdateIntegrationSettings
return nil
}

m := process.CreateTablesMessage{
LogTypes: newLogTypes,
client := datacatalog.Client{
SQSAPI: sqsClient,
QueueURL: env.DataCatalogUpdaterQueueURL,
}
err := m.Send(sqsClient, env.DataCatalogUpdaterQueueURL)
err := client.SendCreateTablesForLogTypes(context.TODO(), newLogTypes...)
if err != nil {
return errors.Wrap(err, "failed to create Glue tables")
}
Expand Down
4 changes: 3 additions & 1 deletion internal/core/source_api/api/update_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
func TestUpdateIntegrationSettingsAwsScanType(t *testing.T) {
mockClient := &testutils.DynamoDBMock{}
dynamoClient = &ddb.DDB{Client: mockClient, TableName: "test"}
mockSQS := &testutils.SqsMock{}

// Mocking health check
evaluateIntegrationFunc = func(_ API, _ *models.CheckIntegrationInput) (string, bool, error) {
Expand All @@ -51,6 +52,7 @@ func TestUpdateIntegrationSettingsAwsScanType(t *testing.T) {
mockClient.On("GetItem", mock.Anything).Return(getResponse, nil).Once()
mockClient.On("PutItem", mock.Anything).Return(&dynamodb.PutItemOutput{}, nil).Once()
mockClient.On("Scan", mock.Anything).Return(&dynamodb.ScanOutput{}, nil).Once()
mockSQS.On("SendMessageWithContext", mock.Anything, mock.Anything).Return(&sqs.SendMessageOutput{}, nil)

result, err := apiTest.UpdateIntegrationSettings(&models.UpdateIntegrationSettingsInput{
IntegrationID: testIntegrationID,
Expand Down Expand Up @@ -91,7 +93,7 @@ func TestUpdateIntegrationSettingsAwsS3Type(t *testing.T) {
mockClient.On("PutItem", mock.Anything).Return(&dynamodb.PutItemOutput{}, nil).Once()
mockClient.On("Scan", mock.Anything).Return(&dynamodb.ScanOutput{}, nil).Once()
// Send message to create new log types
mockSqsClient.On("SendMessage", mock.Anything).Return(&sqs.SendMessageOutput{}, nil)
mockSqsClient.On("SendMessageWithContext", mock.Anything, mock.Anything).Return(&sqs.SendMessageOutput{}, nil)

result, err := apiTest.UpdateIntegrationSettings(&models.UpdateIntegrationSettingsInput{
S3Bucket: "test-bucket-1",
Expand Down
27 changes: 27 additions & 0 deletions internal/log_analysis/awsglue/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ package awsglue
*/

import (
"context"
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/glue"
"github.com/aws/aws-sdk-go/service/glue/glueiface"
"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/panther-labs/panther/pkg/awsutils"
)

// Wrapper functions to reduce boiler-plate code in callers
Expand Down Expand Up @@ -157,3 +161,26 @@ func ParseS3URL(s3URL string) (bucket, key string, err error) {

return bucket, key, err
}

func EnsureDatabases(ctx context.Context, client glueiface.GlueAPI) (err error) {
for name, desc := range PantherDatabases {
if e := EnsureDatabase(ctx, client, name, desc); e != nil {
err = multierr.Append(err, e)
}
}
return
}

func EnsureDatabase(ctx context.Context, client glueiface.GlueAPI, name, description string) error {
createDatabaseInput := &glue.CreateDatabaseInput{
DatabaseInput: &glue.DatabaseInput{
Name: aws.String(name),
Description: aws.String(description),
},
}
_, err := client.CreateDatabaseWithContext(ctx, createDatabaseInput)
if err != nil && !awsutils.IsAnyError(err, glue.ErrCodeAlreadyExistsException) {
return err
}
return err
}
85 changes: 85 additions & 0 deletions internal/log_analysis/datacatalog_updater/datacatalog/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package datacatalog

/**
* Panther is a Cloud-Native SIEM for the Modern Security Team.
* Copyright (C) 2020 Panther Labs Inc
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import (
"context"

"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
)

type Client struct {
QueueURL string
SQSAPI sqsiface.SQSAPI
}

func (c *Client) SendSyncDatabasePartitions(ctx context.Context, event *SyncDatabasePartitionsEvent) error {
syncEvent := *event
syncEvent.TraceID = traceIDFromContext(ctx, syncEvent.TraceID)
return sendEvent(ctx, c.SQSAPI, c.QueueURL, sqsTask{
SyncDatabasePartitions: &syncEvent,
})
}

func traceIDFromContext(ctx context.Context, traceID string) string {
if traceID == "" {
if lambdaCtx, ok := lambdacontext.FromContext(ctx); ok {
return lambdaCtx.AwsRequestID
}
}
return traceID
}

func (c *Client) SendSyncDatabase(ctx context.Context, traceID string, requiredLogTypes []string) error {
return sendEvent(ctx, c.SQSAPI, c.QueueURL, sqsTask{
SyncDatabase: &SyncDatabaseEvent{
TraceID: traceIDFromContext(ctx, traceID),
RequiredLogTypes: requiredLogTypes,
},
})
}

func (c *Client) SendCreateTablesForLogTypes(ctx context.Context, logTypes ...string) error {
return sendEvent(ctx, c.SQSAPI, c.QueueURL, sqsTask{
CreateTables: &CreateTablesEvent{
LogTypes: logTypes,
},
})
}

func sendEvent(ctx context.Context, sqsAPI sqsiface.SQSAPI, queueURL string, event sqsTask) error {
body, err := jsoniter.MarshalToString(event)
if err != nil {
err = errors.Wrapf(err, "failed to marshal %#v", event)
return err
}
input := &sqs.SendMessageInput{
MessageBody: aws.String(body),
QueueUrl: aws.String(queueURL),
}
if _, err := sqsAPI.SendMessageWithContext(ctx, input); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 6c93eb6

Please sign in to comment.