Skip to content

Commit

Permalink
Fix regression in glue:sync for testing existence of data in a partition (#965)
Browse files Browse the repository at this point in the history
  • Loading branch information
rleighton authored Jun 2, 2020
1 parent 89776a6 commit 18f8fd4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
7 changes: 3 additions & 4 deletions internal/log_analysis/awsglue/glue_timebin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package awsglue

import (
"fmt"
"net/url"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -97,16 +96,16 @@ func (tb GlueTableTimebin) PartitionS3PathFromTime(t time.Time) (s3Path string)

// PartitionHasData checks if there is at least 1 s3 object in the partition
func (tb GlueTableTimebin) PartitionHasData(client s3iface.S3API, t time.Time, tableOutput *glue.GetTableOutput) (bool, error) {
location, err := url.Parse(*tableOutput.Table.StorageDescriptor.Location)
bucket, prefix, err := ParseS3URL(*tableOutput.Table.StorageDescriptor.Location)
if err != nil {
return false, errors.Wrapf(err, "Cannot parse s3 path: %s",
*tableOutput.Table.StorageDescriptor.Location)
}

// list files w/pagination
inputParams := &s3.ListObjectsV2Input{
Bucket: aws.String(location.Host),
Prefix: aws.String(tb.PartitionS3PathFromTime(t)),
Bucket: aws.String(bucket),
Prefix: aws.String(prefix + tb.PartitionS3PathFromTime(t)),
MaxKeys: aws.Int64(1), // look for at least 1
}
var hasData bool
Expand Down
28 changes: 22 additions & 6 deletions internal/log_analysis/awsglue/table_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
"github.com/panther-labs/panther/pkg/testutils"
)

// S3 location fixtures shared by the table metadata tests. They are joined as
// "s3://" + bucket + "/" + prefix to build the test table's storage location,
// and reused to build the expected ListObjectsV2 input when checking a
// partition for data.
const (
// metadataTestBucket is the bucket portion of the test table's S3 location.
metadataTestBucket = "testbucket"
// metadataTestTablePrefix is the key prefix of the test table within the
// bucket; the partition path is appended directly to it in the expected
// listing prefix, so it carries its own trailing "/".
metadataTestTablePrefix = "logs/table/"
)

var (
refTime = time.Date(2020, 1, 3, 1, 1, 1, 0, time.UTC)
nonAWSError = errors.New("nonAWSError") // nolint:golint
Expand All @@ -51,7 +56,7 @@ var (

testStorageDescriptor = &glue.StorageDescriptor{
Columns: testColumns,
Location: aws.String("s3://testbucket/logs/table"),
Location: aws.String("s3://" + metadataTestBucket + "/" + metadataTestTablePrefix),
SerdeInfo: &glue.SerDeInfo{
SerializationLibrary: aws.String("org.openx.data.jsonserde.JsonSerDe"),
Parameters: map[string]*string{
Expand Down Expand Up @@ -221,13 +226,12 @@ func TestSyncPartitionsPartitionDoesntExistAndNoData(t *testing.T) {
}

func TestSyncPartitionsPartitionDoesntExistAndHasData(t *testing.T) {
var startDate time.Time // default unset
gm := NewGlueTableMetadata(models.LogData, "Test.Logs", "Description", GlueTableHourly, partitionTestEvent{})

// test not exists error in GetPartition (should not fail)
glueClient := &testutils.GlueMock{}
glueClient.On("GetTable", mock.Anything).Return(testGetTableOutput, nil).Once()
glueClient.On("GetPartition", mock.Anything).Return(testGetPartitionOutput, entityNotFoundError).Times(24)
// confirm correct listing calls for some data found in S3
s3Client := &testutils.S3Mock{}
page := &s3.ListObjectsV2Output{
Contents: []*s3.Object{
Expand All @@ -236,9 +240,21 @@ func TestSyncPartitionsPartitionDoesntExistAndHasData(t *testing.T) {
},
},
}
s3Client.On("ListObjectsV2Pages", mock.Anything, mock.Anything).Return(page, nil).Times(24) // some data found in S3
glueClient.On("CreatePartition", mock.Anything).Return(testCreatePartitionOutput, nil).Times(24) // should create partitions
err := gm.SyncPartitions(glueClient, s3Client, startDate)
now := time.Now().UTC()
today := now.Truncate(time.Hour * 24)
endToday := now.Truncate(time.Hour * 24).Add(time.Hour * 23)
for partitionTime := today; !partitionTime.After(endToday); partitionTime = partitionTime.Add(time.Hour) {
expectedListPageInput := s3.ListObjectsV2Input{
Bucket: aws.String(metadataTestBucket),
Prefix: aws.String(metadataTestTablePrefix + GlueTableHourly.PartitionS3PathFromTime(partitionTime)),
MaxKeys: aws.Int64(1),
}
glueClient.On("GetPartition", mock.Anything).Return(testGetPartitionOutput, entityNotFoundError)
s3Client.On("ListObjectsV2Pages", &expectedListPageInput, mock.Anything).Return(page, nil)
glueClient.On("CreatePartition", mock.Anything).Return(testCreatePartitionOutput, nil)
}

err := gm.SyncPartitions(glueClient, s3Client, today)
assert.NoError(t, err)
glueClient.AssertExpectations(t)
s3Client.AssertExpectations(t)
Expand Down

0 comments on commit 18f8fd4

Please sign in to comment.