Skip to content

Commit

Permalink
Support Consumer Middleware #41
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdihadeli committed Sep 26, 2022
1 parent 8786f45 commit 054cae0
Show file tree
Hide file tree
Showing 59 changed files with 808 additions and 293 deletions.
2 changes: 1 addition & 1 deletion .vscode/configurationCache.log
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"buildTargets":[".PHONY","cert","docker-compose_infra_down","docker-compose_infra_up","docker_clean","docker_down","docker_logs","docker_path","execute_k6_catalogs_read_service","execute_k6_catalogs_write_service","generate_load_test_client_catalogs_read_service","generate_load_test_client_catalogs_write_service","key","pprof_allocs","pprof_cpu","pprof_heap","proto_catalogs_read_product_service","proto_catalogs_write_product_service","proto_orders_order_service","run-linter","run_catalogs_read_service","run_catalogs_write_service","swagger_catalogs_read","swagger_catalogs_write","swagger_orders"],"launchTargets":[],"customConfigurationProvider":{"workspaceBrowse":{"browsePath":[],"compilerArgs":[]},"fileIndex":[]}}
{"buildTargets":[".PHONY","_path","cert","docker-compose_infra_down","docker-compose_infra_up","docker_clean","docker_down","docker_logs","execute_k6_catalogs_read_service","execute_k6_catalogs_write_service","generate_load_test_client_catalogs_read_service","generate_load_test_client_catalogs_write_service","key","pprof_allocs","pprof_cpu","pprof_heap","proto_catalogs_read_product_service","proto_catalogs_write_product_service","proto_orders_order_service","run-linter","run_catalogs_read_service","run_catalogs_write_service","swagger_catalogs_read","swagger_catalogs_write","swagger_orders"],"launchTargets":[],"customConfigurationProvider":{"workspaceBrowse":{"browsePath":[],"compilerArgs":[]},"fileIndex":[]}}
26 changes: 15 additions & 11 deletions .vscode/targets.log
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ make: *** No rule to make target 'all'. Stop.
# This is free software: you are free to change and redistribute it.
# There is NO WARRANTY, to the extent permitted by law.

# Make data base, printed on Sat Sep 24 18:45:47 2022
# Make data base, printed on Mon Sep 26 17:57:23 2022

# Variables

# environment
FPS_BROWSER_APP_PROFILE_STRING = Internet Explorer
# environment
SYSTEMDRIVE = C:
# environment
Expand Down Expand Up @@ -42,7 +44,9 @@ GOPATH = C:\Users\MehdiHadeli\go
# automatic
@D = $(patsubst %/,%,$(patsubst %\,%,$(dir $@)))
# environment
CHROME_CRASHPAD_PIPE_NAME = \\.\pipe\crashpad_24200_TPEATRNJNNVHFQDE
CHROME_CRASHPAD_PIPE_NAME = \\.\pipe\crashpad_21824_FYLOMXUBJVBNEDSG
# environment
FPS_BROWSER_USER_PROFILE_STRING = Default
# environment
VSCODE_HANDLES_UNCAUGHT_ERRORS = true
# default
Expand Down Expand Up @@ -94,7 +98,7 @@ MAKEFILE_LIST := Makefile
# automatic
@F = $(notdir $@)
# environment
VSCODE_PID = 24200
VSCODE_PID = 21824
# automatic
?D = $(patsubst %/,%,$(patsubst %\,%,$(dir $?)))
# automatic
Expand Down Expand Up @@ -218,11 +222,12 @@ PROCESSOR_REVISION = a503
# environment
PROCESSOR_IDENTIFIER = Intel64 Family 6 Model 165 Stepping 3, GenuineIntel
# variable set hash-table stats:
# Load=101/1024=10%, Rehash=0, Collisions=10/132=8%
# Load=103/1024=10%, Rehash=0, Collisions=11/134=8%

# Pattern-specific Variable Values

# No pattern-specific variable values.
# No pattern
-specific variable values.

# Directories

Expand Down Expand Up @@ -254,8 +259,7 @@ docker-compose_infra_up:

