Skip to content

Commit

Permalink
feat(source): Implement new source event
Browse files Browse the repository at this point in the history
  • Loading branch information
alishazaee committed Oct 29, 2024
1 parent a5219d9 commit 625b269
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 39 deletions.
10 changes: 10 additions & 0 deletions adapter/rabbitmq/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package rabbitmq

type Config struct {
UserName string `koanf:"username"`
Password string `koanf:"password"`
Host string `koanf:"host"`
Port int `koanf:"port"`
Vhost string `koanf:"vhost"`
ReconnectSecond int `koanf:"reconnect_second"`
}
60 changes: 60 additions & 0 deletions cmd/source/faker/consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"sync"

"github.com/labstack/gommon/log"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/pkg/channel"
rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq"
"github.com/ormushq/ormus/pkg/encoder"
)

func main() {
cfg := config.C()
done := make(chan bool)
wg := &sync.WaitGroup{}
dbConfig := dconfig.RabbitMQConsumerConnection{
User: cfg.RabbitMq.UserName,
Password: cfg.RabbitMq.Password,
Host: cfg.RabbitMq.Host,
Port: cfg.RabbitMq.Port,
Vhost: cfg.RabbitMq.Vhost,
ReconnectSecond: cfg.RabbitMq.ReconnectSecond,
}
bufferSize := cfg.Source.BufferSize
numberInstants := cfg.Source.NumberInstants
maxRetryPolicy := cfg.Source.MaxRetry
eventName := cfg.Source.NewSourceEventName
err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole})
if err != nil {
panic(err.Error())
}

outputAdapter := rbbitmqchannel.New(done, wg, dbConfig)
err = outputAdapter.NewChannel(eventName, channel.OutputOnly, bufferSize, numberInstants, maxRetryPolicy)
if err != nil {
panic(err)
}
outputChannel, err := outputAdapter.GetOutputChannel(eventName)
if err != nil {
panic(err)
}

wg.Add(1)

go func() {
defer wg.Done()
for msg := range outputChannel {
m := encoder.DecodeNewSourceEvent(string(msg.Body))
log.Info(m.WriteKey)

if err := msg.Ack(); err != nil {
panic(err)
}
}
}()
wg.Wait()
}
61 changes: 61 additions & 0 deletions cmd/source/faker/publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"sync"

"github.com/google/uuid"
"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/contract/go/source"
"github.com/ormushq/ormus/destination/dconfig"
"github.com/ormushq/ormus/pkg/channel"
rbbitmqchannel "github.com/ormushq/ormus/pkg/channel/adapter/rabbitmq"
"github.com/ormushq/ormus/pkg/encoder"
)

func main() {
cfg := config.C()
done := make(chan bool)
wg := &sync.WaitGroup{}
dbConfig := dconfig.RabbitMQConsumerConnection{
User: cfg.RabbitMq.UserName,
Password: cfg.RabbitMq.Password,
Host: cfg.RabbitMq.Host,
Port: cfg.RabbitMq.Port,
Vhost: cfg.RabbitMq.Vhost,
ReconnectSecond: cfg.RabbitMq.ReconnectSecond,
}
bufferSize := cfg.Source.BufferSize
numberInstants := cfg.Source.NumberInstants
maxRetryPolicy := cfg.Source.MaxRetry
testCount := 100

err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole})
if err != nil {
panic(err.Error())
}
inputAdapter := rbbitmqchannel.New(done, wg, dbConfig)
err = inputAdapter.NewChannel(cfg.Source.NewSourceEventName, channel.InputOnlyMode, bufferSize, numberInstants, maxRetryPolicy)
if err != nil {
panic(err.Error())
}
inputChannel, err := inputAdapter.GetInputChannel(cfg.Source.NewSourceEventName)
if err != nil {
panic(err.Error())
}

wg.Add(1)
go func() {
defer wg.Done()
for messageID := 0; messageID < testCount; messageID++ {
msg := encoder.EncodeNewSourceEvent(&source.NewSourceEvent{
ProjectId: uuid.New().String(),
OwnerId: uuid.New().String(),
WriteKey: uuid.New().String(),
})
inputChannel <- []byte(msg)

}
}()
wg.Wait()
}
12 changes: 12 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ source:
http_server:
port: 8080
network: "tcp"
write_key_expiration: 120
new_source_event_name: "new-source-event"
buffersize: 100
number_instants: 10
maxretry: 5
otel:
endpoint: "otel_collector:4317"
service_name: "source"
Expand All @@ -19,6 +24,13 @@ redis:
host: 127.0.0.1
db: 0
password: ""
rabbitmq:
host: "localhost"
port: 5672
password: "guest"
username: "guest"
reconnect_second: 1
vhost: "/"
etcd:
port: 2379
host: 127.0.0.1
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"github.com/ormushq/ormus/adapter/etcd"
"github.com/ormushq/ormus/adapter/rabbitmq"
"github.com/ormushq/ormus/adapter/redis"
"github.com/ormushq/ormus/adapter/scylladb"
"github.com/ormushq/ormus/destination/dconfig"
Expand All @@ -12,6 +13,7 @@ import (
type Config struct {
Redis redis.Config `koanf:"redis"`
Etcd etcd.Config `koanf:"etcd"`
RabbitMq rabbitmq.Config `koanf:"rabbitmq"`
Manager manager.Config `koanf:"manager"`
Source source.Config `koanf:"source"`
Destination dconfig.Config `koanf:"destination"`
Expand Down
Loading

0 comments on commit 625b269

Please sign in to comment.