Skip to content

Commit

Permalink
Merge branch 'main' into feature/kafka-pubsub-avro-caching
Browse files Browse the repository at this point in the history
  • Loading branch information
yaron2 authored Nov 26, 2024
2 parents 9a57158 + 913ba4c commit 411f170
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 84 deletions.
26 changes: 13 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ require (
github.com/apache/pulsar-client-go v0.11.0
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230412142645-25003f6f083d
github.com/apache/thrift v0.13.0
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a
github.com/aws/aws-sdk-go v1.50.19
github.com/aws/aws-sdk-go-v2 v1.31.0
github.com/aws/aws-sdk-go-v2/config v1.27.39
github.com/aws/aws-sdk-go-v2/credentials v1.17.37
github.com/aws/aws-sdk-go-v2 v1.32.4
github.com/aws/aws-sdk-go-v2/config v1.28.2
github.com/aws/aws-sdk-go-v2/credentials v1.17.43
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.17.3
github.com/aws/rolesanywhere-credential-helper v1.0.4
Expand Down Expand Up @@ -180,16 +180,16 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 // indirect
github.com/aws/smithy-go v1.21.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
52 changes: 26 additions & 26 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -244,58 +244,58 @@ github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:o
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so=
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a h1:QFemvMGPnajaeRBkFc1HoEA7qzVjUv+rkYb1/ps1/UE=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.50.19 h1:YSIDKRSkh/TW0RPWoocdLqtC/T5W6IGBVhFs6P7Qcac=
github.com/aws/aws-sdk-go v1.50.19/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U=
github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA=
github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE=
github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 h1:xDAuZTn4IMm8o1LnBZvmrL8JA1io4o3YWNXgohbf20g=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5/go.mod h1:wYSv6iDS621sEFLfKvpPE2ugjTuGlAG7iROg0hLOkfc=
github.com/aws/aws-sdk-go-v2/config v1.8.3/go.mod h1:4AEiLtAb8kLs7vgw2ZV3p2VZ1+hBavOc84hqxVNpCyw=
github.com/aws/aws-sdk-go-v2/config v1.27.39 h1:FCylu78eTGzW1ynHcongXK9YHtoXD5AiiUqq3YfJYjU=
github.com/aws/aws-sdk-go-v2/config v1.27.39/go.mod h1:wczj2hbyskP4LjMKBEZwPRO1shXY+GsQleab+ZXT2ik=
github.com/aws/aws-sdk-go-v2/config v1.28.2 h1:FLvWA97elBiSPdIol4CXfIAY1wlq3KzoSgkMuZSuSe8=
github.com/aws/aws-sdk-go-v2/config v1.28.2/go.mod h1:hNmQsKfUqpKz2yfnZUB60GCemPmeqAalVTui0gOxjAE=
github.com/aws/aws-sdk-go-v2/credentials v1.4.3/go.mod h1:FNNC6nQZQUuyhq5aE5c7ata8o9e4ECGmS4lAXC7o1mQ=
github.com/aws/aws-sdk-go-v2/credentials v1.17.37 h1:G2aOH01yW8X373JK419THj5QVqu9vKEwxSEsGxihoW0=
github.com/aws/aws-sdk-go-v2/credentials v1.17.37/go.mod h1:0ecCjlb7htYCptRD45lXJ6aJDQac6D2NlKGpZqyTG6A=
github.com/aws/aws-sdk-go-v2/credentials v1.17.43 h1:SEGdVOOE1Wyr2XFKQopQ5GYjym3nYHcphesdt78rNkY=
github.com/aws/aws-sdk-go-v2/credentials v1.17.43/go.mod h1:3aiza5kSyAE4eujSanOkSkAmX/RnVqslM+GRQ/Xvv4c=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.6.0/go.mod h1:gqlclDEZp4aqJOancXK6TN24aKhT0W0Ae9MHk3wzTMM=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF/DqhBkBCeDiJDcaqIT5pA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 h1:woXadbf0c7enQ2UGCi8gW/WuKmE0xIzxBF/eD94jMKQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19/go.mod h1:zminj5ucw7w0r65bP6nhyOd3xL6veAUMc3ElGMoLVb4=
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10 h1:z6fAXB4HSuYjrE/P8RU3NdCaN+EPaeq/+80aisCjuF8=
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10/go.mod h1:PoPjOi7j+/DtKIGC58HRfcdWKBPYYXwdKnRG+po+hzo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18/go.mod h1:r506HmK5JDUh9+Mw4CfGJGSSoqIiLCndAuqXuhbv67Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 h1:Z7IdFUONvTcvS7YuhtVxN99v2cCoHRXOS4mTr0B/pUc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18/go.mod h1:DkKMmksZVVyat+Y+r1dEOgJEfUeA7UngIHWeKsi0yNc=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.4/go.mod h1:ZcBrrI3zBKlhGFNYWvju0I3TR93I7YIgAfy82Fh4lcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/service/appconfig v1.4.2/go.mod h1:FZ3HkCe+b10uFZZkFdvf98LHW21k49W8o8J366lqVKY=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.17.3 h1:PtP2Zzf3uy94EsVOW+tB7gNt63fFZEHuS9IRWg5q250=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.17.3/go.mod h1:4zuvYEUJm0Vq8tb3gcb2sl04A9I1AA5DKAefbYPA4VM=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5/go.mod h1:QdZ3OmoIjSX+8D1OPAzPxDfjXASbBMDsz9qvtyIhtik=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72HRZDLMtmVQiLG2tLfQcaWLCssELvGl+Zf2WVxMmR8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 h1:Xbwbmk44URTiHNx6PNo0ujDE6ERlsCKJD3u1zfnzAPg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20/go.mod h1:oAfOFzUB14ltPZj1rWwRc3d/6OgD76R8KlvU3EqM9Fg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 h1:tHxQi/XHPK0ctd/wdOw0t7Xrc2OxcRCnVzv8lwWPu0c=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4/go.mod h1:4GQbF1vJzG60poZqWatZlhP31y8PGCCVTvIGPdaaYJ0=
github.com/aws/aws-sdk-go-v2/service/sso v1.4.2/go.mod h1:NBvT9R1MEF+Ud6ApJKM0G+IkPchKS7p7c2YPKwHmBOk=
github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 h1:rs4JCczF805+FDv2tRhZ1NU0RB2H6ryAvsWPanAr72Y=
github.com/aws/aws-sdk-go-v2/service/sso v1.23.3/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 h1:S7EPdMVZod8BGKQQPTBK+FcX9g7bKR7c4+HxWqHP7Vg=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.4 h1:BqE3NRG6bsODh++VMKMsDmFuJTHrdD4rJZqHjDeF6XI=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.4/go.mod h1:wrMCEwjFPms+V86TCQQeOxQF/If4vT44FGIOFiMC2ck=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 h1:zcx9LiGWZ6i6pjdcoE9oXAB6mUdeyC36Ia/QEiIvYdg=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4/go.mod h1:Tp/ly1cTjRLGBBmNccFumbZ8oqpZlpdhFf80SrRh4is=
github.com/aws/aws-sdk-go-v2/service/sts v1.7.2/go.mod h1:8EzeIqfWt2wWT4rJVu3f21TfrhJ8AEMzVybRNSb/b4g=
github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 h1:VzudTFrDCIDakXtemR7l6Qzt2+JYsVqo2MxBPt5k8T8=
github.com/aws/aws-sdk-go-v2/service/sts v1.31.3/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGIxF+on3KOISbK5IU=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8=
github.com/aws/rolesanywhere-credential-helper v1.0.4 h1:kHIVVdyQQiFZoKBP+zywBdFilGCS8It+UvW5LolKbW8=
github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZUee7uGl0hNChWidNpe2+GD87Buqk=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA=
github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
Expand Down
6 changes: 6 additions & 0 deletions pubsub/aws/snssqs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type snsSqsMetadata struct {
AccountID string `mapstructure:"accountID"`
// processing concurrency mode
ConcurrencyMode pubsub.ConcurrencyMode `mapstructure:"concurrencyMode"`
// limits the number of concurrent goroutines
ConcurrencyLimit int `mapstructure:"concurrencyLimit"`
}

func maskLeft(s string) string {
Expand Down Expand Up @@ -130,6 +132,10 @@ func (s *snsSqs) getSnsSqsMetadata(meta pubsub.Metadata) (*snsSqsMetadata, error
return nil, err
}

if md.ConcurrencyLimit < 0 {
return nil, errors.New("concurrencyLimit must be greater than or equal to 0")
}

s.logger.Debug(md.hideDebugPrintedCredentials())

return md, nil
Expand Down
9 changes: 9 additions & 0 deletions pubsub/aws/snssqs/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ metadata:
default: '"parallel"'
example: '"single", "parallel"'
type: string
- name: concurrencyLimit
required: false
description: |
Defines the maximum number of concurrent workers handling messages.
This value is ignored when "concurrencyMode" is set to “single“.
To avoid limiting the number of concurrent workers set this to “0“.
type: number
default: '0'
example: '100'
- name: accountId
required: false
description: |
Expand Down
23 changes: 17 additions & 6 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,13 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
WaitTimeSeconds: aws.Int64(s.metadata.MessageWaitTimeSeconds),
}

// sem is a semaphore used to control the concurrencyLimit.
// It is set only when we are in parallel mode and limit is > 0.
var sem chan (struct{}) = nil
if (s.metadata.ConcurrencyMode == pubsub.Parallel) && s.metadata.ConcurrencyLimit > 0 {
sem = make(chan struct{}, s.metadata.ConcurrencyLimit)
}

for {
// If the context is canceled, stop requesting messages
if ctx.Err() != nil {
Expand Down Expand Up @@ -629,33 +636,37 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
}
s.logger.Debugf("%v message(s) received on queue %s", len(messageResponse.Messages), queueInfo.arn)

var wg sync.WaitGroup
for _, message := range messageResponse.Messages {
if err := s.validateMessage(ctx, message, queueInfo, deadLettersQueueInfo); err != nil {
s.logger.Errorf("message is not valid for further processing by the handler. error is: %v", err)
continue
}

f := func(message *sqs.Message) {
defer wg.Done()
if err := s.callHandler(ctx, message, queueInfo); err != nil {
s.logger.Errorf("error while handling received message. error is: %v", err)
}
}

wg.Add(1)
switch s.metadata.ConcurrencyMode {
case pubsub.Single:
f(message)
case pubsub.Parallel:
wg.Add(1)
// This is the back pressure mechanism.
// It will block until another goroutine frees a slot.
if sem != nil {
sem <- struct{}{}
}

go func(message *sqs.Message) {
defer wg.Done()
if sem != nil {
defer func() { <-sem }()
}

f(message)
}(message)
}
}
wg.Wait()
}
}

