diff --git a/.vscode/configurationCache.log b/.vscode/configurationCache.log index 7d511d18..d1c97883 100644 --- a/.vscode/configurationCache.log +++ b/.vscode/configurationCache.log @@ -1 +1 @@ -{"buildTargets":[".PHONY","cert","docker-compose_infra_down","docker-compose_infra_up","docker_clean","docker_down","docker_logs","docker_path","execute_k6_catalogs_read_service","execute_k6_catalogs_write_service","generate_load_test_client_catalogs_read_service","generate_load_test_client_catalogs_write_service","key","pprof_allocs","pprof_cpu","pprof_heap","proto_catalogs_read_product_service","proto_catalogs_write_product_service","proto_orders_order_service","run-linter","run_catalogs_read_service","run_catalogs_write_service","swagger_catalogs_read","swagger_catalogs_write","swagger_orders"],"launchTargets":[],"customConfigurationProvider":{"workspaceBrowse":{"browsePath":[],"compilerArgs":[]},"fileIndex":[]}} \ No newline at end of file +{"buildTargets":[".PHONY","_path","cert","docker-compose_infra_down","docker-compose_infra_up","docker_clean","docker_down","docker_logs","execute_k6_catalogs_read_service","execute_k6_catalogs_write_service","generate_load_test_client_catalogs_read_service","generate_load_test_client_catalogs_write_service","key","pprof_allocs","pprof_cpu","pprof_heap","proto_catalogs_read_product_service","proto_catalogs_write_product_service","proto_orders_order_service","run-linter","run_catalogs_read_service","run_catalogs_write_service","swagger_catalogs_read","swagger_catalogs_write","swagger_orders"],"launchTargets":[],"customConfigurationProvider":{"workspaceBrowse":{"browsePath":[],"compilerArgs":[]},"fileIndex":[]}} \ No newline at end of file diff --git a/.vscode/targets.log b/.vscode/targets.log index f84b2585..80d6d899 100644 --- a/.vscode/targets.log +++ b/.vscode/targets.log @@ -11,10 +11,12 @@ make: *** No rule to make target 'all'. Stop. # This is free software: you are free to change and redistribute it. # There is NO WARRANTY, to the extent permitted by law. -# Make data base, printed on Sat Sep 24 18:45:47 2022 +# Make data base, printed on Mon Sep 26 17:57:23 2022 # Variables +# environment +FPS_BROWSER_APP_PROFILE_STRING = Internet Explorer # environment SYSTEMDRIVE = C: # environment @@ -42,7 +44,9 @@ GOPATH = C:\Users\MehdiHadeli\go # automatic @D = $(patsubst %/,%,$(patsubst %\,%,$(dir $@))) # environment -CHROME_CRASHPAD_PIPE_NAME = \\.\pipe\crashpad_24200_TPEATRNJNNVHFQDE +CHROME_CRASHPAD_PIPE_NAME = \\.\pipe\crashpad_21824_FYLOMXUBJVBNEDSG +# environment +FPS_BROWSER_USER_PROFILE_STRING = Default # environment VSCODE_HANDLES_UNCAUGHT_ERRORS = true # default @@ -94,7 +98,7 @@ MAKEFILE_LIST := Makefile # automatic @F = $(notdir $@) # environment -VSCODE_PID = 24200 +VSCODE_PID = 21824 # automatic ?D = $(patsubst %/,%,$(patsubst %\,%,$(dir $?))) # automatic @@ -218,11 +222,12 @@ PROCESSOR_REVISION = a503 # environment PROCESSOR_IDENTIFIER = Intel64 Family 6 Model 165 Stepping 3, GenuineIntel # variable set hash-table stats: -# Load=101/1024=10%, Rehash=0, Collisions=10/132=8% +# Load=103/1024=10%, Rehash=0, Collisions=11/134=8% # Pattern-specific Variable Values -# No pattern-specific variable values. +# No pattern +-specific variable values. # Directories @@ -254,8 +259,7 @@ docker-compose_infra_up: cert: # Implicit rule search has not been done. -# Modification time neve -r checked. +# Modification time never checked. # File has not been updated. # recipe to execute (from 'Makefile', line 100): openssl req -new -x509 -sha256 -key server.key -out server.pem -days 3650 @@ -427,8 +431,7 @@ proto_catalogs_read_product_service: # File has not been updated. # recipe to execute (from 'Makefile', line 122): @echo Generating product_service client proto - protoc --go_out=./services/catalogs/read_service/internal/products/contracts/proto/service_clients --go-grpc_opt=require_unimplemented_servers=false --go-grpc_out=./services/catalogs/read_service/internal/products/contracts/proto/service_cl -ients api_docs/catalogs/read_service/protobuf/products/service_clients/*.proto + protoc --go_out=./services/catalogs/read_service/internal/products/contracts/proto/service_clients --go-grpc_opt=require_unimplemented_servers=false --go-grpc_out=./services/catalogs/read_service/internal/products/contracts/proto/service_clients api_docs/catalogs/read_service/protobuf/products/service_clients/*.proto docker-compose_infra_down: # Implicit rule search has not been done. @@ -446,7 +449,8 @@ generate_load_test_client_catalogs_write_service: @echo Generating load test client for catalogs write service docker run --rm -v ${PWD}:/local openapitools/openapi-generator-cli generate --skip-validate-spec -i local/api_docs/catalogs/write_service/openapi/swagger.json -g k6 -o local/performance_tests/catalogs/write_service/k6-test/ -docker_path: +docker +_path: # Implicit rule search has not been done. # Modification time never checked. # File has not been updated. @@ -487,6 +491,6 @@ proto_orders_order_service: # strcache performance: lookups = 81 / hit rate = 38% # hash-table stats: # Load=50/8192=1%, Rehash=0, Collisions=0/81=0% -# Finished Make data base on Sat Sep 24 18:45:47 2022 +# Finished Make data base on Mon Sep 26 17:57:23 2022 diff --git a/pkg/core/domain/event_envelope.go b/pkg/core/domain/event_envelope.go index 625993f6..5160e9c6 100644 --- a/pkg/core/domain/event_envelope.go +++ b/pkg/core/domain/event_envelope.go @@ -1,8 +1,10 @@ package domain -import "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" +import ( + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" +) type EventEnvelope struct { EventData interface{} - Metadata core.Metadata + Metadata metadata.Metadata } diff --git a/pkg/core/metadata/message_extended_methods.go b/pkg/core/metadata/message_extended_methods.go new file mode 100644 index 00000000..dfc9eb73 --- /dev/null +++ b/pkg/core/metadata/message_extended_methods.go @@ -0,0 +1,46 @@ +package metadata + +import ( + messageHeader "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/message_header" + "time" +) + +func (m Metadata) GetCorrelationId() string { + return m.GetString(messageHeader.CorrelationId) +} + +func (m Metadata) SetCorrelationId(val string) { + m.SetValue(messageHeader.CorrelationId, val) +} + +func (m Metadata) GetMessageId() string { + return m.GetString(messageHeader.MessageId) +} + +func (m Metadata) SetMessageId(val string) { + m.SetValue(messageHeader.MessageId, val) +} + +func (m Metadata) GetMessageName() string { + return m.GetString(messageHeader.Name) +} + +func (m Metadata) SetMessageName(val string) { + m.SetValue(messageHeader.Name, val) +} + +func (m Metadata) GetMessageType() string { + return m.GetString(messageHeader.Type) +} + +func (m Metadata) SetMessageType(val string) { + m.SetValue(messageHeader.Type, val) +} + +func (m Metadata) GetMessageCreated() time.Time { + return m.GetTime(messageHeader.Created) +} + +func (m Metadata) SetMessageCreated(val time.Time) { + m.SetValue(messageHeader.Created, val) +} diff --git a/pkg/core/metadata.go b/pkg/core/metadata/metadata.go similarity index 57% rename from pkg/core/metadata.go rename to pkg/core/metadata/metadata.go index 322e200c..441735fa 100644 --- a/pkg/core/metadata.go +++ b/pkg/core/metadata/metadata.go @@ -1,6 +1,6 @@ -package core +package metadata -import "emperror.dev/errors" +import "time" type Metadata map[string]interface{} @@ -9,13 +9,31 @@ func (m Metadata) ExistsKey(key string) bool { return exists } -func (m Metadata) GetKey(key string) (interface{}, error) { +func (m Metadata) GetKey(key string) interface{} { val, exists := m[key] if !exists { - return nil, errors.New("key not found") + return nil } - return val, nil + return val +} + +func (m Metadata) GetString(key string) string { + val, ok := m.GetKey(key).(string) + if ok { + return val + } + + return "" +} + +func (m Metadata) GetTime(key string) time.Time { + val, ok := m.GetKey(key).(time.Time) + if ok { + return val + } + + return *new(time.Time) } func (m Metadata) SetValue(key string, value interface{}) { diff --git a/pkg/core/serializer/json/json_metadata_serializer.go b/pkg/core/serializer/json/json_metadata_serializer.go index 35c5f114..e33e48ee 100644 --- a/pkg/core/serializer/json/json_metadata_serializer.go +++ b/pkg/core/serializer/json/json_metadata_serializer.go @@ -2,7 +2,7 @@ package json import ( "emperror.dev/errors" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/serializer/jsonSerializer" ) @@ -13,7 +13,7 @@ func NewJsonMetadataSerializer() *JsonMetadataSerializer { return &JsonMetadataSerializer{} } -func (s *JsonMetadataSerializer) Serialize(meta core.Metadata) ([]byte, error) { +func (s *JsonMetadataSerializer) Serialize(meta metadata.Metadata) ([]byte, error) { if meta == nil { return nil, nil } @@ -26,12 +26,12 @@ func (s *JsonMetadataSerializer) Serialize(meta core.Metadata) ([]byte, error) { return marshal, nil } -func (s *JsonMetadataSerializer) Deserialize(bytes []byte) (core.Metadata, error) { +func (s *JsonMetadataSerializer) Deserialize(bytes []byte) (metadata.Metadata, error) { if bytes == nil { return nil, nil } - var meta core.Metadata + var meta metadata.Metadata err := jsonSerializer.Unmarshal(bytes, &meta) if err != nil { diff --git a/pkg/core/serializer/metadata_serializer.go b/pkg/core/serializer/metadata_serializer.go index da347109..d4fb0ac0 100644 --- a/pkg/core/serializer/metadata_serializer.go +++ b/pkg/core/serializer/metadata_serializer.go @@ -1,8 +1,10 @@ package serializer -import "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" +import ( + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" +) type MetadataSerializer interface { - Serialize(meta core.Metadata) ([]byte, error) - Deserialize(bytes []byte) (core.Metadata, error) + Serialize(meta metadata.Metadata) ([]byte, error) + Deserialize(bytes []byte) (metadata.Metadata, error) } diff --git a/pkg/es/contracts/store/aggregate_store.go b/pkg/es/contracts/store/aggregate_store.go index 99a80db6..9417e749 100644 --- a/pkg/es/contracts/store/aggregate_store.go +++ b/pkg/es/contracts/store/aggregate_store.go @@ -2,7 +2,7 @@ package store import ( "context" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models" appendResult "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/append_result" readPosition "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/stream_position/read_position" @@ -16,12 +16,12 @@ type AggregateStore[T models.IHaveEventSourcedAggregate] interface { // StoreWithVersion store the new or update aggregate state with expected version StoreWithVersion( aggregate T, - metadata core.Metadata, + metadata metadata.Metadata, expectedVersion expectedStreamVersion.ExpectedStreamVersion, ctx context.Context) (*appendResult.AppendEventsResult, error) // Store the new or update aggregate state - Store(aggregate T, metadata core.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error) + Store(aggregate T, metadata metadata.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error) // Load loads the most recent version of an aggregate to provided into params aggregate with an id and start read position. Load(ctx context.Context, aggregateId uuid.UUID) (T, error) diff --git a/pkg/es/models/event_sourced_aggregate.go b/pkg/es/models/event_sourced_aggregate.go index 13e3c069..3d127442 100644 --- a/pkg/es/models/event_sourced_aggregate.go +++ b/pkg/es/models/event_sourced_aggregate.go @@ -7,8 +7,8 @@ import ( "emperror.dev/errors" "fmt" "github.com/ahmetb/go-linq/v3" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/domain" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" errors2 "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/errors" expectedStreamVersion "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/stream_version" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/serializer/jsonSerializer" @@ -24,7 +24,7 @@ type When interface { type fold interface { // Restore the aggregate state with events that are loaded form the event store and increase the current version and last commit version. - fold(event domain.IDomainEvent, metadata core.Metadata) error + fold(event domain.IDomainEvent, metadata metadata.Metadata) error } type Apply interface { @@ -72,7 +72,7 @@ type IEventSourcedAggregateRoot interface { UncommittedEvents() []domain.IDomainEvent // LoadFromHistory Loads the current state of the aggregate from a list of events. - LoadFromHistory(events []domain.IDomainEvent, metadata core.Metadata) error + LoadFromHistory(events []domain.IDomainEvent, metadata metadata.Metadata) error AggregateStateProjection } @@ -161,7 +161,7 @@ func (a *EventSourcedAggregateRoot) UncommittedEvents() []domain.IDomainEvent { return a.uncommittedEvents } -func (a *EventSourcedAggregateRoot) LoadFromHistory(events []domain.IDomainEvent, metadata core.Metadata) error { +func (a *EventSourcedAggregateRoot) LoadFromHistory(events []domain.IDomainEvent, metadata metadata.Metadata) error { for _, event := range events { err := a.fold(event, metadata) if err != nil { @@ -188,7 +188,7 @@ func (a *EventSourcedAggregateRoot) Apply(event domain.IDomainEvent, isNew bool) return nil } -func (a *EventSourcedAggregateRoot) fold(event domain.IDomainEvent, metadata core.Metadata) error { +func (a *EventSourcedAggregateRoot) fold(event domain.IDomainEvent, metadata metadata.Metadata) error { err := a.when(event) if err != nil { return errors.WrapIf(err, "[EventSourcedAggregateRoot_fold:when] error in the applying whenFunc") diff --git a/pkg/es/models/stream_event.go b/pkg/es/models/stream_event.go index 91cb165a..7d7021ab 100644 --- a/pkg/es/models/stream_event.go +++ b/pkg/es/models/stream_event.go @@ -1,8 +1,8 @@ package models import ( - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/domain" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" uuid "github.com/satori/go.uuid" ) @@ -11,5 +11,5 @@ type StreamEvent struct { Version int64 Position int64 Event domain.IDomainEvent - Metadata core.Metadata + Metadata metadata.Metadata } diff --git a/pkg/eventstroredb/aggregate_store.go b/pkg/eventstroredb/aggregate_store.go index 0180bab5..912c6e0c 100644 --- a/pkg/eventstroredb/aggregate_store.go +++ b/pkg/eventstroredb/aggregate_store.go @@ -6,8 +6,8 @@ import ( "fmt" "github.com/EventStore/EventStore-Client-Go/esdb" "github.com/ahmetb/go-linq/v3" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/domain" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/contracts/store" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models" appendResult "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/append_result" @@ -35,7 +35,7 @@ func NewEventStoreAggregateStore[T models.IHaveEventSourcedAggregate](log logger return &esdbAggregateStore[T]{log: log, eventStore: eventStore, serializer: serializer} } -func (a *esdbAggregateStore[T]) StoreWithVersion(aggregate T, metadata core.Metadata, expectedVersion expectedStreamVersion.ExpectedStreamVersion, ctx context.Context) (*appendResult.AppendEventsResult, error) { +func (a *esdbAggregateStore[T]) StoreWithVersion(aggregate T, metadata metadata.Metadata, expectedVersion expectedStreamVersion.ExpectedStreamVersion, ctx context.Context) (*appendResult.AppendEventsResult, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "esdbAggregateStore.StoreWithVersion") defer span.Finish() span.LogFields(log.String("AggregateID", aggregate.Id().String())) @@ -73,7 +73,7 @@ func (a *esdbAggregateStore[T]) StoreWithVersion(aggregate T, metadata core.Meta return streamAppendResult, nil } -func (a *esdbAggregateStore[T]) Store(aggregate T, metadata core.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error) { +func (a *esdbAggregateStore[T]) Store(aggregate T, metadata metadata.Metadata, ctx context.Context) (*appendResult.AppendEventsResult, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "esdbAggregateStore.Store") defer span.Finish() expectedVersion := expectedStreamVersion.FromInt64(aggregate.OriginalVersion()) @@ -125,15 +125,15 @@ func (a *esdbAggregateStore[T]) LoadWithReadPosition(ctx context.Context, aggreg return *new(T), tracing.TraceWithErr(span, errors.WrapIff(err, "[esdbAggregateStore.LoadWithReadPosition:MethodByName] error in loading aggregate {%s}", aggregateId.String())) } - var metadata core.Metadata + var meta metadata.Metadata var domainEvents []domain.IDomainEvent linq.From(streamEvents).Distinct().SelectT(func(streamEvent *models.StreamEvent) domain.IDomainEvent { - metadata = streamEvent.Metadata + meta = streamEvent.Metadata return streamEvent.Event }).ToSlice(&domainEvents) - err = aggregate.LoadFromHistory(domainEvents, metadata) + err = aggregate.LoadFromHistory(domainEvents, meta) if err != nil { return *new(T), tracing.TraceWithErr(span, err) } diff --git a/pkg/eventstroredb/esdb_serilizer.go b/pkg/eventstroredb/esdb_serilizer.go index b5995158..018ffd53 100644 --- a/pkg/eventstroredb/esdb_serilizer.go +++ b/pkg/eventstroredb/esdb_serilizer.go @@ -5,8 +5,8 @@ import ( "github.com/EventStore/EventStore-Client-Go/esdb" "github.com/ahmetb/go-linq/v3" "github.com/gofrs/uuid" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/domain" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/serializer" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models" appendResult "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/models/append_result" @@ -164,13 +164,13 @@ func (e *EsdbSerializer) EsdbWriteResultToAppendEventResult(writeResult *esdb.Wr return appendResult.From(writeResult.CommitPosition, writeResult.NextExpectedVersion) } -func (e *EsdbSerializer) Serialize(data interface{}, metadata core.Metadata) (*esdb.EventData, error) { +func (e *EsdbSerializer) Serialize(data interface{}, meta metadata.Metadata) (*esdb.EventData, error) { serializedData, err := e.eventSerializer.Serialize(data) if err != nil { return nil, err } - serializedMeta, err := e.metadataSerializer.Serialize(metadata) + serializedMeta, err := e.metadataSerializer.Serialize(meta) if err != nil { return nil, err } @@ -185,25 +185,25 @@ func (e *EsdbSerializer) Serialize(data interface{}, metadata core.Metadata) (*e }, nil } -func (e *EsdbSerializer) Deserialize(resolveEvent *esdb.ResolvedEvent) (interface{}, core.Metadata, error) { +func (e *EsdbSerializer) Deserialize(resolveEvent *esdb.ResolvedEvent) (interface{}, metadata.Metadata, error) { eventType := resolveEvent.Event.EventType data := resolveEvent.Event.Data - meta := resolveEvent.Event.UserMetadata + userMeta := resolveEvent.Event.UserMetadata payload, err := e.eventSerializer.DeserializeEvent(data, eventType, resolveEvent.Event.ContentType) if err != nil { return nil, nil, err } - metadata, err := e.metadataSerializer.Deserialize(meta) + meta, err := e.metadataSerializer.Deserialize(userMeta) if err != nil { return nil, nil, err } - return payload, metadata, nil + return payload, meta, nil } -func (e *EsdbSerializer) DomainEventToStreamEvent(domainEvent domain.IDomainEvent, meta core.Metadata, position int64) *models.StreamEvent { +func (e *EsdbSerializer) DomainEventToStreamEvent(domainEvent domain.IDomainEvent, meta metadata.Metadata, position int64) *models.StreamEvent { return &models.StreamEvent{ EventID: uuid2.NewV4(), Event: domainEvent, diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index 78442224..f22e57cf 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -1,6 +1,7 @@ package grpc import ( + "context" "emperror.dev/errors" "fmt" grpcMiddleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -26,10 +27,11 @@ const ( type GrpcConfig struct { Port string `mapstructure:"port" env:"Port"` Development bool `mapstructure:"development" env:"Development"` + Name string `mapstructure:"name" env:"Name"` } type GrpcServer interface { - RunGrpcServer(configGrpc func(grpcServer *grpc.Server)) error + RunGrpcServer(ctx context.Context, configGrpc func(grpcServer *grpc.Server)) error GracefulShutdown() GetCurrentGrpcServer() *grpc.Server } @@ -59,7 +61,7 @@ func NewGrpcServer(config *GrpcConfig, logger logger.Logger) *grpcServer { return &grpcServer{server: s, config: config, log: logger} } -func (s *grpcServer) RunGrpcServer(configGrpc func(grpcServer *grpc.Server)) error { +func (s *grpcServer) RunGrpcServer(ctx context.Context, configGrpc func(grpcServer *grpc.Server)) error { l, err := net.Listen("tcp", s.config.Port) if err != nil { return errors.WrapIf(err, "net.Listen") @@ -75,6 +77,17 @@ func (s *grpcServer) RunGrpcServer(configGrpc func(grpcServer *grpc.Server)) err reflection.Register(s.server) } + go func() { + for { + select { + case <-ctx.Done(): + s.log.Infof("%s is shutting down Grpc PORT: {%s}", s.config.Name, s.config.Port) + s.GracefulShutdown() + return + } + } + }() + s.log.Infof("[grpcServer.RunGrpcServer] Writer gRPC server is listening on port: %s", s.config.Port) err = s.server.Serve(l) diff --git a/pkg/http/custom_echo/config.go b/pkg/http/custom_echo/config.go index b6e42bc0..a0538a0d 100644 --- a/pkg/http/custom_echo/config.go +++ b/pkg/http/custom_echo/config.go @@ -8,4 +8,5 @@ type EchoHttpConfig struct { IgnoreLogUrls []string `mapstructure:"ignoreLogUrls"` Timeout int `mapstructure:"timeout" env:"Timeout"` Host string `mapstructure:"host" env:"Host"` + Name string `mapstructure:"name" env:"Name"` } diff --git a/pkg/http/custom_echo/echo_server.go b/pkg/http/custom_echo/echo_server.go index 047f646d..bc128300 100644 --- a/pkg/http/custom_echo/echo_server.go +++ b/pkg/http/custom_echo/echo_server.go @@ -20,7 +20,7 @@ type echoHttpServer struct { } type EchoHttpServer interface { - RunHttpServer(configEcho func(echoServer *echo.Echo)) error + RunHttpServer(ctx context.Context, configEcho func(echoServer *echo.Echo)) error GracefulShutdown(ctx context.Context) error ApplyVersioningFromHeader() GetEchoInstance() *echo.Echo @@ -33,7 +33,7 @@ func NewEchoHttpServer(config *EchoHttpConfig, logger logger.Logger) *echoHttpSe return &echoHttpServer{echo: echo.New(), config: config, log: logger} } -func (s *echoHttpServer) RunHttpServer(configEcho func(echo *echo.Echo)) error { +func (s *echoHttpServer) RunHttpServer(ctx context.Context, configEcho func(echo *echo.Echo)) error { s.echo.Server.ReadTimeout = constants.ReadTimeout s.echo.Server.WriteTimeout = constants.WriteTimeout s.echo.Server.MaxHeaderBytes = constants.MaxHeaderBytes @@ -42,6 +42,19 @@ func (s *echoHttpServer) RunHttpServer(configEcho func(echo *echo.Echo)) error { configEcho(s.echo) } + go func() { + for { + select { + case <-ctx.Done(): + s.log.Infof("%s is shutting down Http PORT: {%s}", s.config.Name, s.config.Port) + if err := s.GracefulShutdown(ctx); err != nil { + s.log.Warnf("(Shutdown) err: {%v}", err) + } + return + } + } + }() + //https://echo.labstack.com/guide/http_server/ return s.echo.Start(s.config.Port) } diff --git a/pkg/http/http_errors/problemDetails/option_builder.go b/pkg/http/http_errors/problemDetails/option_builder.go new file mode 100644 index 00000000..d8bb283a --- /dev/null +++ b/pkg/http/http_errors/problemDetails/option_builder.go @@ -0,0 +1,21 @@ +package problemDetails + +import "reflect" + +type OptionBuilder struct { + internalErrors map[reflect.Type]func(err error) ProblemDetailErr +} + +func NewOptionBuilder() *OptionBuilder { + return &OptionBuilder{} +} + +func (p *OptionBuilder) Map(srcErrorType reflect.Type, problem ProblemDetailFunc[error]) *OptionBuilder { + internalErrorMaps[srcErrorType] = problem + + return p +} + +func (p *OptionBuilder) Build() map[reflect.Type]func(err error) ProblemDetailErr { + return p.internalErrors +} diff --git a/pkg/http/http_errors/problemDetails/problem_detail_parser.go b/pkg/http/http_errors/problemDetails/problem_detail_parser.go index 89ff79a5..cb738138 100644 --- a/pkg/http/http_errors/problemDetails/problem_detail_parser.go +++ b/pkg/http/http_errors/problemDetails/problem_detail_parser.go @@ -8,9 +8,31 @@ import ( "github.com/mehdihadeli/store-golang-microservice-sample/pkg/constants" httpErrors "github.com/mehdihadeli/store-golang-microservice-sample/pkg/http/http_errors" customErrors "github.com/mehdihadeli/store-golang-microservice-sample/pkg/http/http_errors/custom_errors" + typeMapper "github.com/mehdihadeli/store-golang-microservice-sample/pkg/reflection/type_mappper" "net/http" + "reflect" ) +type ProblemDetailParser struct { + internalErrors map[reflect.Type]func(err error) ProblemDetailErr +} + +func NewProblemDetailParser(builder func(builder *OptionBuilder)) *ProblemDetailParser { + optionBuilder := NewOptionBuilder() + builder(optionBuilder) + items := optionBuilder.Build() + return &ProblemDetailParser{internalErrors: items} +} + +func (p *ProblemDetailParser) ResolveError(err error) ProblemDetailErr { + errType := typeMapper.GetType(err) + problem := p.internalErrors[errType] + if problem != nil { + return problem(err) + } + return nil +} + func ParseError(err error) ProblemDetailErr { stackTrace := httpErrors.ErrorsWithStack(err) customErr := customErrors.GetCustomError(err) diff --git a/pkg/http/http_errors/problemDetails/problem_details.go b/pkg/http/http_errors/problemDetails/problem_details.go index da71761b..2a959aad 100644 --- a/pkg/http/http_errors/problemDetails/problem_details.go +++ b/pkg/http/http_errors/problemDetails/problem_details.go @@ -5,7 +5,9 @@ import ( "fmt" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/logger/defaultLogger" + typeMapper "github.com/mehdihadeli/store-golang-microservice-sample/pkg/reflection/type_mappper" "net/http" + "reflect" "time" ) @@ -13,6 +15,10 @@ const ( ContentTypeJSON = "application/problem+json" ) +type ProblemDetailFunc[E error] func(err error) ProblemDetailErr + +var internalErrorMaps map[reflect.Type]func(err error) ProblemDetailErr + // ProblemDetailErr ProblemDetail error interface type ProblemDetailErr interface { GetStatus() int @@ -164,6 +170,20 @@ func NewProblemDetailFromCodeAndDetail(status int, detail string, stackTrace str } } +func Map[E error](problem ProblemDetailFunc[E]) { + errorType := reflect.TypeOf(typeMapper.GetTypeFromGeneric[E]()) + internalErrorMaps[errorType] = problem +} + +func ResolveProblemDetail(err error) ProblemDetailErr { + errorType := typeMapper.GetType(err) + problem := internalErrorMaps[errorType] + if problem != nil { + return problem(err) + } + return nil +} + func getDefaultType(statusCode int) string { return fmt.Sprintf("https://httpstatuses.io/%d", statusCode) } diff --git a/pkg/messaging/consumer/consumer_handler.go b/pkg/messaging/consumer/consumer_handler.go index 09b6da71..d7252846 100644 --- a/pkg/messaging/consumer/consumer_handler.go +++ b/pkg/messaging/consumer/consumer_handler.go @@ -5,6 +5,10 @@ import ( "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/types" ) -type ConsumerHandler[T types.IMessage] interface { - Handle(ctx context.Context, consumeContext types.IMessageConsumeContext[T]) error +type ConsumerHandlerT[T types.IMessage] interface { + Handle(ctx context.Context, consumeContext types.MessageConsumeContextT[T]) error +} + +type ConsumerHandler interface { + Handle(ctx context.Context, consumeContext types.MessageConsumeContext) error } diff --git a/pkg/messaging/pipeline/consumer_pipeline.go b/pkg/messaging/pipeline/consumer_pipeline.go index c7d1642a..a677b1c4 100644 --- a/pkg/messaging/pipeline/consumer_pipeline.go +++ b/pkg/messaging/pipeline/consumer_pipeline.go @@ -9,6 +9,6 @@ import ( type ConsumerHandlerFunc func() error // ConsumerPipeline is a Pipeline for wrapping the inner consumer handler. -type ConsumerPipeline[T types.IMessage] interface { - Handle(ctx context.Context, consumerContext types.IMessageConsumeContext[T], next ConsumerHandlerFunc) error +type ConsumerPipeline interface { + Handle(ctx context.Context, consumerContext types.MessageConsumeContext, next ConsumerHandlerFunc) error } diff --git a/pkg/messaging/producer/producer.go b/pkg/messaging/producer/producer.go index e5c90600..4ea46912 100644 --- a/pkg/messaging/producer/producer.go +++ b/pkg/messaging/producer/producer.go @@ -2,11 +2,11 @@ package producer import ( "context" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/types" ) type Producer interface { - Publish(ctx context.Context, message types.IMessage, metadata core.Metadata) error - PublishWithTopicName(ctx context.Context, message types.IMessage, metadata core.Metadata, topicOrExchangeName string) error + PublishMessage(ctx context.Context, message types.IMessage, meta metadata.Metadata) error + PublishMessageWithTopicName(ctx context.Context, message types.IMessage, meta metadata.Metadata, topicOrExchangeName string) error } diff --git a/pkg/messaging/types/message.go b/pkg/messaging/types/message.go index 899958c1..fc2f5919 100644 --- a/pkg/messaging/types/message.go +++ b/pkg/messaging/types/message.go @@ -6,18 +6,15 @@ import ( type IMessage interface { GeMessageId() string - GetCorrelationId() string GetCreated() time.Time GetEventTypeName() string - SetCorrelationId(string) SetEventTypeName(string) } type Message struct { - MessageId string `json:"messageId,omitempty"` - CorrelationId string `json:"correlationId"` - Created time.Time `json:"created"` - EventType string `json:"eventType"` + MessageId string `json:"messageId,omitempty"` + Created time.Time `json:"created"` + EventType string `json:"eventType"` } func NewMessage(messageId string) *Message { @@ -32,18 +29,10 @@ func (m *Message) GeMessageId() string { return m.MessageId } -func (m *Message) GetCorrelationId() string { - return m.CorrelationId -} - func (m *Message) GetCreated() time.Time { return m.Created } -func (m *Message) SetCorrelationId(correlationId string) { - m.CorrelationId = correlationId -} - func (m *Message) GetEventTypeName() string { return m.EventType } diff --git a/pkg/messaging/types/message_consume_context.go b/pkg/messaging/types/message_consume_context.go index 31e210b2..754aced0 100644 --- a/pkg/messaging/types/message_consume_context.go +++ b/pkg/messaging/types/message_consume_context.go @@ -1,24 +1,35 @@ package types import ( - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "time" ) -type IMessageConsumeContext[T IMessage] interface { +type MessageConsumeContextBase interface { MessageId() string CorrelationId() string MessageType() string Created() time.Time ContentType() string - Tag() uint64 - Metadata() core.Metadata + DeliveryTag() uint64 + Body() interface{} + Metadata() metadata.Metadata +} + +type MessageConsumeContextT[T IMessage] interface { + MessageConsumeContextBase Message() T + ToMessageConsumeContext() MessageConsumeContext } -type messageConsumeContext[T IMessage] struct { - message T - metadata core.Metadata +type MessageConsumeContext interface { + MessageConsumeContextBase + Message() IMessage +} + +type messageConsumeContextBase struct { + metadata metadata.Metadata + body interface{} contentType string messageType string messageId string @@ -27,10 +38,20 @@ type messageConsumeContext[T IMessage] struct { correlationId string } -func NewMessageConsumeContext[T IMessage](message T, meta core.Metadata, contentType string, messageType string, created time.Time, deliveryTag uint64, messageId string, correlationId string) IMessageConsumeContext[T] { - return &messageConsumeContext[T]{ - message: message, +type messageConsumeContextT[T IMessage] struct { + MessageConsumeContextBase + message T +} + +type messageConsumeContext struct { + MessageConsumeContextBase + message IMessage +} + +func NewMessageContextBase(body interface{}, meta metadata.Metadata, contentType string, messageType string, created time.Time, deliveryTag uint64, messageId string, correlationId string) MessageConsumeContextBase { + return &messageConsumeContextBase{ metadata: meta, + body: body, contentType: contentType, messageId: messageId, tag: deliveryTag, @@ -39,35 +60,60 @@ func NewMessageConsumeContext[T IMessage](message T, meta core.Metadata, content correlationId: correlationId, } } +func NewMessageConsumeContextT[T IMessage](message T, meta metadata.Metadata, contentType string, messageType string, created time.Time, deliveryTag uint64, messageId string, correlationId string) MessageConsumeContextT[T] { + return &messageConsumeContextT[T]{ + message: message, + MessageConsumeContextBase: NewMessageContextBase(message, meta, contentType, messageType, created, deliveryTag, messageId, correlationId), + } +} -func (m messageConsumeContext[T]) Message() T { +func NewMessageConsumeContext(message IMessage, meta metadata.Metadata, contentType string, messageType string, created time.Time, deliveryTag uint64, messageId string, correlationId string) MessageConsumeContext { + return &messageConsumeContext{ + message: message, + MessageConsumeContextBase: NewMessageContextBase(message, meta, contentType, messageType, created, deliveryTag, messageId, correlationId), + } +} + +func (m *messageConsumeContext) Message() IMessage { + return m.message +} + +func (m *messageConsumeContextT[T]) Message() T { return m.message } -func (m messageConsumeContext[T]) MessageId() string { +func (m *messageConsumeContextT[T]) ToMessageConsumeContext() MessageConsumeContext { + return NewMessageConsumeContext(m.Message(), m.Metadata(), m.ContentType(), m.MessageType(), m.Created(), m.DeliveryTag(), m.MessageId(), m.CorrelationId()) +} + +func (m *messageConsumeContextBase) MessageId() string { return m.messageId } -func (m messageConsumeContext[T]) CorrelationId() string { +func (m *messageConsumeContextBase) Body() interface{} { + return m.body +} + +func (m *messageConsumeContextBase) CorrelationId() string { return m.correlationId } -func (m messageConsumeContext[T]) MessageType() string { +func (m *messageConsumeContextBase) MessageType() string { return m.messageType } -func (m messageConsumeContext[T]) ContentType() string { +func (m *messageConsumeContextBase) ContentType() string { return m.contentType } -func (m messageConsumeContext[T]) Metadata() core.Metadata { +func (m *messageConsumeContextBase) Metadata() metadata.Metadata { return m.metadata } -func (m messageConsumeContext[T]) Created() time.Time { +func (m *messageConsumeContextBase) Created() time.Time { return m.created } -func (m messageConsumeContext[T]) Tag() uint64 { +func (m *messageConsumeContextBase) DeliveryTag() uint64 { return m.tag } diff --git a/pkg/messaging/utils/utils.go b/pkg/messaging/utils/utils.go index fac6e9f7..e754f6bc 100644 --- a/pkg/messaging/utils/utils.go +++ b/pkg/messaging/utils/utils.go @@ -8,28 +8,28 @@ import ( "reflect" ) -func GetMessageName(message any) string { +func GetMessageName(message interface{}) string { if reflect.TypeOf(message).Kind() == reflect.Pointer { return strcase.ToSnake(reflect.TypeOf(message).Elem().Name()) } return strcase.ToSnake(reflect.TypeOf(message).Name()) } -func GetTopicOrExchangeName(message types.IMessage) string { +func GetTopicOrExchangeName(message interface{}) string { if reflect.TypeOf(message).Kind() == reflect.Pointer { return strcase.ToSnake(reflect.TypeOf(message).Elem().Name()) } return strcase.ToSnake(reflect.TypeOf(message).Name()) } -func GetQueueName(message types.IMessage) string { +func GetQueueName(message interface{}) string { if reflect.TypeOf(message).Kind() == reflect.Pointer { return strcase.ToSnake(reflect.TypeOf(message).Elem().Name()) } return strcase.ToSnake(reflect.TypeOf(message).Name()) } -func GetRoutingKey(message types.IMessage) string { +func GetRoutingKey(message interface{}) string { if reflect.TypeOf(message).Kind() == reflect.Pointer { return strcase.ToSnake(reflect.TypeOf(message).Elem().Name()) } @@ -57,4 +57,3 @@ func GetAllMessageTypes() []reflect.Type { return squares } - diff --git a/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options.go b/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options.go index f08e0ee5..44fe9f5f 100644 --- a/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options.go +++ b/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options.go @@ -20,7 +20,20 @@ type RabbitMQConsumerOptions struct { ExchangeOptions *RabbitMQExchangeOptions } -func NewDefaultRabbitMQConsumerOptions[T types2.IMessage]() *RabbitMQConsumerOptions { +func NewDefaultRabbitMQConsumerOptions() *RabbitMQConsumerOptions { + return &RabbitMQConsumerOptions{ + ConsumerOptions: &consumer.ConsumerOptions{ExitOnError: false, ConsumerId: ""}, + ConcurrencyLimit: 1, + PrefetchCount: 4, //how many messages we can handle at once + NoLocal: false, + NoWait: true, + BindingOptions: &RabbitMQBindingOptions{}, + ExchangeOptions: &RabbitMQExchangeOptions{Durable: true, Type: types.ExchangeTopic}, + QueueOptions: &RabbitMQQueueOptions{Durable: true}, + } +} + +func NewDefaultRabbitMQConsumerOptionsT[T types2.IMessage]() *RabbitMQConsumerOptions { return &RabbitMQConsumerOptions{ ConsumerOptions: &consumer.ConsumerOptions{ExitOnError: false, ConsumerId: ""}, ConcurrencyLimit: 1, diff --git a/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options_builder.go b/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options_builder.go index f233c1e7..bcdeb99f 100644 --- a/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options_builder.go +++ b/pkg/rabbitmq/consumer/options/rabbitmq_consumer_options_builder.go @@ -5,105 +5,109 @@ import ( "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/types" ) -type RabbitMQConsumerOptionsBuilder[T types2.IMessage] struct { +type RabbitMQConsumerOptionsBuilder struct { rabbitmqConsumerOptions *RabbitMQConsumerOptions } -func NewRabbitMQConsumerOptionsBuilder[T types2.IMessage]() *RabbitMQConsumerOptionsBuilder[T] { - return &RabbitMQConsumerOptionsBuilder[T]{rabbitmqConsumerOptions: NewDefaultRabbitMQConsumerOptions[T]()} +func NewRabbitMQConsumerOptionsBuilderT[T types2.IMessage]() *RabbitMQConsumerOptionsBuilder { + return &RabbitMQConsumerOptionsBuilder{rabbitmqConsumerOptions: NewDefaultRabbitMQConsumerOptionsT[T]()} } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithExitOnError(exitOnError bool) *RabbitMQConsumerOptionsBuilder[T] { +func NewRabbitMQConsumerOptionsBuilder() *RabbitMQConsumerOptionsBuilder { + return &RabbitMQConsumerOptionsBuilder{rabbitmqConsumerOptions: NewDefaultRabbitMQConsumerOptions()} +} + +func (b *RabbitMQConsumerOptionsBuilder) WithExitOnError(exitOnError bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ExitOnError = exitOnError return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithAutoAck(ack bool) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithAutoAck(ack bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.AutoAck = ack return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithNoLocal(noLocal bool) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithNoLocal(noLocal bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.NoLocal = noLocal return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithNoWait(noWait bool) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithNoWait(noWait bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.NoWait = noWait return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithConcurrencyLimit(limit int) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithConcurrencyLimit(limit int) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ConcurrencyLimit = limit return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithPrefetchCount(count int) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithPrefetchCount(count int) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.PrefetchCount = count return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithConsumerId(consumerId string) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithConsumerId(consumerId string) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ConsumerId = consumerId return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithQueueName(queueName string) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithQueueName(queueName string) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.QueueOptions.Name = queueName return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithDurable(durable bool) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithDurable(durable bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ExchangeOptions.Durable = durable b.rabbitmqConsumerOptions.QueueOptions.Durable = durable return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithAutoDeleteQueue(autoDelete bool) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithAutoDeleteQueue(autoDelete bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.QueueOptions.AutoDelete = autoDelete return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithExclusiveQueue(exclusive bool) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithExclusiveQueue(exclusive bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.QueueOptions.Exclusive = exclusive return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithQueueArgs(args map[string]any) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithQueueArgs(args map[string]any) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.QueueOptions.Args = args return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithExchangeName(exchangeName string) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithExchangeName(exchangeName string) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ExchangeOptions.Name = exchangeName return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithAutoDeleteExchange(autoDelete bool) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithAutoDeleteExchange(autoDelete bool) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ExchangeOptions.AutoDelete = autoDelete return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithExchangeType(exchangeType types.ExchangeType) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithExchangeType(exchangeType types.ExchangeType) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ExchangeOptions.Type = exchangeType return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithExchangeArgs(args map[string]any) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithExchangeArgs(args map[string]any) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.ExchangeOptions.Args = args return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithRoutingKey(routingKey string) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithRoutingKey(routingKey string) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.BindingOptions.RoutingKey = routingKey return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) WithBindingArgs(args map[string]any) *RabbitMQConsumerOptionsBuilder[T] { +func (b *RabbitMQConsumerOptionsBuilder) WithBindingArgs(args map[string]any) *RabbitMQConsumerOptionsBuilder { b.rabbitmqConsumerOptions.BindingOptions.Args = args return b } -func (b *RabbitMQConsumerOptionsBuilder[T]) Build() *RabbitMQConsumerOptions { +func (b *RabbitMQConsumerOptionsBuilder) Build() *RabbitMQConsumerOptions { return b.rabbitmqConsumerOptions } diff --git a/pkg/rabbitmq/consumer/rabbitmq_consumer.go b/pkg/rabbitmq/consumer/rabbitmq_consumer.go index 7dda6231..c60b0389 100644 --- a/pkg/rabbitmq/consumer/rabbitmq_consumer.go +++ b/pkg/rabbitmq/consumer/rabbitmq_consumer.go @@ -3,9 +3,10 @@ package consumer import ( "context" "emperror.dev/errors" + "fmt" "github.com/ahmetb/go-linq/v3" "github.com/avast/retry-go" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/serializer" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/logger" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/consumer" @@ -14,6 +15,7 @@ import ( "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/consumer/options" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/rabbitmqErrors" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/types" + typeMapper "github.com/mehdihadeli/store-golang-microservice-sample/pkg/reflection/type_mappper" "github.com/rabbitmq/amqp091-go" "reflect" "time" @@ -28,20 +30,22 @@ var ( retryOptions = []retry.Option{retry.Attempts(retryAttempts), retry.Delay(retryDelay), retry.DelayType(retry.BackOffDelay)} ) -type RabbitMQConsumer[T types2.IMessage] struct { +type rabbitMQConsumer[T types2.IMessage] struct { rabbitmqConsumerOptions *options.RabbitMQConsumerOptions connection types.IConnection - handler consumer.ConsumerHandler[T] - pipelines []pipeline.ConsumerPipeline[T] + handlerDefault consumer.ConsumerHandler channel *amqp091.Channel deliveryRoutines chan struct{} // chan should init before using channel eventSerializer serializer.EventSerializer logger logger.Logger ErrChan chan error + handler interface{} + pipelines []pipeline.ConsumerPipeline } -func NewRabbitMQConsumer[T types2.IMessage](eventSerializer serializer.EventSerializer, logger logger.Logger, connection types.IConnection, builderFunc func(builder *options.RabbitMQConsumerOptionsBuilder[T]), handler consumer.ConsumerHandler[T], pipelines ...pipeline.ConsumerPipeline[T]) (consumer.Consumer, error) { - builder := options.NewRabbitMQConsumerOptionsBuilder[T]() +// NewRabbitMQConsumerT create a new generic RabbitMQ consumer +func NewRabbitMQConsumerT[T types2.IMessage](eventSerializer serializer.EventSerializer, logger logger.Logger, connection types.IConnection, builderFunc func(builder *options.RabbitMQConsumerOptionsBuilder), handler consumer.ConsumerHandlerT[T], pipelines ...pipeline.ConsumerPipeline) (consumer.Consumer, error) { + builder := options.NewRabbitMQConsumerOptionsBuilderT[T]() if builderFunc != nil { builderFunc(builder) } @@ -49,21 +53,45 @@ func NewRabbitMQConsumer[T types2.IMessage](eventSerializer serializer.EventSeri consumerConfig := builder.Build() deliveryRoutines := make(chan struct{}, consumerConfig.ConcurrencyLimit) - cons := &RabbitMQConsumer[T]{ + cons := &rabbitMQConsumer[T]{ + eventSerializer: eventSerializer, + logger: logger, rabbitmqConsumerOptions: consumerConfig, deliveryRoutines: deliveryRoutines, ErrChan: make(chan error), connection: connection, handler: handler, pipelines: pipelines, + } + + return cons, nil +} + +// NewRabbitMQConsumer create a new RabbitMQ consumer +func NewRabbitMQConsumer(eventSerializer serializer.EventSerializer, logger logger.Logger, connection types.IConnection, builderFunc func(builder *options.RabbitMQConsumerOptionsBuilder), handler consumer.ConsumerHandler, pipelines ...pipeline.ConsumerPipeline) (consumer.Consumer, error) { + builder := options.NewRabbitMQConsumerOptionsBuilder() + if builderFunc != nil { + builderFunc(builder) + } + + consumerConfig := builder.Build() + deliveryRoutines := make(chan struct{}, consumerConfig.ConcurrencyLimit) + + cons := &rabbitMQConsumer[types2.IMessage]{ eventSerializer: eventSerializer, logger: logger, + rabbitmqConsumerOptions: consumerConfig, + deliveryRoutines: deliveryRoutines, + ErrChan: make(chan error), + connection: connection, + handler: handler, + pipelines: pipelines, } return cons, nil } -func (r *RabbitMQConsumer[T]) Consume(ctx context.Context) error { +func (r *rabbitMQConsumer[T]) Consume(ctx context.Context) error { //https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/receive.go if r.connection == nil { return errors.New("connection is nil") @@ -144,7 +172,7 @@ func (r *RabbitMQConsumer[T]) Consume(ctx context.Context) error { return } //https://github.com/streadway/amqp/blob/2aa28536587a0090d8280eed56c75867ce7e93ec/delivery.go#L62 - r.handleReceived(ctx, msg, r.handler) + r.handleReceived(ctx, msg) } } }() @@ -153,14 +181,13 @@ func (r *RabbitMQConsumer[T]) Consume(ctx context.Context) error { return nil } -func (r *RabbitMQConsumer[T]) UnConsume(ctx context.Context) error { - if r.channel != nil && r.channel.IsClosed() == false { - err := r.channel.Cancel(r.rabbitmqConsumerOptions.ConsumerId, false) - if err != nil { - return err +func (r *rabbitMQConsumer[T]) UnConsume(ctx context.Context) error { + defer func() { + if r.channel != nil && r.channel.IsClosed() == false { + r.channel.Cancel(r.rabbitmqConsumerOptions.ConsumerId, false) + r.channel.Close() } - r.channel.Close() - } + }() done := make(chan struct{}, 1) @@ -175,12 +202,10 @@ func (r *RabbitMQConsumer[T]) UnConsume(ctx context.Context) error { select { case <-done: return nil - case <-ctx.Done(): - return ctx.Err() } } -func (r *RabbitMQConsumer[T]) reConsumeOnDropConnection(ctx context.Context) { +func (r *rabbitMQConsumer[T]) reConsumeOnDropConnection(ctx context.Context) { go func() { for { select { @@ -200,13 +225,16 @@ func (r *RabbitMQConsumer[T]) reConsumeOnDropConnection(ctx context.Context) { }() } -func (r *RabbitMQConsumer[T]) handleReceived(ctx context.Context, delivery amqp091.Delivery, handler consumer.ConsumerHandler[T]) { +func (r *rabbitMQConsumer[T]) handleReceived(ctx context.Context, delivery amqp091.Delivery) { // for ensuring our handler execute completely after shutdown r.deliveryRoutines <- struct{}{} defer func() { <-r.deliveryRoutines }() consumeContext := r.createConsumeContext(delivery) + if consumeContext == nil { + return + } var ack func() var nack func() @@ -228,24 +256,37 @@ func (r *RabbitMQConsumer[T]) handleReceived(ctx context.Context, delivery amqp0 } } - r.handle(ctx, ack, nack, consumeContext, handler) + r.handle(ctx, ack, nack, consumeContext) } -func (r *RabbitMQConsumer[T]) handle(ctx context.Context, ack func(), nack func(), messageConsumeContext types2.IMessageConsumeContext[T], handler consumer.ConsumerHandler[T]) { +func (r *rabbitMQConsumer[T]) handle(ctx context.Context, ack func(), nack func(), messageConsumeContext types2.MessageConsumeContextBase) { err := retry.Do(func() error { - if len(r.pipelines) > 0 { - var reversPipes = r.reversOrder(r.pipelines) + defaultHandler, ok := r.handler.(consumer.ConsumerHandler) - var lastHandler pipeline.ConsumerHandlerFunc = func() error { - return handler.Handle(ctx, messageConsumeContext) + if r.pipelines != nil && len(r.pipelines) > 0 { + var reversPipes = r.reversOrder(r.pipelines) + var lastHandler pipeline.ConsumerHandlerFunc + if ok { + lastHandler = func() error { + return defaultHandler.Handle(ctx, messageConsumeContext.(types2.MessageConsumeContext)) + } + } else { + lastHandler = func() error { + handler := r.handler.(consumer.ConsumerHandlerT[T]) + return handler.Handle(ctx, messageConsumeContext.(types2.MessageConsumeContextT[T])) + } } - aggregateResult := linq.From(reversPipes).AggregateWithSeedT(lastHandler, func(next pipeline.ConsumerHandlerFunc, pipe pipeline.ConsumerPipeline[T]) pipeline.ConsumerHandlerFunc { + aggregateResult := linq.From(reversPipes).AggregateWithSeedT(lastHandler, func(next pipeline.ConsumerHandlerFunc, pipe pipeline.ConsumerPipeline) pipeline.ConsumerHandlerFunc { pipeValue := pipe nexValue := next var handlerFunc pipeline.ConsumerHandlerFunc = func() error { - return pipeValue.Handle(ctx, messageConsumeContext, nexValue) + genericContext, ok := messageConsumeContext.(types2.MessageConsumeContextT[T]) + if ok { + return pipeValue.Handle(ctx, genericContext.ToMessageConsumeContext(), nexValue) + } + return pipeValue.Handle(ctx, messageConsumeContext.(types2.MessageConsumeContext), nexValue) } return handlerFunc }) @@ -258,13 +299,24 @@ func (r *RabbitMQConsumer[T]) handle(ctx context.Context, ack func(), nack func( } return nil } else { - err := handler.Handle(ctx, messageConsumeContext) - return err + if ok { + err := defaultHandler.Handle(ctx, messageConsumeContext.(types2.MessageConsumeContext)) + if err != nil { + return err + } + } else { + handler := r.handler.(consumer.ConsumerHandlerT[T]) + err := handler.Handle(ctx, messageConsumeContext.(types2.MessageConsumeContextT[T])) + if err != nil { + return err + } + } } + return nil }, append(retryOptions, retry.Context(ctx))...) if err != nil { - r.logger.Error("[RabbitMQConsumer.Handle] error in handling consume message of RabbitmqMQ, prepare for nacking message") + r.logger.Error("[rabbitMQConsumer.Handle] error in handling consume message of RabbitmqMQ, prepare for nacking message") if nack != nil && r.rabbitmqConsumerOptions.AutoAck == false { nack() } @@ -273,22 +325,33 @@ func (r *RabbitMQConsumer[T]) handle(ctx context.Context, ack func(), nack func( } } -func (r *RabbitMQConsumer[T]) createConsumeContext(delivery amqp091.Delivery) types2.IMessageConsumeContext[T] { +func (r *rabbitMQConsumer[T]) createConsumeContext(delivery amqp091.Delivery) types2.MessageConsumeContextBase { message := r.deserializeData(delivery.ContentType, delivery.Type, delivery.Body) - var metadata core.Metadata + if reflect.ValueOf(message).IsZero() || reflect.ValueOf(message).IsNil() { + return *new(types2.MessageConsumeContextT[T]) + } + + var meta metadata.Metadata if delivery.Headers != nil { - metadata = core.MapToMetadata(delivery.Headers) + meta = metadata.MapToMetadata(delivery.Headers) } - consumeContext := types2.NewMessageConsumeContext[T](message, metadata, delivery.ContentType, delivery.Type, delivery.Timestamp, delivery.DeliveryTag, delivery.MessageId, delivery.CorrelationId) + if typeMapper.GetTypeFromGeneric[T]() == typeMapper.GetTypeFromGeneric[types2.IMessage]() { + consumeContext := types2.NewMessageConsumeContext(message.(types2.IMessage), meta, delivery.ContentType, delivery.Type, delivery.Timestamp, delivery.DeliveryTag, delivery.MessageId, delivery.CorrelationId) + return consumeContext + } + + consumeContext := types2.NewMessageConsumeContextT[T](message.(T), meta, delivery.ContentType, delivery.Type, delivery.Timestamp, delivery.DeliveryTag, delivery.MessageId, delivery.CorrelationId) return consumeContext } -func (r *RabbitMQConsumer[T]) deserializeData(contentType string, eventType string, body []byte) T { +func (r *rabbitMQConsumer[T]) deserializeData(contentType string, eventType string, body []byte) interface{} { if contentType == "" { contentType = "application/json" } + if body == nil || len(body) == 0 { + r.logger.Error("message body is nil or empty in the consumer") return *new(T) } @@ -296,17 +359,18 @@ func (r *RabbitMQConsumer[T]) deserializeData(contentType string, eventType stri //deserialize, err := r.eventSerializer.DeserializeType(body, typeMapper.GetTypeFromGeneric[T](), contentType) deserialize, err := r.eventSerializer.DeserializeMessage(body, eventType, contentType) // or this to explicit type deserialization --> r.eventSerializer.DeserializeType(body, typeMapper.GetTypeFromGeneric[T](), contentType) / jsonSerializer.UnmarshalT[T](body) if err != nil { - return *new(T) + r.logger.Errorf(fmt.Sprintf("error in deserilizng of type '%s' in the consumer", eventType)) + return nil } - return deserialize.(T) + return deserialize } - return *new(T) + return nil } -func (r *RabbitMQConsumer[T]) reversOrder(values []pipeline.ConsumerPipeline[T]) []pipeline.ConsumerPipeline[T] { - var reverseValues []pipeline.ConsumerPipeline[T] +func (r *rabbitMQConsumer[T]) reversOrder(values []pipeline.ConsumerPipeline) []pipeline.ConsumerPipeline { + var reverseValues []pipeline.ConsumerPipeline for i := len(values) - 1; i >= 0; i-- { reverseValues = append(reverseValues, values[i]) @@ -315,7 +379,7 @@ func (r *RabbitMQConsumer[T]) reversOrder(values []pipeline.ConsumerPipeline[T]) return reverseValues } -func (r *RabbitMQConsumer[T]) existsPipeType(p reflect.Type) bool { +func (r *rabbitMQConsumer[T]) existsPipeType(p reflect.Type) bool { for _, pipe := range r.pipelines { if reflect.TypeOf(pipe) == p { return true diff --git a/pkg/rabbitmq/consumer/rabbitmq_consumer_test.go b/pkg/rabbitmq/consumer/rabbitmq_consumer_test.go index a1dc3e30..b44b4dbf 100644 --- a/pkg/rabbitmq/consumer/rabbitmq_consumer_test.go +++ b/pkg/rabbitmq/consumer/rabbitmq_consumer_test.go @@ -35,15 +35,15 @@ func Test_Consume_Message(t *testing.T) { return } - rabbitmqConsumer, err := NewRabbitMQConsumer[*ProducerConsumerMessage]( + rabbitmqConsumer, err := NewRabbitMQConsumerT[*ProducerConsumerMessage]( json.NewJsonEventSerializer(), defaultLogger.Logger, conn, - func(builder *options.RabbitMQConsumerOptionsBuilder[*ProducerConsumerMessage]) { + func(builder *options.RabbitMQConsumerOptionsBuilder) { //builder.WithAutoAck(true) }, NewTestMessageHandler(), - NewPipeline1[*ProducerConsumerMessage]()) + NewPipeline1()) var consumers []consumer.Consumer consumers = append(consumers, rabbitmqConsumer) @@ -74,14 +74,86 @@ func Test_Consume_Message(t *testing.T) { fmt.Println("after 10 second of closing connection") fmt.Println(conn.IsClosed()) - err = rabbitmqProducer.Publish(context.Background(), NewProducerConsumerMessage("test"), nil) + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) for err != nil { - err = rabbitmqProducer.Publish(context.Background(), NewProducerConsumerMessage("test"), nil) + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) } - err = rabbitmqProducer.Publish(context.Background(), NewProducerConsumerMessage("test"), nil) + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) for err != nil { - err = rabbitmqProducer.Publish(context.Background(), NewProducerConsumerMessage("test"), nil) + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) + } + + time.Sleep(time.Second * 5) + fmt.Println(conn.IsClosed()) + fmt.Println(conn.IsConnected()) +} + +func Test_Consume_Default_Message(t *testing.T) { + test.SkipCI(t) + conn, err := types.NewRabbitMQConnection(context.Background(), &config.RabbitMQConfig{ + RabbitMqHostOptions: &config.RabbitMqHostOptions{ + UserName: "guest", + Password: "guest", + HostName: "localhost", + Port: 5672, + }, + }) + if err != nil { + t.Fatal(err) + return + } + + rabbitmqConsumer, err := NewRabbitMQConsumer( + json.NewJsonEventSerializer(), + defaultLogger.Logger, + conn, + func(builder *options.RabbitMQConsumerOptionsBuilder) { + builder.WithExchangeType(types.ExchangeTopic). + WithExchangeName("producer_consumer_message"). + WithQueueName("producer_consumer_message"). + WithRoutingKey("producer_consumer_message") + }, + NewTestMessageHandler2(), + NewPipeline1()) + + var consumers []consumer.Consumer + consumers = append(consumers, rabbitmqConsumer) + + b := bus.NewRabbitMQBus(defaultLogger.Logger, consumers) + err = b.Start(context.Background()) + if err != nil { + return + } + + rabbitmqProducer, err := producer.NewRabbitMQProducer( + conn, + func(builder *options2.RabbitMQProducerOptionsBuilder) { + builder.WithExchangeType(types.ExchangeTopic) + }, + defaultLogger.Logger, + json.NewJsonEventSerializer()) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second * 5) + + fmt.Println("closing connection") + conn.Close() + fmt.Println(conn.IsClosed()) + + time.Sleep(time.Second * 10) + fmt.Println("after 10 second of closing connection") + fmt.Println(conn.IsClosed()) + + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) + for err != nil { + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) + } + + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) + for err != nil { + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerConsumerMessage("test"), nil) } time.Sleep(time.Second * 5) @@ -101,11 +173,11 @@ func NewProducerConsumerMessage(data string) *ProducerConsumerMessage { } } -// /////////// ConsumerHandler +// /////////// ConsumerHandlerT type TestMessageHandler struct { } -func (t *TestMessageHandler) Handle(ctx context.Context, consumeContext types2.IMessageConsumeContext[*ProducerConsumerMessage]) error { +func (t *TestMessageHandler) Handle(ctx context.Context, consumeContext types2.MessageConsumeContextT[*ProducerConsumerMessage]) error { message := consumeContext.Message() fmt.Println(message) @@ -116,14 +188,29 @@ func NewTestMessageHandler() *TestMessageHandler { return &TestMessageHandler{} } +type TestMessageHandler2 struct { +} + +func (t *TestMessageHandler2) Handle(ctx context.Context, consumeContext types2.MessageConsumeContext) error { + message := consumeContext.Message() + fmt.Println(message) + + return nil +} + +func NewTestMessageHandler2() *TestMessageHandler2 { + return &TestMessageHandler2{} +} + // /////////////// ConsumerPipeline -type Pipeline1[T types2.IMessage] struct { +type Pipeline1 struct { } -func NewPipeline1[T types2.IMessage]() pipeline.ConsumerPipeline[T] { - return &Pipeline1[T]{} +func NewPipeline1() pipeline.ConsumerPipeline { + return &Pipeline1{} } -func (p Pipeline1[T]) Handle(ctx context.Context, consumerContext types2.IMessageConsumeContext[T], next pipeline.ConsumerHandlerFunc) error { + +func (p Pipeline1) Handle(ctx context.Context, consumerContext types2.MessageConsumeContext, next pipeline.ConsumerHandlerFunc) error { fmt.Println("PipelineBehaviourTest.Handled") fmt.Println(fmt.Sprintf("pipeline got a message with id '%s'", consumerContext.Message().GeMessageId())) diff --git a/pkg/rabbitmq/producer/rabbitmq_producer.go b/pkg/rabbitmq/producer/rabbitmq_producer.go index 029388eb..bbd203b8 100644 --- a/pkg/rabbitmq/producer/rabbitmq_producer.go +++ b/pkg/rabbitmq/producer/rabbitmq_producer.go @@ -3,11 +3,9 @@ package producer import ( "context" "emperror.dev/errors" - "fmt" - "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/metadata" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/serializer" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/logger" - messageHeader "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/message_header" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/producer" types2 "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/types" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/utils" @@ -34,11 +32,11 @@ func NewRabbitMQProducer(connection types.IConnection, builderFunc func(builder return &rabbitMQProducer{logger: logger, connection: connection, eventSerializer: eventSerializer, rabbitmqProducerOptions: builder.Build()}, nil } -func (r *rabbitMQProducer) Publish(ctx context.Context, message types2.IMessage, metadata core.Metadata) error { - return r.PublishWithTopicName(ctx, message, metadata, "") +func (r *rabbitMQProducer) PublishMessage(ctx context.Context, message types2.IMessage, meta metadata.Metadata) error { + return r.PublishMessageWithTopicName(ctx, message, meta, "") } -func (r *rabbitMQProducer) PublishWithTopicName(ctx context.Context, message types2.IMessage, metadata core.Metadata, topicOrExchangeName string) error { +func (r *rabbitMQProducer) PublishMessageWithTopicName(ctx context.Context, message types2.IMessage, meta metadata.Metadata, topicOrExchangeName string) error { //https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/publisher_confirms.go if r.connection == nil { return errors.New("connection is nil") @@ -55,18 +53,13 @@ func (r *rabbitMQProducer) PublishWithTopicName(ctx context.Context, message typ } defer channel.Close() - if message.GetEventTypeName() == "" { - message.SetEventTypeName(typeMapper.GetTypeName(message)) // just message type name not full type name because in other side package name for type could be different) - } - metadata = getMetadata(message, metadata) + meta = getMetadata(message, meta) serializedObj, err := r.eventSerializer.Serialize(message) if err != nil { return err } - fmt.Println(string(serializedObj.Data)) - var exchange string if topicOrExchangeName != "" { @@ -88,10 +81,10 @@ func (r *rabbitMQProducer) PublishWithTopicName(ctx context.Context, message typ channel.NotifyPublish(confirms) props := amqp091.Publishing{ - CorrelationId: message.GetCorrelationId(), + CorrelationId: meta.GetCorrelationId(), MessageId: message.GeMessageId(), Timestamp: time.Now(), - Headers: core.MetadataToMap(metadata), + Headers: metadata.MetadataToMap(meta), Type: message.GetEventTypeName(), //typeMapper.GetTypeName(message) - just message type name not full type name because in other side package name for type could be different ContentType: serializedObj.ContentType, Body: serializedObj.Data, @@ -117,27 +110,30 @@ func (r *rabbitMQProducer) PublishWithTopicName(ctx context.Context, message typ return nil } -func getMetadata(message types2.IMessage, metadata core.Metadata) core.Metadata { - metadata = core.FromMetadata(metadata) +func getMetadata(message types2.IMessage, meta metadata.Metadata) metadata.Metadata { + meta = metadata.FromMetadata(meta) + + if message.GetEventTypeName() == "" { + message.SetEventTypeName(typeMapper.GetTypeName(message)) // just message type name not full type name because in other side package name for type could be different) + } + meta.SetMessageType(message.GetEventTypeName()) - if metadata.ExistsKey(messageHeader.MessageId) == false { - metadata.SetValue(messageHeader.MessageId, message.GeMessageId()) + if meta.GetMessageId() == "" { + meta.SetMessageId(message.GeMessageId()) } - if metadata.ExistsKey(messageHeader.Created) == false { - metadata.SetValue(messageHeader.Created, message.GetCreated()) + if meta.GetMessageCreated() == *new(time.Time) { + meta.SetMessageCreated(message.GetCreated()) } - if metadata.ExistsKey(messageHeader.CorrelationId) == false { + if meta.GetCorrelationId() == "" { cid := uuid.NewV4().String() - metadata.SetValue(messageHeader.CorrelationId, cid) - message.SetCorrelationId(cid) + meta.SetCorrelationId(cid) } - metadata.SetValue(messageHeader.Name, utils.GetMessageName(message)) - metadata.SetValue(messageHeader.Type, message.GetEventTypeName()) + meta.SetMessageName(utils.GetMessageName(message)) - return metadata + return meta } func (r *rabbitMQProducer) ensureExchange(channel *amqp091.Channel, exchangeName string) error { diff --git a/pkg/rabbitmq/producer/rabbitmq_producer_test.go b/pkg/rabbitmq/producer/rabbitmq_producer_test.go index e7e476d9..7ef57616 100644 --- a/pkg/rabbitmq/producer/rabbitmq_producer_test.go +++ b/pkg/rabbitmq/producer/rabbitmq_producer_test.go @@ -35,7 +35,7 @@ func Test_Publish_Message(t *testing.T) { t.Fatal(err) } - err = rabbitmqProducer.Publish(context.Background(), NewProducerMessage("test"), nil) + err = rabbitmqProducer.PublishMessage(context.Background(), NewProducerMessage("test"), nil) if err != nil { return } diff --git a/pkg/test/messaging/consumer/rabbitmq_fake_consumer.go b/pkg/test/messaging/consumer/rabbitmq_fake_consumer.go new file mode 100644 index 00000000..f4e2e9b4 --- /dev/null +++ b/pkg/test/messaging/consumer/rabbitmq_fake_consumer.go @@ -0,0 +1,37 @@ +package consumer + +import ( + "context" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/core/serializer" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/logger" + consumer2 "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/consumer" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/types" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/consumer" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/consumer/options" + types2 "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/types" +) + +type RabbitMQFakeTestConsumer struct { + isHandled bool + consumer2.Consumer +} + +func NewRabbitMQFakeTestConsumer(eventSerializer serializer.EventSerializer, logger logger.Logger, connection types2.IConnection, builderFunc func(builder *options.RabbitMQConsumerOptionsBuilder)) *RabbitMQFakeTestConsumer { + fakeConsumer := &RabbitMQFakeTestConsumer{} + + t, err := consumer.NewRabbitMQConsumer(eventSerializer, logger, connection, builderFunc, fakeConsumer) + if err != nil { + return nil + } + fakeConsumer.Consumer = t + return fakeConsumer +} + +func (f *RabbitMQFakeTestConsumer) Handle(ctx context.Context, consumeContext types.MessageConsumeContext) error { + f.isHandled = true + return nil +} + +func (f *RabbitMQFakeTestConsumer) IsHandled() bool { + return f.isHandled +} diff --git a/pkg/test/messaging/fake_message.go b/pkg/test/messaging/fake_message.go new file mode 100644 index 00000000..e4d305fe --- /dev/null +++ b/pkg/test/messaging/fake_message.go @@ -0,0 +1,14 @@ +package messaging + +import ( + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/messaging/types" + uuid "github.com/satori/go.uuid" +) + +type FakeMessage struct { + *types.Message +} + +func NewFakeMessage() *FakeMessage { + return &FakeMessage{Message: types.NewMessage(uuid.NewV4().String())} +} diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 2e280e83..9facb92b 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -8,5 +8,6 @@ import ( func SkipCI(t *testing.T) { if os.Getenv("CI") != "" { t.Skip("Skipping testing in CI environment") + return } } diff --git a/pkg/web/workers_runner.go b/pkg/web/workers_runner.go index 2bf5cbbd..77d47239 100644 --- a/pkg/web/workers_runner.go +++ b/pkg/web/workers_runner.go @@ -25,6 +25,12 @@ func (r *WorkersRunner) Start(ctx context.Context) chan error { r.errChan <- e return case <-ctx.Done(): + stopErr := r.Stop(ctx) + if stopErr != nil { + r.errChan <- stopErr + return + } + return } } }() diff --git a/services/catalogs/read_service/config/config.development.json b/services/catalogs/read_service/config/config.development.json index a15656cb..99e8c541 100644 --- a/services/catalogs/read_service/config/config.development.json +++ b/services/catalogs/read_service/config/config.development.json @@ -5,10 +5,12 @@ "timeout": 20 }, "grpc": { + "name":"catalogs_read_service", "port": ":6004", "development": true }, "http": { + "name":"catalogs_read_service", "port": ":7001", "development": true, "timeout": 30, diff --git a/services/catalogs/read_service/config/config.test.json b/services/catalogs/read_service/config/config.test.json index a15656cb..1a2ba9c1 100644 --- a/services/catalogs/read_service/config/config.test.json +++ b/services/catalogs/read_service/config/config.test.json @@ -5,10 +5,12 @@ "timeout": 20 }, "grpc": { + "name": "catalogs_read_service", "port": ":6004", "development": true }, "http": { + "name": "catalogs_read_service", "port": ":7001", "development": true, "timeout": 30, diff --git a/services/catalogs/read_service/internal/products/configurations/consumers/consumers_config.go b/services/catalogs/read_service/internal/products/configurations/consumers/consumers_config.go index fdd16ca7..e4cfd030 100644 --- a/services/catalogs/read_service/internal/products/configurations/consumers/consumers_config.go +++ b/services/catalogs/read_service/internal/products/configurations/consumers/consumers_config.go @@ -16,33 +16,33 @@ func ConfigConsumers(infra *infrastructure.InfrastructureConfigurations) error { //add custom message type mappings //utils.RegisterCustomMessageTypesToRegistrty(map[string]types.IMessage{"productCreatedV1": &creatingProductIntegration.ProductCreatedV1{}}) - productCreatedConsumer, err := rabbitmqConsumer.NewRabbitMQConsumer[*creatingProductIntegration.ProductCreatedV1]( + productCreatedConsumer, err := rabbitmqConsumer.NewRabbitMQConsumerT[*creatingProductIntegration.ProductCreatedV1]( infra.EventSerializer, infra.Log, infra.RabbitMQConnection, - func(builder *options.RabbitMQConsumerOptionsBuilder[*creatingProductIntegration.ProductCreatedV1]) {}, + func(builder *options.RabbitMQConsumerOptionsBuilder) {}, creatingProductIntegration.NewProductCreatedConsumer(consumerBase)) if err != nil { return err } infra.Consumers = append(infra.Consumers, productCreatedConsumer) - productDeletedConsumer, err := rabbitmqConsumer.NewRabbitMQConsumer[*deletingProductIntegration.ProductDeletedV1]( + productDeletedConsumer, err := rabbitmqConsumer.NewRabbitMQConsumerT[*deletingProductIntegration.ProductDeletedV1]( infra.EventSerializer, infra.Log, infra.RabbitMQConnection, - func(builder *options.RabbitMQConsumerOptionsBuilder[*deletingProductIntegration.ProductDeletedV1]) {}, + func(builder *options.RabbitMQConsumerOptionsBuilder) {}, deletingProductIntegration.NewProductDeletedConsumer(consumerBase)) if err != nil { return err } infra.Consumers = append(infra.Consumers, productDeletedConsumer) - productUpdatedConsumer, err := rabbitmqConsumer.NewRabbitMQConsumer[*updatingProductIntegration.ProductUpdatedV1]( + productUpdatedConsumer, err := rabbitmqConsumer.NewRabbitMQConsumerT[*updatingProductIntegration.ProductUpdatedV1]( infra.EventSerializer, infra.Log, infra.RabbitMQConnection, - func(builder *options.RabbitMQConsumerOptionsBuilder[*updatingProductIntegration.ProductUpdatedV1]) {}, + func(builder *options.RabbitMQConsumerOptionsBuilder) {}, updatingProductIntegration.NewProductUpdatedConsumer(consumerBase)) if err != nil { return err diff --git a/services/catalogs/read_service/internal/products/features/creating_product/events/integration/external/v1/product_created_consumer.go b/services/catalogs/read_service/internal/products/features/creating_product/events/integration/external/v1/product_created_consumer.go index d62832e7..139988c8 100644 --- a/services/catalogs/read_service/internal/products/features/creating_product/events/integration/external/v1/product_created_consumer.go +++ b/services/catalogs/read_service/internal/products/features/creating_product/events/integration/external/v1/product_created_consumer.go @@ -24,7 +24,7 @@ func NewProductCreatedConsumer(productConsumerBase *delivery.ProductConsumersBas return &productCreatedConsumer{productConsumerBase} } -func (c *productCreatedConsumer) Handle(ctx context.Context, consumeContext types2.IMessageConsumeContext[*ProductCreatedV1]) error { +func (c *productCreatedConsumer) Handle(ctx context.Context, consumeContext types2.MessageConsumeContextT[*ProductCreatedV1]) error { if consumeContext.Message() == nil { return nil } diff --git a/services/catalogs/read_service/internal/products/features/deleting_products/events/integration/external/v1/product_deleted_consumer.go b/services/catalogs/read_service/internal/products/features/deleting_products/events/integration/external/v1/product_deleted_consumer.go index f57c0117..fd30f79a 100644 --- a/services/catalogs/read_service/internal/products/features/deleting_products/events/integration/external/v1/product_deleted_consumer.go +++ b/services/catalogs/read_service/internal/products/features/deleting_products/events/integration/external/v1/product_deleted_consumer.go @@ -24,7 +24,7 @@ func NewProductDeletedConsumer(productConsumerBase *delivery.ProductConsumersBas return &productDeletedConsumer{productConsumerBase} } -func (c *productDeletedConsumer) Handle(ctx context.Context, consumeContext types2.IMessageConsumeContext[*ProductDeletedV1]) error { +func (c *productDeletedConsumer) Handle(ctx context.Context, consumeContext types2.MessageConsumeContextT[*ProductDeletedV1]) error { if consumeContext.Message() == nil { return nil } diff --git a/services/catalogs/read_service/internal/products/features/updating_products/events/integration/external/v1/product_updated_consumer.go b/services/catalogs/read_service/internal/products/features/updating_products/events/integration/external/v1/product_updated_consumer.go index 40849ec3..48e11d8e 100644 --- a/services/catalogs/read_service/internal/products/features/updating_products/events/integration/external/v1/product_updated_consumer.go +++ b/services/catalogs/read_service/internal/products/features/updating_products/events/integration/external/v1/product_updated_consumer.go @@ -24,7 +24,7 @@ func NewProductUpdatedConsumer(productConsumerBase *delivery.ProductConsumersBas return &productUpdatedConsumer{productConsumerBase} } -func (c *productUpdatedConsumer) Handle(ctx context.Context, consumeContext types2.IMessageConsumeContext[*ProductUpdatedV1]) error { +func (c *productUpdatedConsumer) Handle(ctx context.Context, consumeContext types2.MessageConsumeContextT[*ProductUpdatedV1]) error { if consumeContext.Message() == nil { return nil } diff --git a/services/catalogs/read_service/internal/shared/server/server.go b/services/catalogs/read_service/internal/shared/server/server.go index e4c060a8..5e8a34af 100644 --- a/services/catalogs/read_service/internal/shared/server/server.go +++ b/services/catalogs/read_service/internal/shared/server/server.go @@ -57,7 +57,7 @@ func (s *Server) Run() error { switch deliveryType { case "http": go func() { - if err := echoServer.RunHttpServer(nil); err != nil { + if err := echoServer.RunHttpServer(ctx, nil); err != nil { s.log.Errorf("(s.RunHttpServer) err: {%v}", err) serverError = err cancel() @@ -67,7 +67,7 @@ func (s *Server) Run() error { case "grpc": go func() { - if err := grpcServer.RunGrpcServer(nil); err != nil { + if err := grpcServer.RunGrpcServer(ctx, nil); err != nil { s.log.Errorf("(s.RunGrpcServer) err: {%v}", err) serverError = err cancel() @@ -94,22 +94,11 @@ func (s *Server) Run() error { } }() + // waiting for app get a canceled or completed signal <-ctx.Done() s.waitForShootDown(constants.WaitShotDownDuration) - switch deliveryType { - case "http": - s.log.Infof("%s is shutting down Http PORT: {%s}", web.GetMicroserviceName(s.cfg), s.cfg.Http.Port) - if err := echoServer.GracefulShutdown(ctx); err != nil { - s.log.Warnf("(Shutdown) err: {%v}", err) - } - case "grpc": - s.log.Infof("%s is shutting down Grpc PORT: {%s}", web.GetMicroserviceName(s.cfg), s.cfg.GRPC.Port) - grpcServer.GracefulShutdown() - } - - backgroundWorkers.Stop(ctx) - + // waiting for shutdown time reached <-s.doneCh s.log.Infof("%s server exited properly", web.GetMicroserviceName(s.cfg)) diff --git a/services/catalogs/read_service/internal/shared/test_fixture/e2e/e2e_test_fixture.go b/services/catalogs/read_service/internal/shared/test_fixture/e2e/e2e_test_fixture.go index 84dd5d1f..6a2e382c 100644 --- a/services/catalogs/read_service/internal/shared/test_fixture/e2e/e2e_test_fixture.go +++ b/services/catalogs/read_service/internal/shared/test_fixture/e2e/e2e_test_fixture.go @@ -45,6 +45,7 @@ func NewE2ETestFixture() *E2ETestFixture { v1Groups := &V1Groups{ProductsGroup: productsV1} + // this should not be in integration test because of cyclic dependencies err := mediatr.ConfigProductsMediator(infrastructures) if err != nil { cancel() @@ -57,6 +58,7 @@ func NewE2ETestFixture() *E2ETestFixture { return nil } + // this should not be in integration test because of cyclic dependencies err = consumers.ConfigConsumers(infrastructures) if err != nil { cancel() diff --git a/services/catalogs/read_service/internal/shared/test_fixture/integration/integration_test_fixture.go b/services/catalogs/read_service/internal/shared/test_fixture/integration/integration_test_fixture.go index cb48eda5..e4aa1125 100644 --- a/services/catalogs/read_service/internal/shared/test_fixture/integration/integration_test_fixture.go +++ b/services/catalogs/read_service/internal/shared/test_fixture/integration/integration_test_fixture.go @@ -2,16 +2,19 @@ package integration import ( "context" + "emperror.dev/errors" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/constants" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/logger/defaultLogger" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/consumer/options" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/test/messaging/consumer" webWoker "github.com/mehdihadeli/store-golang-microservice-sample/pkg/web" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/read_service/config" - "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/read_service/internal/products/configurations/consumers" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/read_service/internal/products/configurations/mappings" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/read_service/internal/products/contracts" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/read_service/internal/products/data/repositories" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/read_service/internal/shared/configurations/infrastructure" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/read_service/internal/shared/web/workers" + "time" ) type IntegrationTestFixture struct { @@ -40,12 +43,6 @@ func NewIntegrationTestFixture() *IntegrationTestFixture { return nil } - err = consumers.ConfigConsumers(infrastructures) - if err != nil { - cancel() - return nil - } - workersRunner := webWoker.NewWorkersRunner([]webWoker.Worker{ workers.NewRabbitMQWorkerWorker(infrastructures), }) @@ -77,3 +74,35 @@ func (e *IntegrationTestFixture) Run() { } }() } + +func (e *IntegrationTestFixture) FakeConsumer(messageName string) *consumer.RabbitMQFakeTestConsumer { + fakeConsumer := consumer.NewRabbitMQFakeTestConsumer( + e.EventSerializer, + e.Log, + e.RabbitMQConnection, + func(builder *options.RabbitMQConsumerOptionsBuilder) { + builder.WithExchangeName(messageName).WithQueueName(messageName).WithRoutingKey(messageName) + }) + + e.Consumers = append(e.Consumers, fakeConsumer) + + return fakeConsumer +} + +func (e *IntegrationTestFixture) WaitUntilConditionMet(conditionToMet func() bool) error { + timeout := 20 * time.Second + + startTime := time.Now() + timeOutExpired := false + meet := conditionToMet() + for meet == false { + if timeOutExpired { + return errors.New("Condition not met for the test, timeout exceeded") + } + time.Sleep(time.Second * 2) + meet = conditionToMet() + timeOutExpired = time.Now().Sub(startTime) > timeout + } + + return nil +} diff --git a/services/catalogs/write_service/config/config.development.json b/services/catalogs/write_service/config/config.development.json index 350811b2..42ef87ef 100644 --- a/services/catalogs/write_service/config/config.development.json +++ b/services/catalogs/write_service/config/config.development.json @@ -5,10 +5,12 @@ "timeout": 20 }, "grpc": { + "name":"catalogs_write_service", "port": ":6003", "development": true }, "http": { + "name":"catalogs_write_service", "port": ":7000", "development": true, "timeout": 30, diff --git a/services/catalogs/write_service/internal/products/features/creating_product/commands/v1/create_product_handler.go b/services/catalogs/write_service/internal/products/features/creating_product/commands/v1/create_product_handler.go index 4357fb4e..6d75133b 100644 --- a/services/catalogs/write_service/internal/products/features/creating_product/commands/v1/create_product_handler.go +++ b/services/catalogs/write_service/internal/products/features/creating_product/commands/v1/create_product_handler.go @@ -55,7 +55,7 @@ func (c *CreateProductHandler) Handle(ctx context.Context, command *CreateProduc productCreated := v1.NewProductCreatedV1(productDto) - err = c.rabbitmqProducer.Publish(ctx, productCreated, nil) + err = c.rabbitmqProducer.PublishMessage(ctx, productCreated, nil) if err != nil { return nil, tracing.TraceWithErr(span, customErrors.NewApplicationErrorWrap(err, "[CreateProductHandler.PublishMessage] error in publishing ProductCreated integration event")) } diff --git a/services/catalogs/write_service/internal/products/features/deleting_product/commands/v1/delete_product_handler.go b/services/catalogs/write_service/internal/products/features/deleting_product/commands/v1/delete_product_handler.go index a2b77d94..bb7a20d5 100644 --- a/services/catalogs/write_service/internal/products/features/deleting_product/commands/v1/delete_product_handler.go +++ b/services/catalogs/write_service/internal/products/features/deleting_product/commands/v1/delete_product_handler.go @@ -37,7 +37,7 @@ func (c *DeleteProductHandler) Handle(ctx context.Context, command *DeleteProduc } productDeleted := v1.NewProductDeletedV1(command.ProductID.String()) - err := c.rabbitmqProducer.Publish(ctx, productDeleted, nil) + err := c.rabbitmqProducer.PublishMessage(ctx, productDeleted, nil) if err != nil { return nil, tracing.TraceWithErr(span, customErrors.NewApplicationErrorWrap(err, "[DeleteProductHandler_Handle.PublishMessage] error in publishing 'ProductDeleted' message")) } diff --git a/services/catalogs/write_service/internal/products/features/updating_product/commands/v1/update_product_handler.go b/services/catalogs/write_service/internal/products/features/updating_product/commands/v1/update_product_handler.go index 0742c9c8..a9d9908a 100644 --- a/services/catalogs/write_service/internal/products/features/updating_product/commands/v1/update_product_handler.go +++ b/services/catalogs/write_service/internal/products/features/updating_product/commands/v1/update_product_handler.go @@ -60,7 +60,7 @@ func (c *UpdateProductHandler) Handle(ctx context.Context, command *UpdateProduc productUpdated := v1.NewProductUpdatedV1(productDto) - err = c.rabbitmqProducer.Publish(ctx, productUpdated, nil) + err = c.rabbitmqProducer.PublishMessage(ctx, productUpdated, nil) if err != nil { return nil, tracing.TraceWithErr(span, customErrors.NewApplicationErrorWrap(err, "[UpdateProductHandler_Handle.PublishMessage] error in publishing 'ProductUpdated' message")) } diff --git a/services/catalogs/write_service/internal/shared/server/server.go b/services/catalogs/write_service/internal/shared/server/server.go index cdc596dc..ce055d70 100644 --- a/services/catalogs/write_service/internal/shared/server/server.go +++ b/services/catalogs/write_service/internal/shared/server/server.go @@ -56,7 +56,7 @@ func (s *Server) Run() error { switch deliveryType { case "http": go func() { - if err := echoServer.RunHttpServer(nil); err != nil { + if err := echoServer.RunHttpServer(ctx, nil); err != nil { s.log.Errorf("(s.RunHttpServer) err: {%v}", err) serverError = err cancel() @@ -66,7 +66,7 @@ func (s *Server) Run() error { case "grpc": go func() { - if err := grpcServer.RunGrpcServer(nil); err != nil { + if err := grpcServer.RunGrpcServer(ctx, nil); err != nil { s.log.Errorf("(s.RunGrpcServer) err: {%v}", err) serverError = err cancel() @@ -93,22 +93,11 @@ func (s *Server) Run() error { } }() + // waiting for app get a canceled or completed signal <-ctx.Done() s.waitForShootDown(constants.WaitShotDownDuration) - switch deliveryType { - case "http": - s.log.Infof("%s is shutting down Http PORT: {%s}", web.GetMicroserviceName(s.cfg), s.cfg.Http.Port) - if err := echoServer.GracefulShutdown(ctx); err != nil { - s.log.Warnf("(Shutdown) err: {%v}", err) - } - case "grpc": - s.log.Infof("%s is shutting down Grpc PORT: {%s}", web.GetMicroserviceName(s.cfg), s.cfg.GRPC.Port) - grpcServer.GracefulShutdown() - } - - backgroundWorkers.Stop(ctx) - + // waiting for shutdown time reached <-s.doneCh s.log.Infof("%s server exited properly", web.GetMicroserviceName(s.cfg)) diff --git a/services/catalogs/write_service/internal/shared/test_fixtures/integration/integration_test_fixture.go b/services/catalogs/write_service/internal/shared/test_fixtures/integration/integration_test_fixture.go index 0cc070be..a995a6fb 100644 --- a/services/catalogs/write_service/internal/shared/test_fixtures/integration/integration_test_fixture.go +++ b/services/catalogs/write_service/internal/shared/test_fixtures/integration/integration_test_fixture.go @@ -2,14 +2,18 @@ package integration import ( "context" + "emperror.dev/errors" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/constants" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/logger/defaultLogger" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/consumer/options" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/test/messaging/consumer" webWoker "github.com/mehdihadeli/store-golang-microservice-sample/pkg/web" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/write_service/config" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/write_service/internal/products/configurations/mappings" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/write_service/internal/products/contracts" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/write_service/internal/products/data/repositories" "github.com/mehdihadeli/store-golang-microservice-sample/services/catalogs/write_service/internal/shared/configurations/infrastructure" + "time" ) type IntegrationTestFixture struct { @@ -50,3 +54,35 @@ func NewIntegrationTestFixture() *IntegrationTestFixture { func (e *IntegrationTestFixture) Run() { } + +func (e *IntegrationTestFixture) FakeConsumer(messageName string) *consumer.RabbitMQFakeTestConsumer { + fakeConsumer := consumer.NewRabbitMQFakeTestConsumer( + e.EventSerializer, + e.Log, + e.RabbitMQConnection, + func(builder *options.RabbitMQConsumerOptionsBuilder) { + builder.WithExchangeName(messageName).WithQueueName(messageName).WithRoutingKey(messageName) + }) + + e.Consumers = append(e.Consumers, fakeConsumer) + + return fakeConsumer +} + +func (e *IntegrationTestFixture) WaitUntilConditionMet(conditionToMet func() bool) error { + timeout := 20 * time.Second + + startTime := time.Now() + timeOutExpired := false + meet := conditionToMet() + for meet == false { + if timeOutExpired { + return errors.New("Condition not met for the test, timeout exceeded") + } + time.Sleep(time.Second * 2) + meet = conditionToMet() + timeOutExpired = time.Now().Sub(startTime) > timeout + } + + return nil +} diff --git a/services/orders/config/config.development.json b/services/orders/config/config.development.json index 74067555..f8f6b9c2 100644 --- a/services/orders/config/config.development.json +++ b/services/orders/config/config.development.json @@ -5,10 +5,12 @@ "timeout": 20 }, "grpc": { + "name":"order_service", "port": ":6005", "development": true }, "http": { + "name":"order_service", "port": ":8000", "development": true, "timeout": 30, diff --git a/services/orders/config/config.test.json b/services/orders/config/config.test.json index 74067555..f8f6b9c2 100644 --- a/services/orders/config/config.test.json +++ b/services/orders/config/config.test.json @@ -5,10 +5,12 @@ "timeout": 20 }, "grpc": { + "name":"order_service", "port": ":6005", "development": true }, "http": { + "name":"order_service", "port": ":8000", "development": true, "timeout": 30, diff --git a/services/orders/go.mod b/services/orders/go.mod index c6b7e308..4b3f4ef6 100644 --- a/services/orders/go.mod +++ b/services/orders/go.mod @@ -37,6 +37,7 @@ require ( github.com/ajg/form v1.5.1 // indirect github.com/andybalholm/brotli v1.0.2 // indirect github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de // indirect + github.com/avast/retry-go v3.0.0+incompatible // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/brpaz/echozap v1.1.3 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect diff --git a/services/orders/go.sum b/services/orders/go.sum index 0f35f719..ba4ca54e 100644 --- a/services/orders/go.sum +++ b/services/orders/go.sum @@ -154,6 +154,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v1.8.0/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0= diff --git a/services/orders/internal/orders/features/creating_order/commands/v1/create_order_handler_test.go b/services/orders/internal/orders/features/creating_order/commands/v1/create_order_handler_test.go index d230fc3a..a24748d7 100644 --- a/services/orders/internal/orders/features/creating_order/commands/v1/create_order_handler_test.go +++ b/services/orders/internal/orders/features/creating_order/commands/v1/create_order_handler_test.go @@ -24,6 +24,12 @@ func Test_Create_Order_Command_Handler(t *testing.T) { return } + fakeConsumer := fixture.FakeConsumer("order_created_v_1") + + if err != nil { + return + } + fixture.Run() orderDto := dtos.CreateOrderRequestDto{ @@ -45,5 +51,9 @@ func Test_Create_Order_Command_Handler(t *testing.T) { assert.NotNil(t, result) assert.Equal(t, command.OrderId, result.OrderId) - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 2) + + assert.NoError(t, fixture.WaitUntilConditionMet(func() bool { + return fakeConsumer.IsHandled() + })) } diff --git a/services/orders/internal/orders/projections/mongo_order_projection.go b/services/orders/internal/orders/projections/mongo_order_projection.go index c36f9e70..52779234 100644 --- a/services/orders/internal/orders/projections/mongo_order_projection.go +++ b/services/orders/internal/orders/projections/mongo_order_projection.go @@ -65,7 +65,7 @@ func (m *mongoOrderProjection) onOrderCreated(ctx context.Context, evt *creating orderCreatedEvent := v1.NewOrderCreatedV1(orderReadDto) - err = m.rabbitmqProducer.Publish(ctx, orderCreatedEvent, nil) + err = m.rabbitmqProducer.PublishMessage(ctx, orderCreatedEvent, nil) if err != nil { return tracing.TraceWithErr(span, customErrors.NewApplicationErrorWrap(err, "[mongoOrderProjection_onOrderCreated.PublishMessage] error in publishing OrderCreated integration event")) } diff --git a/services/orders/internal/shared/server/server.go b/services/orders/internal/shared/server/server.go index 73170fff..be3285b8 100644 --- a/services/orders/internal/shared/server/server.go +++ b/services/orders/internal/shared/server/server.go @@ -56,7 +56,7 @@ func (s *Server) Run() error { switch deliveryType { case "http": go func() { - if err := echoServer.RunHttpServer(nil); err != nil { + if err := echoServer.RunHttpServer(ctx, nil); err != nil { s.log.Errorf("(s.RunHttpServer) err: {%v}", err) serverError = err cancel() @@ -66,7 +66,7 @@ func (s *Server) Run() error { case "grpc": go func() { - if err := grpcServer.RunGrpcServer(nil); err != nil { + if err := grpcServer.RunGrpcServer(ctx, nil); err != nil { s.log.Errorf("(s.RunGrpcServer) err: {%v}", err) serverError = err cancel() @@ -93,24 +93,13 @@ func (s *Server) Run() error { } }() + // waiting for app get a canceled or completed signal <-ctx.Done() s.waitForShootDown(constants.WaitShotDownDuration) - switch deliveryType { - case "http": - s.log.Infof("%s is shutting down Http PORT: {%s}", web.GetMicroserviceName(s.cfg), s.cfg.Http.Port) - if err := echoServer.GracefulShutdown(ctx); err != nil { - s.log.Warnf("(Shutdown) err: {%v}", err) - } - case "grpc": - s.log.Infof("%s is shutting down Grpc PORT: {%s}", web.GetMicroserviceName(s.cfg), s.cfg.GRPC.Port) - grpcServer.GracefulShutdown() - } - - backgroundWorkers.Stop(ctx) - + // waiting for shutdown time reached <-s.doneCh - s.log.Infof("%s server exited properly", web.GetMicroserviceName(s.cfg)) + s.log.Infof("microservice %s exited successfully", web.GetMicroserviceName(s.cfg)) return serverError } diff --git a/services/orders/internal/shared/test_fixtures/e2e/e2e_test_fixture.go b/services/orders/internal/shared/test_fixtures/e2e/e2e_test_fixture.go index 5b4151f6..c7d2da36 100644 --- a/services/orders/internal/shared/test_fixtures/e2e/e2e_test_fixture.go +++ b/services/orders/internal/shared/test_fixtures/e2e/e2e_test_fixture.go @@ -49,25 +49,28 @@ func NewE2ETestFixture() *E2ETestFixture { v1Groups := &V1Groups{OrdersGroup: ordersV1} + // this should not be in integration test because of cyclic dependencies err := mediatr.ConfigOrdersMediator(infrastructures) if err != nil { cancel() return nil } - err = mappings.ConfigureMappings() + // this should not be in integration test because of cyclic dependencies + err = consumers.ConfigConsumers(infrastructures) if err != nil { cancel() return nil } - projections.ConfigOrderProjections(infrastructures) - err = consumers.ConfigConsumers(infrastructures) + err = mappings.ConfigureMappings() if err != nil { cancel() return nil } + projections.ConfigOrderProjections(infrastructures) + httpServer := httptest.NewServer(echo) grpcServer := grpcServer.NewGrpcServer(cfg.GRPC, defaultLogger.Logger) @@ -97,7 +100,7 @@ func NewE2ETestFixture() *E2ETestFixture { func (e *E2ETestFixture) Run() { go func() { - if err := e.GrpcServer.RunGrpcServer(nil); err != nil { + if err := e.GrpcServer.RunGrpcServer(e.ctx, nil); err != nil { e.cancel() e.Log.Errorf("(s.RunGrpcServer) err: %v", err) return diff --git a/services/orders/internal/shared/test_fixtures/integration/integration_test_fixture.go b/services/orders/internal/shared/test_fixtures/integration/integration_test_fixture.go index a3c609ca..d8a0e19c 100644 --- a/services/orders/internal/shared/test_fixtures/integration/integration_test_fixture.go +++ b/services/orders/internal/shared/test_fixtures/integration/integration_test_fixture.go @@ -2,13 +2,15 @@ package integration import ( "context" + "emperror.dev/errors" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/constants" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/es/contracts/store" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/eventstroredb" "github.com/mehdihadeli/store-golang-microservice-sample/pkg/logger/defaultLogger" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/rabbitmq/consumer/options" + "github.com/mehdihadeli/store-golang-microservice-sample/pkg/test/messaging/consumer" webWoker "github.com/mehdihadeli/store-golang-microservice-sample/pkg/web" "github.com/mehdihadeli/store-golang-microservice-sample/services/orders/config" - "github.com/mehdihadeli/store-golang-microservice-sample/services/orders/internal/orders/configurations/consumers" "github.com/mehdihadeli/store-golang-microservice-sample/services/orders/internal/orders/configurations/mappings" "github.com/mehdihadeli/store-golang-microservice-sample/services/orders/internal/orders/configurations/projections" "github.com/mehdihadeli/store-golang-microservice-sample/services/orders/internal/orders/contracts/repositories" @@ -24,10 +26,10 @@ type IntegrationTestFixture struct { *infrastructure.InfrastructureConfiguration OrderAggregateStore store.AggregateStore[*aggregate.Order] MongoOrderReadRepository repositories.OrderReadRepository - workersRunner *webWoker.WorkersRunner ctx context.Context cancel context.CancelFunc Cleanup func() + cleanupChan chan struct{} } func NewIntegrationTestFixture() *IntegrationTestFixture { @@ -48,42 +50,74 @@ func NewIntegrationTestFixture() *IntegrationTestFixture { cancel() return nil } - - projections.ConfigOrderProjections(infrastructures) - err = consumers.ConfigConsumers(infrastructures) - if err != nil { - cancel() - return nil - } - workersRunner := webWoker.NewWorkersRunner([]webWoker.Worker{ - workers.NewRabbitMQWorkerWorker(infrastructures), workers.NewEventStoreDBWorker(infrastructures), - }) + projections.ConfigOrderProjections(infrastructures) + cleanupChan := make(chan struct{}) return &IntegrationTestFixture{ + cleanupChan: cleanupChan, Cleanup: func() { - workersRunner.Stop(ctx) + cleanupChan <- struct{}{} cancel() cleanup() }, InfrastructureConfiguration: infrastructures, OrderAggregateStore: orderAggregateStore, MongoOrderReadRepository: mongoOrderReadRepository, - workersRunner: workersRunner, ctx: ctx, cancel: cancel, } } func (e *IntegrationTestFixture) Run() { - workersErr := e.workersRunner.Start(e.ctx) + workersRunner := webWoker.NewWorkersRunner([]webWoker.Worker{ + workers.NewRabbitMQWorkerWorker(e.InfrastructureConfiguration), workers.NewEventStoreDBWorker(e.InfrastructureConfiguration), + }) + + workersErr := workersRunner.Start(e.ctx) go func() { for { select { case _ = <-workersErr: + workersRunner.Stop(e.ctx) e.cancel() return + case <-e.cleanupChan: + workersRunner.Stop(e.ctx) + return } } }() } + +func (e *IntegrationTestFixture) FakeConsumer(messageName string) *consumer.RabbitMQFakeTestConsumer { + fakeConsumer := consumer.NewRabbitMQFakeTestConsumer( + e.EventSerializer, + e.Log, + e.RabbitMQConnection, + func(builder *options.RabbitMQConsumerOptionsBuilder) { + builder.WithExchangeName(messageName).WithQueueName(messageName).WithRoutingKey(messageName) + }) + + e.Consumers = append(e.Consumers, fakeConsumer) + + return fakeConsumer +} + +func (e *IntegrationTestFixture) WaitUntilConditionMet(conditionToMet func() bool) error { + timeout := 20 * time.Second + + startTime := time.Now() + timeOutExpired := false + meet := conditionToMet() + for meet == false { + if timeOutExpired { + return errors.New("Condition not met for the test, timeout exceeded") + } + time.Sleep(time.Second * 2) + meet = conditionToMet() + timeOutExpired = time.Now().Sub(startTime) > timeout + } + + return nil +} diff --git a/services/orders/internal/shared/web/workers/rabbitmq_worker.go b/services/orders/internal/shared/web/workers/rabbitmq_worker.go index 774eef75..fcf732d2 100644 --- a/services/orders/internal/shared/web/workers/rabbitmq_worker.go +++ b/services/orders/internal/shared/web/workers/rabbitmq_worker.go @@ -8,16 +8,15 @@ import ( ) func NewRabbitMQWorkerWorker(infra *infrastructure.InfrastructureConfiguration) web.Worker { - rabbitMQBus := rabbitmqBus.NewRabbitMQBus(infra.Log, infra.Consumers) - + bus := rabbitmqBus.NewRabbitMQBus(infra.Log, infra.Consumers) return web.NewBackgroundWorker(func(ctx context.Context) error { - err := rabbitMQBus.Start(ctx) + err := bus.Start(ctx) if err != nil { infra.Log.Errorf("[RabbitMQWorkerWorker.Start] error in the starting rabbitmq worker: {%v}", err) return err } return nil }, func(ctx context.Context) error { - return rabbitMQBus.Stop(ctx) + return bus.Stop(ctx) }) }