Skip to content

Commit

Permalink
Merge branch 'master' into nr_consul_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
a-elsheikh authored Jan 6, 2023
2 parents f41da0e + cde02f4 commit c7d9e20
Show file tree
Hide file tree
Showing 27 changed files with 2,980 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
terraform {
required_version = ">=0.13"

required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 4.0"
}
}
}

variable "TIMESTAMP" {
type = string
description = "Timestamp of the github worklow run."
}

variable "UNIQUE_ID" {
type = string
description = "Unique Id of the github worklow run."
}

provider "aws" {
region = "us-east-1"
default_tags {
tags = {
Purpose = "AutomatedTesting"
Timestamp = "${var.TIMESTAMP}"
}
}
}

resource "aws_sns_topic" "multiTopic1" {
name = "sqsssnscerttest-q1-${var.UNIQUE_ID}"
tags = {
dapr-topic-name = "sqsssnscerttest-q1-${var.UNIQUE_ID}"
}
}

resource "aws_sqs_queue" "testQueue" {
name = "testQueue-${var.UNIQUE_ID}"
tags = {
dapr-queue-name = "testQueue-${var.UNIQUE_ID}"
}
}

resource "aws_sns_topic_subscription" "multiTopic1_testQueue" {
topic_arn = aws_sns_topic.multiTopic1.arn
protocol = "sqs"
endpoint = aws_sqs_queue.testQueue.arn
}

resource "aws_sqs_queue_policy" "testQueue_policy" {
queue_url = "${aws_sqs_queue.testQueue.id}"

policy = <<POLICY
{
"Version": "2012-10-17",
"Id": "sqspolicy",
"Statement": [{
"Sid": "Allow-SNS-SendMessage",
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "sqs:SendMessage",
"Resource": "${aws_sqs_queue.testQueue.arn}",
"Condition": {
"ArnEquals": {
"aws:SourceArn": [
"${aws_sns_topic.multiTopic1.arn}"
]
}
}
}]
}
POLICY
}
27 changes: 23 additions & 4 deletions .github/workflows/certification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ jobs:
- component: state.azure.blobstorage
required-secrets: AzureBlobStorageContainer,AzureBlobStorageAccount, AzureBlobStorageAccessKey, AzureCertificationTenantId, AzureCertificationServicePrincipalClientId, AzureCertificationServicePrincipalClientSecret
- component: state.azure.cosmosdb
required-secrets: AzureCosmosDBMasterKey, AzureCosmosDBUrl, AzureCosmosDB, AzureCosmosDBCollection, AzureCertificationTenantId, AzureCertificationServicePrincipalClientId, AzureCertificationServicePrincipalClientSecret
required-secrets: AzureCosmosDBMasterKey, AzureCosmosDBUrl, AzureCosmosDB, AzureCosmosDBCollection, AzureCertificationTenantId, AzureCertificationServicePrincipalClientId, AzureCertificationServicePrincipalClientSecret
- component: pubsub.aws.snssqs
terraform-dir: pubsub/aws/snssqs
EOF
)
echo "cloud-components=$CRON_COMPONENTS" >> $GITHUB_OUTPUT
Expand Down Expand Up @@ -244,12 +246,18 @@ jobs:
uses: hashicorp/setup-terraform@v2
if: matrix.terraform-dir != ''

- name: Set AWS Region
if: contains(matrix.component, 'aws')
run: |
AWS_REGION="us-west-1"
echo "AWS_REGION=$AWS_REGION" >> $GITHUB_ENV
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_KEY }}
aws-region: us-west-1
aws-region: "${{ env.AWS_REGION }}"
if: matrix.terraform-dir != ''

- name: Terraform Init
Expand All @@ -275,6 +283,16 @@ jobs:
working-directory: "./.github/infrastructure/terraform/certification/${{ matrix.terraform-dir }}"
if: matrix.terraform-dir != ''
continue-on-error: true

