diff --git a/sdk/azcore/CHANGELOG.md b/sdk/azcore/CHANGELOG.md index 06a1f1d82ff9..c6c833fe5b29 100644 --- a/sdk/azcore/CHANGELOG.md +++ b/sdk/azcore/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- `messaging/CloudEvent` allows you to serialize/deserialize CloudEvents, as described in the CloudEvents 1.0 specification: [link](https://github.com/cloudevents/spec) + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/azcore/messaging/cloud_event.go b/sdk/azcore/messaging/cloud_event.go new file mode 100644 index 000000000000..22d5855a0dd3 --- /dev/null +++ b/sdk/azcore/messaging/cloud_event.go @@ -0,0 +1,291 @@ +// Copyright 2017 Microsoft Corporation. All rights reserved. +// Use of this source code is governed by an MIT +// license that can be found in the LICENSE file. + +// Package messaging contains types used across messaging packages. +package messaging + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" +) + +// CloudEvent represents an event conforming to the CloudEvents 1.0 spec. +// See here for more details: https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md +type CloudEvent struct { + // + // REQUIRED fields + // + + // ID identifies the event. Producers MUST ensure that source + id is unique for each distinct event. If a duplicate + // event is re-sent (e.g. due to a network error) it MAY have the same id. Consumers MAY assume that Events with + // identical source and id are duplicates. + ID string + + // Source identifies the context in which an event happened. + Source string + + // SpecVersion is the version of the CloudEvents specification which the event uses. + SpecVersion string + + // Type contains a value describing the type of event related to the originating occurrence. + Type string + + // + // OPTIONAL fields + // + + // Data is the payload for the event. + // * []byte will be serialized and deserialized as []byte. + // * Any other type will be serialized to a JSON object and deserialized into + // a [json.RawMessage]. + // + // To deserialize a [json.RawMessage] into your chosen type: + // + // var yourData *YourType + // json.Unmarshal(cloudEvent.Data.(json.RawMessage), &yourData) + // + Data any + + // DataContentType is the content type of [Data] value (ex: "text/xml") + DataContentType *string + + // DataSchema identifies the schema that Data adheres to. + DataSchema *string + + // Extensions are attributes that are serialized as siblings to attributes like Data. + Extensions map[string]any + + // Subject of the event, in the context of the event producer (identified by Source). + Subject *string + + // Time represents the time this event occurred. + Time *time.Time +} + +// CloudEventOptions are options for the [NewCloudEvent] function. +type CloudEventOptions struct { + // DataContentType is the content type of [Data] value (ex: "text/xml") + DataContentType *string + + // DataSchema identifies the schema that Data adheres to. + DataSchema *string + + // Extensions are attributes that are serialized as siblings to attributes like Data. + Extensions map[string]any + + // Subject of the event, in the context of the event producer (identified by Source). + Subject *string + + // Time represents the time this event occurred. + // Defaults to time.Now().UTC() + Time *time.Time +} + +// NewCloudEvent creates a CloudEvent. +// - source - Identifies the context in which an event happened. The combination of id and source must be unique +// for each distinct event. +// - eventType - Type of event related to the originating occurrence. +// - data - data to be added to the event. Can be a []byte, or any JSON serializable type, or nil. +// - options - additional fields that are not required. +func NewCloudEvent(source string, eventType string, data any, options *CloudEventOptions) (CloudEvent, error) { + if source == "" { + return CloudEvent{}, errors.New("source cannot be empty") + } + + if eventType == "" { + return CloudEvent{}, errors.New("eventType cannot be empty") + } + + id, err := uuid.New() + + if err != nil { + return CloudEvent{}, err + } + + ce := CloudEvent{ + ID: id.String(), + Source: source, + SpecVersion: "1.0", + Type: eventType, + + // optional but probably always filled in. + Data: data, + } + + if options != nil { + ce.DataContentType = options.DataContentType + ce.DataSchema = options.DataSchema + ce.Extensions = options.Extensions + ce.Subject = options.Subject + + ce.Time = options.Time + } + + if ce.Time == nil { + ce.Time = to.Ptr(time.Now().UTC()) + } + + return ce, nil +} + +// MarshalJSON implements the json.Marshaler interface for CloudEvent. +func (ce CloudEvent) MarshalJSON() ([]byte, error) { + m := map[string]any{ + "id": ce.ID, + "source": ce.Source, + "specversion": ce.SpecVersion, + "type": ce.Type, + } + + if ce.Data != nil { + bytes, isBytes := ce.Data.([]byte) + + if isBytes { + m["data_base64"] = base64.StdEncoding.EncodeToString(bytes) + } else { + m["data"] = ce.Data + } + } + + if ce.DataContentType != nil { + m["datacontenttype"] = ce.DataContentType + } + + if ce.DataSchema != nil { + m["dataschema"] = ce.DataSchema + } + + for k, v := range ce.Extensions { + m[k] = v + } + + if ce.Subject != nil { + m["subject"] = ce.Subject + } + + if ce.Time != nil { + m["time"] = ce.Time.Format(time.RFC3339Nano) + } + + return json.Marshal(m) +} + +func getValue[T any](k string, rawV any, dest *T) error { + v, ok := rawV.(T) + + if !ok { + var t T + return fmt.Errorf("field %q is a %T, but should be %T", k, rawV, t) + } + + *dest = v + return nil +} + +// UnmarshalJSON implements the json.Unmarshaler interface for CloudEvent. +func (ce *CloudEvent) UnmarshalJSON(data []byte) error { + var m map[string]json.RawMessage + + if err := json.Unmarshal(data, &m); err != nil { + return err + } + + for k, raw := range m { + if err := updateFieldFromValue(ce, k, raw); err != nil { + return fmt.Errorf("failed to deserialize %q: %w", k, err) + } + } + + if ce.ID == "" { + return errors.New("required field 'id' was not present, or was empty") + } + + if ce.Source == "" { + return errors.New("required field 'source' was not present, or was empty") + } + + if ce.SpecVersion == "" { + return errors.New("required field 'specversion' was not present, or was empty") + } + + if ce.Type == "" { + return errors.New("required field 'type' was not present, or was empty") + } + + return nil +} + +func updateFieldFromValue(ce *CloudEvent, k string, raw json.RawMessage) error { + switch k { + // + // required attributes + // + case "id": + return json.Unmarshal(raw, &ce.ID) + case "source": + return json.Unmarshal(raw, &ce.Source) + case "specversion": + return json.Unmarshal(raw, &ce.SpecVersion) + case "type": + return json.Unmarshal(raw, &ce.Type) + // + // optional attributes + // + case "data": + // let the user deserialize so they can put it into their own native type. + ce.Data = raw + case "datacontenttype": + return json.Unmarshal(raw, &ce.DataContentType) + case "dataschema": + return json.Unmarshal(raw, &ce.DataSchema) + case "data_base64": + var base64Str string + if err := json.Unmarshal(raw, &base64Str); err != nil { + return err + } + + bytes, err := base64.StdEncoding.DecodeString(base64Str) + + if err != nil { + return err + } + + ce.Data = bytes + case "subject": + return json.Unmarshal(raw, &ce.Subject) + case "time": + var timeStr string + if err := json.Unmarshal(raw, &timeStr); err != nil { + return err + } + + tm, err := time.Parse(time.RFC3339Nano, timeStr) + + if err != nil { + return err + } + + ce.Time = &tm + default: + // https: //github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#extension-context-attributes + if ce.Extensions == nil { + ce.Extensions = map[string]any{} + } + + var v any + if err := json.Unmarshal(raw, &v); err != nil { + return err + } + + ce.Extensions[k] = v + } + + return nil +} diff --git a/sdk/azcore/messaging/cloud_event_test.go b/sdk/azcore/messaging/cloud_event_test.go new file mode 100644 index 000000000000..1cd168ad1471 --- /dev/null +++ b/sdk/azcore/messaging/cloud_event_test.go @@ -0,0 +1,228 @@ +// Copyright 2017 Microsoft Corporation. All rights reserved. +// Use of this source code is governed by an MIT +// license that can be found in the LICENSE file. + +package messaging + +import ( + "encoding/json" + "io/ioutil" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/stretchr/testify/require" +) + +func TestCloudEvent_Minimum(t *testing.T) { + e, err := NewCloudEvent("source", "eventType", nil, nil) + require.NoError(t, err) + + require.NotEmpty(t, e) + + require.NotEmpty(t, e.ID) + require.GreaterOrEqual(t, time.Since(*e.Time), time.Duration(0)) + + require.Equal(t, CloudEvent{ + ID: e.ID, + Source: "source", + SpecVersion: "1.0", + Time: e.Time, + Type: "eventType", + }, e) + + actualCE := roundTrip(t, e) + + require.Equal(t, CloudEvent{ + ID: e.ID, + Source: "source", + SpecVersion: "1.0", + Time: e.Time, + Type: "eventType", + }, *actualCE) +} + +func TestCloudEventDefaultToTimeNowUTC(t *testing.T) { + ce, err := NewCloudEvent("source", "type", nil, nil) + require.NoError(t, err) + require.NotEmpty(t, ce.Time) +} + +func TestCloudEventJSONData(t *testing.T) { + data := map[string]string{ + "randomData": "hello", + } + ce, err := NewCloudEvent("source", "type", data, nil) + require.NoError(t, err) + require.Equal(t, data, ce.Data) + + // The types change here because the map is ultimately treated as + // a JSON object, which means the type ends up being map[string]any{} + // when deserialized. + actualCE := roundTrip(t, ce) + + var dest *map[string]string + require.NoError(t, json.Unmarshal(actualCE.Data.(json.RawMessage), &dest)) + + require.Equal(t, data, *dest) +} + +func TestCloudEventUnmarshalFull(t *testing.T) { + tm, err := time.Parse(time.RFC3339, "2023-06-16T02:54:01Z") + require.NoError(t, err) + + ce, err := NewCloudEvent("source", "type", []byte{1, 2, 3}, &CloudEventOptions{ + DataContentType: to.Ptr("data content type"), + DataSchema: to.Ptr("microsoft.com/dataschema"), + Extensions: map[string]any{ + "extstr": "extstring", + "extnum": float64(1), + "extbool": true, + "exturi": "http://microsoft.com", + }, + Subject: to.Ptr("subject"), + Time: &tm, + }) + require.NoError(t, err) + require.NotEmpty(t, ce.ID) + require.NotEmpty(t, ce.Time) + + actualCE := roundTrip(t, ce) + + require.NotEmpty(t, actualCE.ID) + require.Equal(t, &CloudEvent{ + ID: ce.ID, + Source: "source", + Subject: to.Ptr("subject"), + SpecVersion: "1.0", + Time: &tm, + Type: "type", + DataSchema: to.Ptr("microsoft.com/dataschema"), + DataContentType: to.Ptr("data content type"), + Data: []byte{1, 2, 3}, + Extensions: map[string]any{ + "extstr": "extstring", + "extnum": float64(1), + "extbool": true, + "exturi": "http://microsoft.com", + }, + }, actualCE) +} + +func TestCloudEventUnmarshalFull_InteropWithPython(t *testing.T) { + // this event is a Python serialized CloudEvent + text, err := ioutil.ReadFile("testdata/cloudevent_binary_with_extensions.json") + require.NoError(t, err) + + var ce *CloudEvent + + err = json.Unmarshal(text, &ce) + require.NoError(t, err) + + tm, err := time.Parse(time.RFC3339, "2023-06-16T02:54:01.470515Z") + require.NoError(t, err) + + require.Equal(t, &CloudEvent{ + ID: "2de93014-a793-4170-88f4-1ef74002dfc9", + Source: "source", + Subject: to.Ptr("subject"), + SpecVersion: "1.0", + Time: &tm, + Type: "type", + DataSchema: to.Ptr("microsoft.com/dataschema"), + DataContentType: to.Ptr("data content type"), + Data: []byte{1, 2, 3}, + Extensions: map[string]any{ + "extstr": "extstring", + "extnum": float64(1), + "extbool": true, + "exturi": "http://microsoft.com", + }, + }, ce) +} + +func TestCloudEventUnmarshalRequiredFieldsOnly(t *testing.T) { + text, err := ioutil.ReadFile("testdata/cloudevent_required_only.json") + require.NoError(t, err) + + var ce *CloudEvent + + err = json.Unmarshal(text, &ce) + require.NoError(t, err) + + require.Equal(t, &CloudEvent{ + ID: "2de93014-a793-4170-88f4-1ef74002dfc9", + Source: "source", + SpecVersion: "1.0", + Type: "type", + }, ce) +} + +func TestCloudEventUnmarshalInvalidEvents(t *testing.T) { + var ce *CloudEvent + + err := json.Unmarshal([]byte("{}"), &ce) + require.EqualError(t, err, "required field 'id' was not present, or was empty") + + err = json.Unmarshal([]byte(`{"id": "hello"}`), &ce) + require.EqualError(t, err, "required field 'source' was not present, or was empty") + + err = json.Unmarshal([]byte(`{"id": "hello", "source": "hello"}`), &ce) + require.EqualError(t, err, "required field 'specversion' was not present, or was empty") + + err = json.Unmarshal([]byte(`{"id": "hello", "source": "hello", "specversion": "1.0"}`), &ce) + require.EqualError(t, err, "required field 'type' was not present, or was empty") + + err = json.Unmarshal([]byte("invalid-json"), &ce) + require.EqualError(t, err, "invalid character 'i' looking for beginning of value") + + err = json.Unmarshal([]byte("[]"), &ce) + require.EqualError(t, err, "json: cannot unmarshal array into Go value of type map[string]json.RawMessage") + + err = json.Unmarshal([]byte(`{"id":100}`), &ce) + require.EqualError(t, err, `failed to deserialize "id": json: cannot unmarshal number into Go value of type string`) + + err = json.Unmarshal([]byte(`{"data_base64": 1}`), &ce) + require.EqualError(t, err, `failed to deserialize "data_base64": json: cannot unmarshal number into Go value of type string`) + + err = json.Unmarshal([]byte(`{"data_base64": "not-base-64"}`), &ce) + require.EqualError(t, err, `failed to deserialize "data_base64": illegal base64 data at input byte 3`) + + err = json.Unmarshal([]byte(`{"time": 100}`), &ce) + require.EqualError(t, err, `failed to deserialize "time": json: cannot unmarshal number into Go value of type string`) + + err = json.Unmarshal([]byte(`{"time": "not an RFC timestamp"}`), &ce) + require.EqualError(t, err, `failed to deserialize "time": parsing time "not an RFC timestamp" as "2006-01-02T15:04:05.999999999Z07:00": cannot parse "not an RFC timestamp" as "2006"`) +} + +func TestGetValue(t *testing.T) { + var s string + require.NoError(t, getValue("k", "hello", &s)) + require.Equal(t, "hello", s) + + // this doesn't work because we assume the [T] here is *string + // and that's not what the rawValue would be. + var ps *string + require.EqualError(t, getValue("k", "hello", &ps), `field "k" is a string, but should be *string`) +} + +func TestInvalidCloudEvent(t *testing.T) { + ce, err := NewCloudEvent("", "eventType", nil, nil) + require.Empty(t, ce) + require.EqualError(t, err, "source cannot be empty") + + ce, err = NewCloudEvent("source", "", nil, nil) + require.Empty(t, ce) + require.EqualError(t, err, "eventType cannot be empty") +} + +func roundTrip(t *testing.T, ce CloudEvent) *CloudEvent { + bytes, err := json.Marshal(ce) + require.NoError(t, err) + + var dest *CloudEvent + err = json.Unmarshal(bytes, &dest) + require.NoError(t, err) + + return dest +} diff --git a/sdk/azcore/messaging/example_usingcloudevent_test.go b/sdk/azcore/messaging/example_usingcloudevent_test.go new file mode 100644 index 000000000000..1f262db6df1e --- /dev/null +++ b/sdk/azcore/messaging/example_usingcloudevent_test.go @@ -0,0 +1,59 @@ +// Copyright 2017 Microsoft Corporation. All rights reserved. +// Use of this source code is governed by an MIT +// license that can be found in the LICENSE file. + +package messaging_test + +import ( + "encoding/json" + "fmt" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging" +) + +func Example_usingCloudEvent() { + type sampleType struct { + CustomField string `json:"custom_field"` + } + + eventToSend, err := messaging.NewCloudEvent("source", "eventtype", &sampleType{ + CustomField: "hello, a custom field value", + }, nil) + + if err != nil { + panic(err) + } + + receivedEvent, err := sendAndReceiveCloudEvent(eventToSend) + + if err != nil { + panic(err) + } + + var receivedData *sampleType + + if err := json.Unmarshal(receivedEvent.Data.(json.RawMessage), &receivedData); err != nil { + panic(err) + } + + fmt.Printf("Custom field = %s\n", receivedData.CustomField) + + // Output: + // Custom field = hello, a custom field value +} + +func sendAndReceiveCloudEvent(ce messaging.CloudEvent) (messaging.CloudEvent, error) { + bytes, err := json.Marshal(ce) + + if err != nil { + return messaging.CloudEvent{}, err + } + + var received *messaging.CloudEvent + + if err := json.Unmarshal(bytes, &received); err != nil { + return messaging.CloudEvent{}, err + } + + return *received, nil +} diff --git a/sdk/azcore/messaging/testdata/cloudevent_binary_with_extensions.json b/sdk/azcore/messaging/testdata/cloudevent_binary_with_extensions.json new file mode 100644 index 000000000000..4bb5f9e50982 --- /dev/null +++ b/sdk/azcore/messaging/testdata/cloudevent_binary_with_extensions.json @@ -0,0 +1,15 @@ +{ + "data_base64": "AQID", + "datacontenttype": "data content type", + "dataschema": "microsoft.com/dataschema", + "extbool": true, + "extnum": 1, + "extstr": "extstring", + "exturi": "http://microsoft.com", + "id": "2de93014-a793-4170-88f4-1ef74002dfc9", + "source": "source", + "specversion": "1.0", + "subject": "subject", + "time": "2023-06-16T02:54:01.470515Z", + "type": "type" +} diff --git a/sdk/azcore/messaging/testdata/cloudevent_required_only.json b/sdk/azcore/messaging/testdata/cloudevent_required_only.json new file mode 100644 index 000000000000..ab76e6907d44 --- /dev/null +++ b/sdk/azcore/messaging/testdata/cloudevent_required_only.json @@ -0,0 +1,6 @@ +{ + "id": "2de93014-a793-4170-88f4-1ef74002dfc9", + "source": "source", + "specversion": "1.0", + "type": "type" +}