Expand Down
17 changes: 17 additions & 0 deletions pubsub/aws/snssqs/snssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Test_getSnsSqsMetadata_AllConfiguration(t *testing.T) {
"consumerID": "consumer",
"Endpoint": "endpoint",
"concurrencyMode": string(pubsub.Single),
"concurrencyLimit": "42",
"accessKey": "a",
"secretKey": "s",
"sessionToken": "t",
Expand All @@ -68,6 +69,7 @@ func Test_getSnsSqsMetadata_AllConfiguration(t *testing.T) {
r.Equal("consumer", md.SqsQueueName)
r.Equal("endpoint", md.Endpoint)
r.Equal(pubsub.Single, md.ConcurrencyMode)
r.Equal(42, md.ConcurrencyLimit)
r.Equal("a", md.AccessKey)
r.Equal("s", md.SecretKey)
r.Equal("t", md.SessionToken)
Expand Down Expand Up @@ -105,6 +107,7 @@ func Test_getSnsSqsMetadata_defaults(t *testing.T) {
r.Equal("", md.SessionToken)
r.Equal("r", md.Region)
r.Equal(pubsub.Parallel, md.ConcurrencyMode)
r.Equal(0, md.ConcurrencyLimit)
r.Equal(int64(10), md.MessageVisibilityTimeout)
r.Equal(int64(10), md.MessageRetryLimit)
r.Equal(int64(2), md.MessageWaitTimeSeconds)
Expand Down Expand Up @@ -273,6 +276,20 @@ func Test_getSnsSqsMetadata_invalidMetadataSetup(t *testing.T) {
}}},
name: "invalid message concurrencyMode",
},
// invalid concurrencyLimit
{
metadata: pubsub.Metadata{Base: metadata.Base{Properties: map[string]string{
"consumerID": "consumer",
"Endpoint": "endpoint",
"AccessKey": "acctId",
"SecretKey": "secret",
"awsToken": "token",
"Region": "region",
"messageRetryLimit": "10",
"concurrencyLimit": "-1",
}}},
name: "invalid message concurrencyLimit",
},
}

l := logger.NewLogger("SnsSqs unit test")
Expand Down
26 changes: 13 additions & 13 deletions tests/certification/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,22 @@ require (
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.31.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.39 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.37 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a // indirect
github.com/aws/aws-sdk-go-v2 v1.32.4 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.2 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.43 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 // indirect
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.23.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.31.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.4 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect
github.com/aws/rolesanywhere-credential-helper v1.0.4 // indirect
github.com/aws/smithy-go v1.21.0 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
Expand Down
Loading

0 comments on commit 411f170

Please sign in to comment.