- name: Create aws.snssqs specific variables
if: contains(matrix.component, 'snssqs')
run: |
PUBSUB_AWS_SNSSQS_QUEUE_1="sqsssnscerttest-q1-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_QUEUE_1=$PUBSUB_AWS_SNSSQS_QUEUE_1" >> $GITHUB_ENV
PUBSUB_AWS_SNSSQS_QUEUE_2="sqsssnscerttest-q2-${{env.UNIQUE_ID}}"
echo "PUBSUB_AWS_SNSSQS_QUEUE_2=$PUBSUB_AWS_SNSSQS_QUEUE_2" >> $GITHUB_ENV
AWS_REGION="us-east-1"
echo "AWS_REGION=$AWS_REGION" >> $GITHUB_ENV
- name: Set up Go
uses: actions/setup-go@v3
Expand All @@ -294,13 +312,14 @@ jobs:
go mod tidy -compat=1.19
git diff --exit-code ./go.mod
git diff --exit-code ./go.sum
- name: Run tests
continue-on-error: false
working-directory: ${{ env.TEST_PATH }}
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }}
AWS_REGION: "${{ env.AWS_REGION }}"
run: |
echo "Running certification tests for ${{ matrix.component }} ... "
export GOLANG_PROTOBUF_REGISTRATION_CONFLICT=ignore
Expand Down
25 changes: 23 additions & 2 deletions pubsub/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ const (
sysPropIotHubEnqueuedTime = "iothub-enqueuedtime"
sysPropMessageID = "message-id"

// Metadata field to ensure all Event Hub properties pass through
requireAllProperties = "requireAllProperties"

defaultMessageRetentionInDays = 1
defaultPartitionCount = 1

Expand All @@ -86,7 +89,7 @@ const (
maxPartitionCount = int32(1024)
)

func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, handler pubsub.Handler) error {
func subscribeHandler(ctx context.Context, topic string, getAllProperties bool, e *eventhub.Event, handler pubsub.Handler) error {
res := pubsub.NewMessage{Data: e.Data, Topic: topic, Metadata: map[string]string{}}
if e.SystemProperties.SequenceNumber != nil {
res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10)
Expand Down Expand Up @@ -124,6 +127,16 @@ func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, hand
if e.ID != "" {
res.Metadata[sysPropMessageID] = e.ID
}
// added properties if any ( includes application properties from iot-hub)
if getAllProperties {
if e.Properties != nil && len(e.Properties) > 0 {
for key, value := range e.Properties {
if str, ok := value.(string); ok {
res.Metadata[key] = str
}
}
}
}

return handler(ctx, &res)
}
Expand Down Expand Up @@ -622,6 +635,14 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su
return err
}

getAllProperties := false
if req.Metadata[requireAllProperties] != "" {
getAllProperties, err = strconv.ParseBool(req.Metadata[requireAllProperties])
if err != nil {
aeh.logger.Errorf("invalid value for metadata : %s . Error: %v.", requireAllProperties, err)
}
}