cert:
# Implicit rule search has not been done.
# Modification time neve
r checked.
# Modification time never checked.
# File has not been updated.
# recipe to execute (from 'Makefile', line 100):
openssl req -new -x509 -sha256 -key server.key -out server.pem -days 3650
Expand Down Expand Up @@ -427,8 +431,7 @@ proto_catalogs_read_product_service:
# File has not been updated.
# recipe to execute (from 'Makefile', line 122):
@echo Generating product_service client proto
protoc --go_out=./services/catalogs/read_service/internal/products/contracts/proto/service_clients --go-grpc_opt=require_unimplemented_servers=false --go-grpc_out=./services/catalogs/read_service/internal/products/contracts/proto/service_cl
ients api_docs/catalogs/read_service/protobuf/products/service_clients/*.proto
protoc --go_out=./services/catalogs/read_service/internal/products/contracts/proto/service_clients --go-grpc_opt=require_unimplemented_servers=false --go-grpc_out=./services/catalogs/read_service/internal/products/contracts/proto/service_clients api_docs/catalogs/read_service/protobuf/products/service_clients/*.proto

docker-compose_infra_down:
# Implicit rule search has not been done.
Expand All @@ -446,7 +449,8 @@ generate_load_test_client_catalogs_write_service:
@echo Generating load test client for catalogs write service
docker run --rm -v ${PWD}:/local openapitools/openapi-generator-cli generate --skip-validate-spec -i local/api_docs/catalogs/write_service/openapi/swagger.json -g k6 -o local/performance_tests/catalogs/write_service/k6-test/

docker_path:
docker
_path:
# Implicit rule search has not been done.
# Modification time never checked.
# File has not been updated.
Expand Down Expand Up @@ -487,6 +491,6 @@ proto_orders_order_service:
# strcache performance: lookups = 81 / hit rate = 38%
# hash-table stats:
# Load=50/8192=1%, Rehash=0, Collisions=0/81=0%
# Finished Make data base on Sat Sep 24 18:45:47 2022
# Finished Make data base on Mon Sep 26 17:57:23 2022


6 changes: 4 additions & 2 deletions pkg/core/domain/event_envelope.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package domain

import "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core"
import (
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata"
)

type EventEnvelope struct {
EventData interface{}
Metadata core.Metadata
Metadata metadata.Metadata
}
46 changes: 46 additions & 0 deletions pkg/core/metadata/message_extended_methods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package metadata

import (
messageHeader "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/message_header"
"time"
)

func (m Metadata) GetCorrelationId() string {
return m.GetString(messageHeader.CorrelationId)
}

func (m Metadata) SetCorrelationId(val string) {
m.SetValue(messageHeader.CorrelationId, val)
}

func (m Metadata) GetMessageId() string {
return m.GetString(messageHeader.MessageId)
}

func (m Metadata) SetMessageId(val string) {
m.SetValue(messageHeader.MessageId, val)
}

func (m Metadata) GetMessageName() string {
return m.GetString(messageHeader.Name)
}

func (m Metadata) SetMessageName(val string) {
m.SetValue(messageHeader.Name, val)
}

func (m Metadata) GetMessageType() string {
return m.GetString(messageHeader.Type)
}

func (m Metadata) SetMessageType(val string) {
m.SetValue(messageHeader.Type, val)
}

func (m Metadata) GetMessageCreated() time.Time {
return m.GetTime(messageHeader.Created)
}

func (m Metadata) SetMessageCreated(val time.Time) {
m.SetValue(messageHeader.Created, val)
}
28 changes: 23 additions & 5 deletions pkg/core/metadata.go → pkg/core/metadata/metadata.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package core
package metadata

import "emperror.dev/errors"
import "time"

type Metadata map[string]interface{}

Expand All @@ -9,13 +9,31 @@ func (m Metadata) ExistsKey(key string) bool {
return exists
}

func (m Metadata) GetKey(key string) (interface{}, error) {
func (m Metadata) GetKey(key string) interface{} {
val, exists := m[key]
if !exists {
return nil, errors.New("key not found")
return nil
}

return val, nil
return val
}

func (m Metadata) GetString(key string) string {
val, ok := m.GetKey(key).(string)
if ok {
return val
}

return ""
}

func (m Metadata) GetTime(key string) time.Time {
val, ok := m.GetKey(key).(time.Time)
if ok {
return val
}

return *new(time.Time)
}

func (m Metadata) SetValue(key string, value interface{}) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/core/serializer/json/json_metadata_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package json

import (
"emperror.dev/errors"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/serializer/jsonSerializer"
)

Expand All @@ -13,7 +13,7 @@ func NewJsonMetadataSerializer() *JsonMetadataSerializer {
return &JsonMetadataSerializer{}
}

func (s *JsonMetadataSerializer) Serialize(meta core.Metadata) ([]byte, error) {
func (s *JsonMetadataSerializer) Serialize(meta metadata.Metadata) ([]byte, error) {
if meta == nil {
return nil, nil
}
Expand All @@ -26,12 +26,12 @@ func (s *JsonMetadataSerializer) Serialize(meta core.Metadata) ([]byte, error) {
return marshal, nil
}

func (s *JsonMetadataSerializer) Deserialize(bytes []byte) (core.Metadata, error) {
func (s *JsonMetadataSerializer) Deserialize(bytes []byte) (metadata.Metadata, error) {
if bytes == nil {
return nil, nil
}

var meta core.Metadata
var meta metadata.Metadata

err := jsonSerializer.Unmarshal(bytes, &meta)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions pkg/core/serializer/metadata_serializer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package serializer

import "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core"
import (
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata"
)

type MetadataSerializer interface {
Serialize(meta core.Metadata) ([]byte, error)
Deserialize(bytes []byte) (core.Metadata, error)
Serialize(meta metadata.Metadata) ([]byte, error)
Deserialize(bytes []byte) (metadata.Metadata, error)
}
6 changes: 3 additions & 3 deletions pkg/es/contracts/store/aggregate_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package store

import (
"context"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models"
appendResult "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/append_result"
readPosition "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/stream_position/read_position"
Expand All @@ -16,12 +16,12 @@ type AggregateStore[T models.IHaveEventSourcedAggregate] interface {
// StoreWithVersion store the new or update aggregate state with expected version
StoreWithVersion(
aggregate T,
metadata core.Metadata,
metadata metadata.Metadata,
expectedVersion expectedStreamVersion.ExpectedStreamVersion,
ctx context.Context) (*appendResult.AppendEventsResult, error)

// Store the new or update aggregate state
Store(aggregate T, metadata core.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error)
Store(aggregate T, metadata metadata.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error)

// Load loads the most recent version of an aggregate to provided into params aggregate with an id and start read position.
Load(ctx context.Context, aggregateId uuid.UUID) (T, error)
Expand Down
10 changes: 5 additions & 5 deletions pkg/es/models/event_sourced_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"emperror.dev/errors"
"fmt"
"github.com/ahmetb/go-linq/v3"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/domain"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata"
errors2 "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/errors"
expectedStreamVersion "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/stream_version"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/serializer/jsonSerializer"
Expand All @@ -24,7 +24,7 @@ type When interface {

type fold interface {
// Restore the aggregate state with events that are loaded form the event store and increase the current version and last commit version.
fold(event domain.IDomainEvent, metadata core.Metadata) error
fold(event domain.IDomainEvent, metadata metadata.Metadata) error
}

type Apply interface {
Expand Down Expand Up @@ -72,7 +72,7 @@ type IEventSourcedAggregateRoot interface {
UncommittedEvents() []domain.IDomainEvent

// LoadFromHistory Loads the current state of the aggregate from a list of events.
LoadFromHistory(events []domain.IDomainEvent, metadata core.Metadata) error
LoadFromHistory(events []domain.IDomainEvent, metadata metadata.Metadata) error

AggregateStateProjection
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (a *EventSourcedAggregateRoot) UncommittedEvents() []domain.IDomainEvent {
return a.uncommittedEvents
}

func (a *EventSourcedAggregateRoot) LoadFromHistory(events []domain.IDomainEvent, metadata core.Metadata) error {
func (a *EventSourcedAggregateRoot) LoadFromHistory(events []domain.IDomainEvent, metadata metadata.Metadata) error {
for _, event := range events {
err := a.fold(event, metadata)
if err != nil {
Expand All @@ -188,7 +188,7 @@ func (a *EventSourcedAggregateRoot) Apply(event domain.IDomainEvent, isNew bool)
return nil
}

func (a *EventSourcedAggregateRoot) fold(event domain.IDomainEvent, metadata core.Metadata) error {
func (a *EventSourcedAggregateRoot) fold(event domain.IDomainEvent, metadata metadata.Metadata) error {
err := a.when(event)
if err != nil {
return errors.WrapIf(err, "[EventSourcedAggregateRoot_fold:when] error in the applying whenFunc")
Expand Down
4 changes: 2 additions & 2 deletions pkg/es/models/stream_event.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package models

import (
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/domain"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata"
uuid "github.com/satori/go.uuid"
)

Expand All @@ -11,5 +11,5 @@ type StreamEvent struct {
Version int64
Position int64
Event domain.IDomainEvent
Metadata core.Metadata
Metadata metadata.Metadata
}
12 changes: 6 additions & 6 deletions pkg/eventstroredb/aggregate_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
"github.com/EventStore/EventStore-Client-Go/esdb"
"github.com/ahmetb/go-linq/v3"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/domain"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/contracts/store"
"github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models"
appendResult "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/append_result"
Expand Down Expand Up @@ -35,7 +35,7 @@ func NewEventStoreAggregateStore[T models.IHaveEventSourcedAggregate](log logger
return &esdbAggregateStore[T]{log: log, eventStore: eventStore, serializer: serializer}
}

func (a *esdbAggregateStore[T]) StoreWithVersion(aggregate T, metadata core.Metadata, expectedVersion expectedStreamVersion.ExpectedStreamVersion, ctx context.Context) (*appendResult.AppendEventsResult, error) {
func (a *esdbAggregateStore[T]) StoreWithVersion(aggregate T, metadata metadata.Metadata, expectedVersion expectedStreamVersion.ExpectedStreamVersion, ctx context.Context) (*appendResult.AppendEventsResult, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "esdbAggregateStore.StoreWithVersion")
defer span.Finish()
span.LogFields(log.String("AggregateID", aggregate.Id().String()))
Expand Down Expand Up @@ -73,7 +73,7 @@ func (a *esdbAggregateStore[T]) StoreWithVersion(aggregate T, metadata core.Meta
return streamAppendResult, nil
}

func (a *esdbAggregateStore[T]) Store(aggregate T, metadata core.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error) {
func (a *esdbAggregateStore[T]) Store(aggregate T, metadata metadata.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "esdbAggregateStore.Store")
defer span.Finish()
expectedVersion := expectedStreamVersion.FromInt64(aggregate.OriginalVersion())
Expand Down Expand Up @@ -125,15 +125,15 @@ func (a *esdbAggregateStore[T]) LoadWithReadPosition(ctx context.Context, aggreg
return *new(T), tracing.TraceWithErr(span, errors.WrapIff(err, "[esdbAggregateStore.LoadWithReadPosition:MethodByName] error in loading aggregate {%s}", aggregateId.String()))
}

var metadata core.Metadata
var meta metadata.Metadata
var domainEvents []domain.IDomainEvent

linq.From(streamEvents).Distinct().SelectT(func(streamEvent *models.StreamEvent) domain.IDomainEvent {
metadata = streamEvent.Metadata
meta = streamEvent.Metadata
return streamEvent.Event
}).ToSlice(&domainEvents)

err = aggregate.LoadFromHistory(domainEvents, metadata)
err = aggregate.LoadFromHistory(domainEvents, meta)
if err != nil {
return *new(T), tracing.TraceWithErr(span, err)
}
Expand Down
Loading

0 comments on commit 054cae0

Please sign in to comment.