From b4e3c811f696afcbbdfee46fc41d1dcd2a970995 Mon Sep 17 00:00:00 2001 From: alishazaee Date: Tue, 29 Oct 2024 20:28:44 +0330 Subject: [PATCH] feat(source): Implement new source event --- adapter/rabbitmq/adapter.go | 10 ++ cmd/source/faker/consumer/main.go | 60 +++++++ cmd/source/faker/publisher/main.go | 61 +++++++ cmd/source/main.go | 47 +++++- config.yml | 12 ++ config/config.go | 2 + contract/go/source/source.pb.go | 158 +++++++++++++----- contract/protobuf/source/source.proto | 6 + pkg/encoder/newsource.go | 34 ++++ source/config.go | 7 +- source/eventhandler/newsource.go | 72 ++++++++ .../redis/rediswritekey/writekey.go | 33 ++++ source/service/writekey/service.go | 37 ++-- source/service/writekey/service_test.go | 28 ---- 14 files changed, 481 insertions(+), 86 deletions(-) create mode 100644 adapter/rabbitmq/adapter.go create mode 100644 cmd/source/faker/consumer/main.go create mode 100644 cmd/source/faker/publisher/main.go create mode 100644 pkg/encoder/newsource.go create mode 100644 source/eventhandler/newsource.go create mode 100644 source/repository/redis/rediswritekey/writekey.go diff --git a/adapter/rabbitmq/adapter.go b/adapter/rabbitmq/adapter.go new file mode 100644 index 00000000..6a82c850 --- /dev/null +++ b/adapter/rabbitmq/adapter.go @@ -0,0 +1,10 @@ +package rabbitmq + +type Config struct { + UserName string `koanf:"username"` + Password string `koanf:"password"` + Host string `koanf:"host"` + Port int `koanf:"port"` + Vhost string `koanf:"vhost"` + ReconnectSecond int `koanf:"reconnect_second"` +} diff --git a/cmd/source/faker/consumer/main.go b/cmd/source/faker/consumer/main.go new file mode 100644 index 00000000..bc75d202 --- /dev/null +++ b/cmd/source/faker/consumer/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "sync" + + "github.com/labstack/gommon/log" + "github.com/ormushq/ormus/adapter/otela" + "github.com/ormushq/ormus/config" + "github.com/ormushq/ormus/destination/dconfig" + "github.com/ormushq/ormus/pkg/channel" + rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq" + "github.com/ormushq/ormus/pkg/encoder" +) + +func main() { + cfg := config.C() + done := make(chan bool) + wg := &sync.WaitGroup{} + dbConfig := dconfig.RabbitMQConsumerConnection{ + User: cfg.RabbitMq.UserName, + Password: cfg.RabbitMq.Password, + Host: cfg.RabbitMq.Host, + Port: cfg.RabbitMq.Port, + Vhost: cfg.RabbitMq.Vhost, + ReconnectSecond: cfg.RabbitMq.ReconnectSecond, + } + bufferSize := cfg.Source.BufferSize + numberInstants := cfg.Source.NumberInstants + maxRetryPolicy := cfg.Source.MaxRetry + eventName := cfg.Source.NewSourceEventName + err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole}) + if err != nil { + panic(err.Error()) + } + + outputAdapter := rbbitmqchannel.New(done, wg, dbConfig) + err = outputAdapter.NewChannel(eventName, channel.OutputOnly, bufferSize, numberInstants, maxRetryPolicy) + if err != nil { + panic(err) + } + outputChannel, err := outputAdapter.GetOutputChannel(eventName) + if err != nil { + panic(err) + } + + wg.Add(1) + + go func() { + defer wg.Done() + for msg := range outputChannel { + m := encoder.DecodeNewSourceEvent(string(msg.Body)) + log.Info(m.WriteKey) + + if err := msg.Ack(); err != nil { + panic(err) + } + } + }() + wg.Wait() +} diff --git a/cmd/source/faker/publisher/main.go b/cmd/source/faker/publisher/main.go new file mode 100644 index 00000000..2f62c6dd --- /dev/null +++ b/cmd/source/faker/publisher/main.go @@ -0,0 +1,61 @@ +package main + +import ( + "sync" + + "github.com/google/uuid" + "github.com/ormushq/ormus/adapter/otela" + "github.com/ormushq/ormus/config" + "github.com/ormushq/ormus/contract/go/source" + "github.com/ormushq/ormus/destination/dconfig" + "github.com/ormushq/ormus/pkg/channel" + rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq" + "github.com/ormushq/ormus/pkg/encoder" +) + +func main() { + cfg := config.C() + done := make(chan bool) + wg := &sync.WaitGroup{} + dbConfig := dconfig.RabbitMQConsumerConnection{ + User: cfg.RabbitMq.UserName, + Password: cfg.RabbitMq.Password, + Host: cfg.RabbitMq.Host, + Port: cfg.RabbitMq.Port, + Vhost: cfg.RabbitMq.Vhost, + ReconnectSecond: cfg.RabbitMq.ReconnectSecond, + } + bufferSize := cfg.Source.BufferSize + numberInstants := cfg.Source.NumberInstants + maxRetryPolicy := cfg.Source.MaxRetry + testCount := 100 + + err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole}) + if err != nil { + panic(err.Error()) + } + inputAdapter := rbbitmqchannel.New(done, wg, dbConfig) + err = inputAdapter.NewChannel(cfg.Source.NewSourceEventName, channel.InputOnlyMode, bufferSize, numberInstants, maxRetryPolicy) + if err != nil { + panic(err.Error()) + } + inputChannel, err := inputAdapter.GetInputChannel(cfg.Source.NewSourceEventName) + if err != nil { + panic(err.Error()) + } + + wg.Add(1) + go func() { + defer wg.Done() + for messageID := 0; messageID < testCount; messageID++ { + msg := encoder.EncodeNewSourceEvent(&source.NewSourceEvent{ + ProjectId: uuid.New().String(), + OwnerId: uuid.New().String(), + WriteKey: uuid.New().String(), + }) + inputChannel <- []byte(msg) + + } + }() + wg.Wait() +} diff --git a/cmd/source/main.go b/cmd/source/main.go index 7af34351..2a0c26c4 100644 --- a/cmd/source/main.go +++ b/cmd/source/main.go @@ -1,16 +1,24 @@ package main import ( + "context" "log/slog" "os" "os/signal" "sync" "github.com/ormushq/ormus/adapter/otela" + "github.com/ormushq/ormus/adapter/redis" "github.com/ormushq/ormus/config" + "github.com/ormushq/ormus/destination/dconfig" "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/pkg/channel" + rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq" "github.com/ormushq/ormus/source/delivery/httpserver" "github.com/ormushq/ormus/source/delivery/httpserver/statushandler" + sourceevent "github.com/ormushq/ormus/source/eventhandler" + writekeyrepo "github.com/ormushq/ormus/source/repository/redis/rediswritekey" + "github.com/ormushq/ormus/source/service/writekey" ) // @termsOfService http://swagger.io/terms/ @@ -58,6 +66,14 @@ func main() { handlers := []httpserver.Handler{ statushandler.New(), } + err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole}) + if err != nil { + panic(err.Error()) + } + + cfg := config.C() + _, Consumer := SetupSourceServices(cfg) + Consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, Consumer.ProcessNewSourceEvent) //----------------- Setup Tracer -----------------// otelcfg := otela.Config{ @@ -68,7 +84,7 @@ func main() { MetricExposePort: config.C().Source.Otel.MetricExposePort, Exporter: otela.ExporterGrpc, } - err := otela.Configure(wg, done, otelcfg) + err = otela.Configure(wg, done, otelcfg) if err != nil { l.Error(err.Error()) } @@ -86,3 +102,32 @@ func main() { close(done) wg.Wait() } + +func SetupSourceServices(cfg config.Config) (writekey.Service, sourceevent.Consumer) { + done := make(chan bool) + wg := &sync.WaitGroup{} + dbConfig := dconfig.RabbitMQConsumerConnection{ + User: cfg.RabbitMq.UserName, + Password: cfg.RabbitMq.Password, + Host: cfg.RabbitMq.Host, + Port: cfg.RabbitMq.Port, + Vhost: cfg.RabbitMq.Vhost, + ReconnectSecond: cfg.RabbitMq.ReconnectSecond, + } + outputAdapter := rbbitmqchannel.New(done, wg, dbConfig) + err := outputAdapter.NewChannel(cfg.Source.NewSourceEventName, channel.OutputOnly, cfg.Source.BufferSize, cfg.Source.NumberInstants, cfg.Source.MaxRetry) + if err != nil { + panic(err) + } + + adapter, err := redis.New(cfg.Redis) + if err != nil { + panic(err) + } + + writeKeyRepo := writekeyrepo.New(adapter) + writeKeySvc := writekey.New(&writeKeyRepo, cfg.Source) + eventHandler := sourceevent.New(outputAdapter, writeKeySvc) + + return writeKeySvc, *eventHandler +} diff --git a/config.yml b/config.yml index 856846ef..aca451db 100644 --- a/config.yml +++ b/config.yml @@ -4,6 +4,11 @@ source: http_server: port: 8080 network: "tcp" + write_key_expiration: 120 + new_source_event_name: "new-source-event" + buffersize: 100 + number_instants: 10 + maxretry: 5 otel: endpoint: "otel_collector:4317" service_name: "source" @@ -19,6 +24,13 @@ redis: host: 127.0.0.1 db: 0 password: "" +rabbitmq: + host: "localhost" + port: 5672 + password: "guest" + username: "guest" + reconnect_second: 1 + vhost: "/" etcd: port: 2379 host: 127.0.0.1 diff --git a/config/config.go b/config/config.go index 5e37fc0c..3d09c44c 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config import ( "github.com/ormushq/ormus/adapter/etcd" + "github.com/ormushq/ormus/adapter/rabbitmq" "github.com/ormushq/ormus/adapter/redis" "github.com/ormushq/ormus/adapter/scylladb" "github.com/ormushq/ormus/destination/dconfig" @@ -12,6 +13,7 @@ import ( type Config struct { Redis redis.Config `koanf:"redis"` Etcd etcd.Config `koanf:"etcd"` + RabbitMq rabbitmq.Config `koanf:"rabbitmq"` Manager manager.Config `koanf:"manager"` Source source.Config `koanf:"source"` Destination dconfig.Config `koanf:"destination"` diff --git a/contract/go/source/source.pb.go b/contract/go/source/source.pb.go index 4f744755..d8c2a2c1 100644 --- a/contract/go/source/source.pb.go +++ b/contract/go/source/source.pb.go @@ -1,18 +1,18 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.2 -// protoc v5.28.0 +// protoc-gen-go v1.26.0 +// protoc v3.12.4 // source: source/source.proto -package manager +package source import ( reflect "reflect" sync "sync" + timestamp "github.com/golang/protobuf/ptypes/timestamp" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -144,17 +144,17 @@ type Source struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - WriteKey string `protobuf:"bytes,2,opt,name=write_key,json=writeKey,proto3" json:"write_key,omitempty"` - Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` - Description string `protobuf:"bytes,4,opt,name=description,proto3" json:"description,omitempty"` - ProjectId string `protobuf:"bytes,5,opt,name=project_id,json=projectId,proto3" json:"project_id,omitempty"` - OwnerId string `protobuf:"bytes,6,opt,name=owner_id,json=ownerId,proto3" json:"owner_id,omitempty"` - Status Status `protobuf:"varint,7,opt,name=status,proto3,enum=source.Status" json:"status,omitempty"` - Metadata *SourceMetadata `protobuf:"bytes,8,opt,name=metadata,proto3" json:"metadata,omitempty"` - CreatedAt *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` - UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,10,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"` - DeletedAt *timestamppb.Timestamp `protobuf:"bytes,11,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + WriteKey string `protobuf:"bytes,2,opt,name=write_key,json=writeKey,proto3" json:"write_key,omitempty"` + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` + Description string `protobuf:"bytes,4,opt,name=description,proto3" json:"description,omitempty"` + ProjectId string `protobuf:"bytes,5,opt,name=project_id,json=projectId,proto3" json:"project_id,omitempty"` + OwnerId string `protobuf:"bytes,6,opt,name=owner_id,json=ownerId,proto3" json:"owner_id,omitempty"` + Status Status `protobuf:"varint,7,opt,name=status,proto3,enum=source.Status" json:"status,omitempty"` + Metadata *SourceMetadata `protobuf:"bytes,8,opt,name=metadata,proto3" json:"metadata,omitempty"` + CreatedAt *timestamp.Timestamp `protobuf:"bytes,9,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + UpdatedAt *timestamp.Timestamp `protobuf:"bytes,10,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"` + DeletedAt *timestamp.Timestamp `protobuf:"bytes,11,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"` } func (x *Source) Reset() { @@ -245,27 +245,90 @@ func (x *Source) GetMetadata() *SourceMetadata { return nil } -func (x *Source) GetCreatedAt() *timestamppb.Timestamp { +func (x *Source) GetCreatedAt() *timestamp.Timestamp { if x != nil { return x.CreatedAt } return nil } -func (x *Source) GetUpdatedAt() *timestamppb.Timestamp { +func (x *Source) GetUpdatedAt() *timestamp.Timestamp { if x != nil { return x.UpdatedAt } return nil } -func (x *Source) GetDeletedAt() *timestamppb.Timestamp { +func (x *Source) GetDeletedAt() *timestamp.Timestamp { if x != nil { return x.DeletedAt } return nil } +type NewSourceEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProjectId string `protobuf:"bytes,1,opt,name=project_id,json=projectId,proto3" json:"project_id,omitempty"` + OwnerId string `protobuf:"bytes,2,opt,name=owner_id,json=ownerId,proto3" json:"owner_id,omitempty"` + WriteKey string `protobuf:"bytes,3,opt,name=write_key,json=writeKey,proto3" json:"write_key,omitempty"` +} + +func (x *NewSourceEvent) Reset() { + *x = NewSourceEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_source_source_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NewSourceEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NewSourceEvent) ProtoMessage() {} + +func (x *NewSourceEvent) ProtoReflect() protoreflect.Message { + mi := &file_source_source_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NewSourceEvent.ProtoReflect.Descriptor instead. +func (*NewSourceEvent) Descriptor() ([]byte, []int) { + return file_source_source_proto_rawDescGZIP(), []int{2} +} + +func (x *NewSourceEvent) GetProjectId() string { + if x != nil { + return x.ProjectId + } + return "" +} + +func (x *NewSourceEvent) GetOwnerId() string { + if x != nil { + return x.OwnerId + } + return "" +} + +func (x *NewSourceEvent) GetWriteKey() string { + if x != nil { + return x.WriteKey + } + return "" +} + var File_source_source_proto protoreflect.FileDescriptor var file_source_source_proto_rawDesc = []byte{ @@ -306,14 +369,20 @@ var file_source_source_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, - 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x2a, 0x32, 0x0a, 0x06, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, 0x43, - 0x54, 0x49, 0x56, 0x45, 0x10, 0x00, 0x12, 0x15, 0x0a, 0x11, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, - 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x01, 0x42, 0x2e, 0x5a, - 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x72, 0x6d, 0x75, - 0x73, 0x68, 0x71, 0x2f, 0x6f, 0x72, 0x6d, 0x75, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, - 0x63, 0x74, 0x2f, 0x67, 0x6f, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x67, 0x0a, 0x0e, 0x4e, 0x65, 0x77, + 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x70, + 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x49, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x77, + 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6f, 0x77, + 0x6e, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x6b, + 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x72, 0x69, 0x74, 0x65, 0x4b, + 0x65, 0x79, 0x2a, 0x32, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x11, 0x0a, 0x0d, + 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x00, 0x12, + 0x15, 0x0a, 0x11, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x41, 0x43, + 0x54, 0x49, 0x56, 0x45, 0x10, 0x01, 0x42, 0x2d, 0x5a, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x72, 0x6d, 0x75, 0x73, 0x68, 0x71, 0x2f, 0x6f, 0x72, 0x6d, + 0x75, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x2f, 0x67, 0x6f, 0x2f, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -329,19 +398,20 @@ func file_source_source_proto_rawDescGZIP() []byte { } var file_source_source_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_source_source_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_source_source_proto_goTypes = []any{ - (Status)(0), // 0: source.Status - (*SourceMetadata)(nil), // 1: source.SourceMetadata - (*Source)(nil), // 2: source.Source - (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp +var file_source_source_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_source_source_proto_goTypes = []interface{}{ + (Status)(0), // 0: source.Status + (*SourceMetadata)(nil), // 1: source.SourceMetadata + (*Source)(nil), // 2: source.Source + (*NewSourceEvent)(nil), // 3: source.NewSourceEvent + (*timestamp.Timestamp)(nil), // 4: google.protobuf.Timestamp } var file_source_source_proto_depIdxs = []int32{ 0, // 0: source.Source.status:type_name -> source.Status 1, // 1: source.Source.metadata:type_name -> source.SourceMetadata - 3, // 2: source.Source.created_at:type_name -> google.protobuf.Timestamp - 3, // 3: source.Source.updated_at:type_name -> google.protobuf.Timestamp - 3, // 4: source.Source.deleted_at:type_name -> google.protobuf.Timestamp + 4, // 2: source.Source.created_at:type_name -> google.protobuf.Timestamp + 4, // 3: source.Source.updated_at:type_name -> google.protobuf.Timestamp + 4, // 4: source.Source.deleted_at:type_name -> google.protobuf.Timestamp 5, // [5:5] is the sub-list for method output_type 5, // [5:5] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name @@ -355,7 +425,7 @@ func file_source_source_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_source_source_proto_msgTypes[0].Exporter = func(v any, i int) any { + file_source_source_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SourceMetadata); i { case 0: return &v.state @@ -367,7 +437,7 @@ func file_source_source_proto_init() { return nil } } - file_source_source_proto_msgTypes[1].Exporter = func(v any, i int) any { + file_source_source_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Source); i { case 0: return &v.state @@ -379,6 +449,18 @@ func file_source_source_proto_init() { return nil } } + file_source_source_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NewSourceEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -386,7 +468,7 @@ func file_source_source_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_source_source_proto_rawDesc, NumEnums: 1, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/contract/protobuf/source/source.proto b/contract/protobuf/source/source.proto index bc4fd3aa..67cf39e8 100644 --- a/contract/protobuf/source/source.proto +++ b/contract/protobuf/source/source.proto @@ -30,3 +30,9 @@ message Source { google.protobuf.Timestamp updated_at = 10; google.protobuf.Timestamp deleted_at = 11; } + +message NewSourceEvent { + string project_id = 1; + string owner_id = 2; + string write_key = 3; +} \ No newline at end of file diff --git a/pkg/encoder/newsource.go b/pkg/encoder/newsource.go new file mode 100644 index 00000000..5ae96cfc --- /dev/null +++ b/pkg/encoder/newsource.go @@ -0,0 +1,34 @@ +package encoder + +import ( + "encoding/base64" + + "github.com/ormushq/ormus/contract/go/source" + "google.golang.org/protobuf/proto" +) + +func EncodeNewSourceEvent(newSource *source.NewSourceEvent) string { + payload, err := proto.Marshal(newSource) + if err != nil { + return "" + } + + return base64.StdEncoding.EncodeToString(payload) +} + +func DecodeNewSourceEvent(newSourceEvent string) *source.NewSourceEvent { + payload, err := base64.StdEncoding.DecodeString(newSourceEvent) + if err != nil { + return nil + } + mu := source.NewSourceEvent{} + if err := proto.Unmarshal(payload, &mu); err != nil { + return nil + } + + return &source.NewSourceEvent{ + ProjectId: (mu.ProjectId), + OwnerId: mu.OwnerId, + WriteKey: mu.WriteKey, + } +} diff --git a/source/config.go b/source/config.go index 7a5a0692..f2739119 100644 --- a/source/config.go +++ b/source/config.go @@ -12,5 +12,10 @@ type HTTPServer struct { type Config struct { HTTPServer HTTPServer `koanf:"http_server"` // TODO - add source, auth and etc configurations - Otel otela.Otel `koanf:"otel"` + Otel otela.Otel `koanf:"otel"` + WriteKeyRedisExpiration uint `koanf:"write_key_expiration"` + NewSourceEventName string `koanf:"new_source_event_name"` + BufferSize int `koanf:"buffersize"` + NumberInstants int `koanf:"number_instants"` + MaxRetry int `koanf:"maxretry"` } diff --git a/source/eventhandler/newsource.go b/source/eventhandler/newsource.go new file mode 100644 index 00000000..ba2fc3de --- /dev/null +++ b/source/eventhandler/newsource.go @@ -0,0 +1,72 @@ +package eventhandler + +import ( + "context" + "fmt" + "sync" + + "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/pkg/channel" + "github.com/ormushq/ormus/pkg/encoder" + "github.com/ormushq/ormus/source/service/writekey" +) + +type ProcessMessage func(ctx context.Context, msg channel.Message) error + +type Consumer struct { + broker channel.Adapter + writeKeyService writekey.Service +} + +func New(broker channel.Adapter, writeKeyService writekey.Service) *Consumer { + return &Consumer{ + broker: broker, + writeKeyService: writeKeyService, + } +} + +func (c Consumer) ProcessNewSourceEvent(ctx context.Context, msg channel.Message) error { + decodedEvent := encoder.DecodeNewSourceEvent(string(msg.Body)) + // Log retrieval + logger.L().Info(fmt.Sprintf("project id : %s, write key: %s, owner id: %s: has been retrieved", + decodedEvent.ProjectId, decodedEvent.WriteKey, decodedEvent.OwnerId)) + + err := c.writeKeyService.CreateNewWriteKey(ctx, decodedEvent.OwnerId, decodedEvent.ProjectId, decodedEvent.WriteKey) + if err != nil { + logger.L().Error("err on creating writekey in redis", "err msg:", err.Error()) + // TODO support Nack in pkg + } + + logger.L().Debug("the message has been received") + err = msg.Ack() + if err != nil { + logger.L().Debug("ack failed for message", "err msg:", err.Error()) + } + + return nil +} + +func (c Consumer) Consume(ctx context.Context, queueName string, done <-chan bool, wg *sync.WaitGroup, processMessage ProcessMessage) { + logger.L().Debug("Consumer started") + wg.Add(1) + go func() { + defer wg.Done() + + msgChan, err := c.broker.GetOutputChannel(queueName) + if err != nil { + logger.L().Debug("error while subscribing to source topic") + } + for { + select { + case msg := <-msgChan: + go func() { + if err := processMessage(ctx, msg); err != nil { + logger.L().Debug("error processing message", "err msg:", err.Error()) + } + }() + case <-done: + return + } + } + }() +} diff --git a/source/repository/redis/rediswritekey/writekey.go b/source/repository/redis/rediswritekey/writekey.go new file mode 100644 index 00000000..677ff98a --- /dev/null +++ b/source/repository/redis/rediswritekey/writekey.go @@ -0,0 +1,33 @@ +package rediswritekey + +import ( + "context" + "fmt" + "time" + + proto_source "github.com/ormushq/ormus/contract/go/source" + "github.com/ormushq/ormus/pkg/richerror" +) + +func (r *DB) CreateNewWriteKey(ctx context.Context, writeKey *proto_source.NewSourceEvent, expirationTime uint) error { + err := r.adapter.Client().Set(ctx, fmt.Sprintf("%s+%s", writeKey.OwnerId, writeKey.ProjectId), + writeKey.WriteKey, time.Minute*time.Duration(expirationTime)).Err() + if err != nil { + return richerror.New("source").WithWrappedError(err).WithKind(richerror.KindUnexpected).WithMessage(err.Error()) + } + + return nil +} + +func (r *DB) GetWriteKey(ctx context.Context, ownerID, projectID string) (*proto_source.NewSourceEvent, error) { + wk, err := r.adapter.Client().Get(ctx, fmt.Sprintf("%s+%s", ownerID, projectID)).Result() + if err != nil { + return nil, richerror.New("source.repository").WithWrappedError(err).WithKind(richerror.KindUnexpected).WithMessage(err.Error()) + } + + return &proto_source.NewSourceEvent{ + OwnerId: ownerID, + ProjectId: projectID, + WriteKey: wk, + }, nil +} diff --git a/source/service/writekey/service.go b/source/service/writekey/service.go index f6b04e7d..dff7be0a 100644 --- a/source/service/writekey/service.go +++ b/source/service/writekey/service.go @@ -2,37 +2,38 @@ package writekey import ( "context" + + proto_source "github.com/ormushq/ormus/contract/go/source" + "github.com/ormushq/ormus/pkg/richerror" + "github.com/ormushq/ormus/source" ) -// Repository is an interface representing what methods should be implemented by the repository. type Repository interface { - // TODO - implementation redis - IsValidWriteKey(ctx context.Context, writeKey string) (bool, error) + CreateNewWriteKey(ctx context.Context, writeKey *proto_source.NewSourceEvent, expirationTime uint) error + GetWriteKey(ctx context.Context, ownerID, projectID string) (*proto_source.NewSourceEvent, error) } -// Service show dependencies writeKey authservice. type Service struct { - repo Repository + writeKeyRepo Repository + config source.Config } -// Constructor writeKey authservice. -func New(repo Repository) Service { +func New(writeKeyRepo Repository, config source.Config) Service { return Service{ - repo: repo, + writeKeyRepo: writeKeyRepo, + config: config, } } -// IsValid checks whether the writeKey is valid or not. -func (s Service) IsValid(ctx context.Context, writeKey string) (bool, error) { - // TODO - How errmsg handling ? Rich-errmsg or ...? - isValid, err := s.repo.IsValidWriteKey(ctx, writeKey) +func (s Service) CreateNewWriteKey(ctx context.Context, ownerID, projectID, writeKey string) error { + err := s.writeKeyRepo.CreateNewWriteKey(ctx, &proto_source.NewSourceEvent{ + ProjectId: projectID, + OwnerId: ownerID, + WriteKey: writeKey, + }, s.config.WriteKeyRedisExpiration) if err != nil { - // TODO - logger - return false, err - } - if !isValid { - return false, err + return richerror.New("source.service").WithWrappedError(err) } - return true, nil + return nil } diff --git a/source/service/writekey/service_test.go b/source/service/writekey/service_test.go index 377a780c..61b79d71 100644 --- a/source/service/writekey/service_test.go +++ b/source/service/writekey/service_test.go @@ -3,9 +3,6 @@ package writekey_test import ( "context" "fmt" - "testing" - - "github.com/ormushq/ormus/source/service/writekey" ) type mockRepo struct{} @@ -17,28 +14,3 @@ func (m mockRepo) IsValidWriteKey(ctx context.Context, writeKey string) (bool, e } return true, nil } - -func TestIsValid(t *testing.T) { - t.Run("writekey not found", func(t *testing.T) { - m := new(mockRepo) - service := writekey.New(m) - ctx := context.Background() - _, err := service.IsValid(ctx, "") - if err == nil { - t.Fatal("error is nil") - } - }) - - t.Run("writekey is exists and valid", func(t *testing.T) { - m := new(mockRepo) - service := writekey.New(m) - ctx := context.Background() - isValid, err := service.IsValid(ctx, "asdfffg4g5g56d5s4s6s5sd8") - if err != nil { - t.Fatal("error is not nil") - } - if !isValid { - t.Fatal("writekey is not valid") - } - }) -}