aeh.logger.Debugf("registering handler for topic %s", req.Topic)
_, err = processor.RegisterHandler(subscribeCtx,
func(_ context.Context, e *eventhub.Event) error {
Expand All @@ -631,7 +652,7 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su
retryerr := retry.NotifyRecover(func() error {
aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID)

return subscribeHandler(subscribeCtx, req.Topic, e, handler)
return subscribeHandler(subscribeCtx, req.Topic, getAllProperties, e, handler)
}, b, func(_ error, _ time.Duration) {
aeh.logger.Warnf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID)
}, func() {
Expand Down
10 changes: 8 additions & 2 deletions pubsub/azure/eventhubs/eventhubs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (

testStorageContainerName = "iothub-pubsub-integration-test"
testTopic = "integration-test-topic"
applicationProperty = "applicationProperty"
)

func createIotHubPubsubMetadata() pubsub.Metadata {
Expand Down Expand Up @@ -86,8 +87,10 @@ func testReadIotHubEvents(t *testing.T) {
}

req := pubsub.SubscribeRequest{
Topic: testTopic, // TODO: Handle Topic configuration after EventHubs pubsub rewrite #951
Metadata: map[string]string{},
Topic: testTopic, // TODO: Handle Topic configuration after EventHubs pubsub rewrite #951
Metadata: map[string]string{
"requireAllProperties": "true",
},
}
err = eh.Subscribe(context.Background(), req, handler)
assert.Nil(t, err)
Expand All @@ -114,6 +117,9 @@ func testReadIotHubEvents(t *testing.T) {
assert.Contains(t, r.Metadata, sysPropIotHubConnectionAuthMethod, "IoT device event missing: %s", sysPropIotHubConnectionAuthMethod)
assert.Contains(t, r.Metadata, sysPropIotHubEnqueuedTime, "IoT device event missing: %s", sysPropIotHubEnqueuedTime)
assert.Contains(t, r.Metadata, sysPropMessageID, "IoT device event missing: %s", sysPropMessageID)

// Verify sent custom application property is received in IoT Hub device event metadata
assert.Contains(t, r.Metadata, applicationProperty, "IoT device event missing: %s", applicationProperty)
}

eh.Close()
Expand Down
12 changes: 11 additions & 1 deletion pubsub/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,17 @@ func (js *jetstreamPubSub) Init(metadata pubsub.Metadata) error {
}
js.l.Debugf("Connected to nats at %s", js.meta.natsURL)

js.jsc, err = js.nc.JetStream()
jsOpts := []nats.JSOpt{}

if js.meta.domain != "" {
jsOpts = append(jsOpts, nats.Domain(js.meta.domain))
}

if js.meta.apiPrefix != "" {
jsOpts = append(jsOpts, nats.APIPrefix(js.meta.apiPrefix))
}

js.jsc, err = js.nc.JetStream(jsOpts...)
if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions pubsub/jetstream/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type metadata struct {
hearbeat time.Duration
deliverPolicy nats.DeliverPolicy
ackPolicy nats.AckPolicy
domain string
apiPrefix string
}

func parseMetadata(psm pubsub.Metadata) (metadata, error) {
Expand Down Expand Up @@ -143,6 +145,13 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
m.hearbeat = v
}

if domain := psm.Properties["domain"]; domain != "" {
m.domain = domain
}
if apiPrefix := psm.Properties["apiPrefix"]; apiPrefix != "" {
m.apiPrefix = apiPrefix
}

deliverPolicy := psm.Properties["deliverPolicy"]
switch deliverPolicy {
case "all", "":
Expand Down
4 changes: 4 additions & 0 deletions pubsub/jetstream/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestParseMetadata(t *testing.T) {
"memoryStorage": "true",
"rateLimit": "20000",
"hearbeat": "1s",
"domain": "hub",
},
}},
want: metadata{
Expand All @@ -70,6 +71,7 @@ func TestParseMetadata(t *testing.T) {
hearbeat: time.Second * 1,
deliverPolicy: nats.DeliverAllPolicy,
ackPolicy: nats.AckExplicitPolicy,
domain: "hub",
},
expectErr: false,
},
Expand All @@ -95,6 +97,7 @@ func TestParseMetadata(t *testing.T) {
"deliverPolicy": "sequence",
"startSequence": "5",
"ackPolicy": "all",
"apiPrefix": "HUB",
},
}},
want: metadata{
Expand All @@ -116,6 +119,7 @@ func TestParseMetadata(t *testing.T) {
token: "myToken",
deliverPolicy: nats.DeliverByStartSequencePolicy,
ackPolicy: nats.AckAllPolicy,
apiPrefix: "HUB",
},
expectErr: false,
},
Expand Down
11 changes: 10 additions & 1 deletion pubsub/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,16 @@ func (m *mqttPubSub) Publish(_ context.Context, req *pubsub.PublishRequest) erro
// m.logger.Debugf("mqtt publishing topic %s with data: %v", req.Topic, req.Data)
m.logger.Debugf("mqtt publishing topic %s", req.Topic)

token := m.producer.Publish(req.Topic, m.metadata.qos, m.metadata.retain, req.Data)
retain := m.metadata.retain
if val, ok := req.Metadata[mqttRetain]; ok && val != "" {
var err error
retain, err = strconv.ParseBool(val)
if err != nil {
return fmt.Errorf("mqtt invalid retain %s, %s", val, err)
}
}

token := m.producer.Publish(req.Topic, m.metadata.qos, retain, req.Data)
t := time.NewTimer(defaultWait)
defer func() {
if !t.Stop() {
Expand Down
Loading

0 comments on commit c7d9e20

Please sign in to comment.