Skip to content

Commit

Permalink
Merge branch 'main' into cloudeventsgh-1026-expose-publishSettings
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesBLewis authored Jul 5, 2024
2 parents 6c36c4b + 8efefb0 commit 912a678
Show file tree
Hide file tree
Showing 51 changed files with 1,115 additions and 219 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @cloudevents/sdk-go-maintainers
2 changes: 1 addition & 1 deletion .github/workflows/go-lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:

- name: Go Lint on ./v2
if: steps.golangci_configuration.outputs.files_exists == 'true'
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6
with:
version: v1.54
working-directory: v2
Expand Down
4 changes: 2 additions & 2 deletions docs/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ GEM
rb-fsevent (~> 0.10, >= 0.10.3)
rb-inotify (~> 0.9, >= 0.9.10)
mercenary (0.3.6)
mini_portile2 (2.8.5)
mini_portile2 (2.8.6)
minima (2.5.1)
jekyll (>= 3.5, < 5.0)
jekyll-feed (~> 0.9)
jekyll-seo-tag (~> 2.1)
minitest (5.17.0)
multipart-post (2.1.1)
nokogiri (1.16.2)
nokogiri (1.16.5)
mini_portile2 (~> 2.8.2)
racc (~> 1.4)
octokit (4.18.0)
Expand Down
2 changes: 1 addition & 1 deletion observability/opencensus/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/net v0.23.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

Expand Down
8 changes: 4 additions & 4 deletions observability/opencensus/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -97,11 +97,11 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
23 changes: 19 additions & 4 deletions protocol/amqp/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func NewMessage(message *amqp.Message, receiver *amqp.Receiver) *Message {
return &Message{AMQP: message, AMQPrcv: receiver, format: fmt, version: vn}
}

var _ binding.Message = (*Message)(nil)
var _ binding.MessageMetadataReader = (*Message)(nil)
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)

func getSpecVersion(message *amqp.Message) spec.Version {
if sv, ok := message.ApplicationProperties[specs.PrefixedSpecVersionName()]; ok {
Expand All @@ -74,7 +76,8 @@ func (m *Message) ReadEncoding() binding.Encoding {

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.format != nil {
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.AMQP.GetData()))
data := m.getAmqpData()
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(data))
}
return binding.ErrNotStructured
}
Expand Down Expand Up @@ -106,7 +109,7 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
}
}

data := m.AMQP.GetData()
data := m.getAmqpData()
if len(data) != 0 { // Some data
err = encoder.SetData(bytes.NewBuffer(data))
if err != nil {
Expand Down Expand Up @@ -137,3 +140,15 @@ func (m *Message) Finish(err error) error {
}
return m.AMQPrcv.AcceptMessage(context.Background(), m.AMQP)
}

// fixes: github.com/cloudevents/spec/issues/1275
func (m *Message) getAmqpData() []byte {
var data []byte
amqpData := m.AMQP.Data

// TODO: replace with slices.Concat once go mod bumped to 1.22
for idx := range amqpData {
data = append(data, amqpData[idx]...)
}
return data
}
55 changes: 55 additions & 0 deletions protocol/amqp/v2/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,58 @@ func TestNewMessage_message_unknown(t *testing.T) {
got := NewMessage(message, &rcv)
require.Equal(t, binding.EncodingUnknown, got.ReadEncoding())
}

func TestMessage_getAmqpData(t *testing.T) {
tests := []struct {
name string
message *amqp.Message
want []byte
}{
{
name: "nil data",
message: amqp.NewMessage(nil),
want: nil,
},
{
name: "empty string",
message: amqp.NewMessage([]byte(`""`)),
want: []byte(`""`),
},
{
name: "simple string",
message: amqp.NewMessage([]byte("hello world")),
want: []byte("hello world"),
},
{
name: "multiple data with simple strings",
message: &amqp.Message{Data: [][]byte{
[]byte("hello"),
[]byte(" "),
[]byte("world"),
}},
want: []byte("hello world"),
},
{
name: "multiple data to build JSON array",
message: &amqp.Message{Data: [][]byte{
[]byte("["),
[]byte("Foo"),
[]byte(","),
[]byte("Bar"),
[]byte(","),
[]byte("Baz"),
[]byte("]"),
}},
want: []byte("[Foo,Bar,Baz]"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Message{
AMQP: tt.message,
}
got := m.getAmqpData()
require.Equal(t, tt.want, got)
})
}
}
5 changes: 3 additions & 2 deletions protocol/kafka_confluent/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"strconv"
"strings"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

const (
prefix = "ce-"
prefix = "ce_"
contentTypeKey = "content-type"
)

Expand Down
32 changes: 16 additions & 16 deletions protocol/kafka_confluent/v2/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ var (
TopicPartition: topicPartition,
Value: []byte("hello world!"),
Headers: mapToKafkaHeaders(map[string]string{
"ce-type": testEvent.Type(),
"ce-source": testEvent.Source(),
"ce-id": testEvent.ID(),
"ce-time": test.Timestamp.String(),
"ce-specversion": "1.0",
"ce-dataschema": test.Schema.String(),
"ce-datacontenttype": "text/json",
"ce-subject": "receiverTopic",
"ce_type": testEvent.Type(),
"ce_source": testEvent.Source(),
"ce_id": testEvent.ID(),
"ce_time": test.Timestamp.String(),
"ce_specversion": "1.0",
"ce_dataschema": test.Schema.String(),
"ce_datacontenttype": "text/json",
"ce_subject": "receiverTopic",
"exta": "someext",
}),
}
Expand Down Expand Up @@ -89,14 +89,14 @@ func TestNewMessage(t *testing.T) {
TopicPartition: topicPartition,
Value: nil,
Headers: mapToKafkaHeaders(map[string]string{
"ce-type": testEvent.Type(),
"ce-source": testEvent.Source(),
"ce-id": testEvent.ID(),
"ce-time": test.Timestamp.String(),
"ce-specversion": "1.0",
"ce-dataschema": test.Schema.String(),
"ce-datacontenttype": "text/json",
"ce-subject": "receiverTopic",
"ce_type": testEvent.Type(),
"ce_source": testEvent.Source(),
"ce_id": testEvent.ID(),
"ce_time": test.Timestamp.String(),
"ce_specversion": "1.0",
"ce_dataschema": test.Schema.String(),
"ce_datacontenttype": "text/json",
"ce_subject": "receiverTopic",
}),
},
expectedEncoding: binding.EncodingBinary,
Expand Down
20 changes: 10 additions & 10 deletions protocol/kafka_confluent/v2/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Option is the function signature required to be considered an kafka_confluent.Option.
type Option func(*Protocol) error

// WithConfigMap sets the configMap to init the kafka client. This option is not required.
// WithConfigMap sets the configMap to init the kafka client.
func WithConfigMap(config *kafka.ConfigMap) Option {
return func(p *Protocol) error {
if config == nil {
Expand All @@ -26,7 +26,7 @@ func WithConfigMap(config *kafka.ConfigMap) Option {
}
}

// WithSenderTopic sets the defaultTopic for the kafka.Producer. This option is not required.
// WithSenderTopic sets the defaultTopic for the kafka.Producer.
func WithSenderTopic(defaultTopic string) Option {
return func(p *Protocol) error {
if defaultTopic == "" {
Expand All @@ -37,7 +37,7 @@ func WithSenderTopic(defaultTopic string) Option {
}
}

// WithReceiverTopics sets the topics for the kafka.Consumer. This option is not required.
// WithReceiverTopics sets the topics for the kafka.Consumer.
func WithReceiverTopics(topics []string) Option {
return func(p *Protocol) error {
if topics == nil {
Expand All @@ -48,7 +48,7 @@ func WithReceiverTopics(topics []string) Option {
}
}

// WithRebalanceCallBack sets the callback for rebalancing of the consumer group. This option is not required.
// WithRebalanceCallBack sets the callback for rebalancing of the consumer group.
func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
return func(p *Protocol) error {
if rebalanceCb == nil {
Expand All @@ -59,15 +59,15 @@ func WithRebalanceCallBack(rebalanceCb kafka.RebalanceCb) Option {
}
}

// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout. This option is not required.
// WithPollTimeout sets timeout of the consumer polling for message or events, return nil on timeout.
func WithPollTimeout(timeoutMs int) Option {
return func(p *Protocol) error {
p.consumerPollTimeout = timeoutMs
return nil
}
}

// WithSender set a kafka.Producer instance to init the client directly. This option is not required.
// WithSender set a kafka.Producer instance to init the client directly.
func WithSender(producer *kafka.Producer) Option {
return func(p *Protocol) error {
if producer == nil {
Expand All @@ -78,15 +78,15 @@ func WithSender(producer *kafka.Producer) Option {
}
}

// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled. This option is not required.
// WithErrorHandler provide a func on how to handle the kafka.Error which the kafka.Consumer has polled.
func WithErrorHandler(handler func(ctx context.Context, err kafka.Error)) Option {
return func(p *Protocol) error {
p.consumerErrorHandler = handler
return nil
}
}

// WithSender set a kafka.Consumer instance to init the client directly. This option is not required.
// WithReceiver set a kafka.Consumer instance to init the client directly.
func WithReceiver(consumer *kafka.Consumer) Option {
return func(p *Protocol) error {
if consumer == nil {
Expand All @@ -97,12 +97,12 @@ func WithReceiver(consumer *kafka.Consumer) Option {
}
}

// Opaque key type used to store topicPartitionOffsets: assign them from ctx. This option is not required.
// Opaque key type used to store topicPartitionOffsets: assign them from ctx.
type topicPartitionOffsetsType struct{}

var offsetKey = topicPartitionOffsetsType{}

// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from. This option is not required.
// WithTopicPartitionOffsets will set the positions where the consumer starts consuming from.
func WithTopicPartitionOffsets(ctx context.Context, topicPartitionOffsets []kafka.TopicPartition) context.Context {
if len(topicPartitionOffsets) == 0 {
panic("the topicPartitionOffsets cannot be empty")
Expand Down
Loading

0 comments on commit 912a678

Please sign in to comment.