Merge pull request #1729 from openmeterio/refactor/sink-worker
refactor(sink): namespace and topic subscription update
chrisgacsal authored Oct 22, 2024
2 parents d5fa39e + cd6922b commit 9f7f5b0
Showing 8 changed files with 197 additions and 79 deletions.
16 changes: 9 additions & 7 deletions app/config/config_test.go
@@ -122,13 +122,15 @@ func TestComplete(t *testing.T) {
},
},
Sink: SinkConfiguration{
GroupId: "openmeter-sink-worker",
MinCommitCount: 500,
MaxCommitWait: 30 * time.Second,
MaxPollTimeout: 100 * time.Millisecond,
NamespaceRefetch: 15 * time.Second,
FlushSuccessTimeout: 5 * time.Second,
DrainTimeout: 10 * time.Second,
GroupId: "openmeter-sink-worker",
MinCommitCount: 500,
MaxCommitWait: 30 * time.Second,
MaxPollTimeout: 100 * time.Millisecond,
NamespaceRefetch: 15 * time.Second,
FlushSuccessTimeout: 5 * time.Second,
DrainTimeout: 10 * time.Second,
NamespaceRefetchTimeout: 9 * time.Second,
NamespaceTopicRegexp: "^om_test_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$",
Dedupe: DedupeConfiguration{
Enabled: true,
DedupeDriverConfiguration: DedupeDriverRedisConfiguration{
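The namespaceTopicRegexp value in the test data carries a single capture group for the namespace portion of a topic name. As a minimal standalone sketch (not code from this PR), this is how such a pattern maps topic names back to namespaces:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The test configuration's topic pattern; the capture group is the namespace.
	re := regexp.MustCompile(`^om_test_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`)

	for _, topic := range []string{"om_test_default_events", "om_test_my_team_events", "om_other_default_events"} {
		if m := re.FindStringSubmatch(topic); m != nil {
			fmt.Printf("%s -> namespace %q\n", topic, m[1])
		} else {
			fmt.Printf("%s -> no match\n", topic)
		}
	}
}
```

Running this prints `default` and `my_team` for the first two topics and reports no match for the third, since it lacks the `om_test_` prefix.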
17 changes: 17 additions & 0 deletions app/config/sink.go
@@ -24,6 +24,13 @@ type SinkConfiguration struct {
Kafka KafkaConfig
// Storage configuration
Storage StorageConfiguration

// NamespaceRefetchTimeout is the timeout for updating namespaces and consumer subscription.
// It must be less than or equal to the NamespaceRefetch interval.
NamespaceRefetchTimeout time.Duration

// NamespaceTopicRegexp defines the regular expression to match/validate topic names the sink-worker needs to subscribe to.
NamespaceTopicRegexp string
}

func (c SinkConfiguration) Validate() error {
@@ -53,6 +60,14 @@ func (c SinkConfiguration) Validate() error {
errs = append(errs, errors.New("DrainTimeout must be greater than 0"))
}

if c.NamespaceRefetchTimeout != 0 && c.NamespaceRefetchTimeout > c.NamespaceRefetch {
errs = append(errs, errors.New("NamespaceRefetchTimeout must be less than or equal to NamespaceRefetch"))
}

if c.NamespaceTopicRegexp == "" {
errs = append(errs, errors.New("NamespaceTopicRegexp must no be empty"))
}

if err := c.IngestNotifications.Validate(); err != nil {
errs = append(errs, errorsx.WithPrefix(err, "ingest notifications"))
}
@@ -136,6 +151,8 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.flushSuccessTimeout", "5s")
v.SetDefault("sink.drainTimeout", "10s")
v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500)
v.SetDefault("sink.namespaceRefetchTimeout", "10s")
v.SetDefault("sink.namespaceTopicRegexp", "^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$")

// Sink Storage
v.SetDefault("sink.storage.asyncInsert", false)
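The additions to sink.go encode a simple invariant: each namespace/subscription refresh is bounded by NamespaceRefetchTimeout, so a non-zero timeout has to fit inside the NamespaceRefetch interval (zero is tolerated because the viper default of 10s normally fills it in), and a topic pattern must always be configured. The sketch below restates those rules in isolation, with an extra regexp-compile sanity check that is not part of this PR; it is illustrative, not the repository's Validate():

```go
package config

import (
	"errors"
	"fmt"
	"regexp"
	"time"
)

// validateNamespaceRefetch mirrors the two new checks in isolation; it is an
// illustrative sketch, not code from this PR.
func validateNamespaceRefetch(refetch, refetchTimeout time.Duration, topicRegexp string) error {
	var errs []error

	// A non-zero timeout must fit inside one refetch interval so a refresh can
	// finish before the next one is due.
	if refetchTimeout != 0 && refetchTimeout > refetch {
		errs = append(errs, errors.New("NamespaceRefetchTimeout must be less than or equal to NamespaceRefetch"))
	}

	if topicRegexp == "" {
		errs = append(errs, errors.New("NamespaceTopicRegexp must not be empty"))
	} else if _, err := regexp.Compile(topicRegexp); err != nil {
		// Compiling the pattern is not part of the PR's Validate(), just an extra sanity check.
		errs = append(errs, fmt.Errorf("NamespaceTopicRegexp is invalid: %w", err))
	}

	return errors.Join(errs...)
}
```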
2 changes: 2 additions & 0 deletions app/config/testdata/complete.yaml
@@ -59,6 +59,8 @@ sink:
minCommitCount: 500
maxCommitWait: 30s
namespaceRefetch: 15s
namespaceRefetchTimeout: 9s
namespaceTopicRegexp: "^om_test_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$"
dedupe:
enabled: true
driver: redis
32 changes: 17 additions & 15 deletions cmd/sink-worker/main.go
@@ -203,21 +203,23 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
}

sinkConfig := sink.SinkConfig{
Logger: logger,
Tracer: tracer,
MetricMeter: metricMeter,
MeterRepository: meterRepository,
Storage: storage,
Deduplicator: deduplicator,
Consumer: consumer,
MinCommitCount: config.Sink.MinCommitCount,
MaxCommitWait: config.Sink.MaxCommitWait,
MaxPollTimeout: config.Sink.MaxPollTimeout,
FlushSuccessTimeout: config.Sink.FlushSuccessTimeout,
DrainTimeout: config.Sink.DrainTimeout,
NamespaceRefetch: config.Sink.NamespaceRefetch,
FlushEventHandler: flushHandler,
TopicResolver: topicResolver,
Logger: logger,
Tracer: tracer,
MetricMeter: metricMeter,
MeterRepository: meterRepository,
Storage: storage,
Deduplicator: deduplicator,
Consumer: consumer,
MinCommitCount: config.Sink.MinCommitCount,
MaxCommitWait: config.Sink.MaxCommitWait,
MaxPollTimeout: config.Sink.MaxPollTimeout,
FlushSuccessTimeout: config.Sink.FlushSuccessTimeout,
DrainTimeout: config.Sink.DrainTimeout,
NamespaceRefetch: config.Sink.NamespaceRefetch,
FlushEventHandler: flushHandler,
TopicResolver: topicResolver,
NamespaceRefetchTimeout: config.Sink.NamespaceRefetchTimeout,
NamespaceTopicRegexp: config.Sink.NamespaceTopicRegexp,
}

return sink.NewSink(sinkConfig)
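initSink now threads both new settings into sink.SinkConfig. The sink package internals are outside this diff, so the following is only a hedged sketch of the intended shape: a periodic refresh, bounded by NamespaceRefetchTimeout, that recomputes the topic list from NamespaceTopicRegexp and updates the consumer subscription. The helper names (subscriber, fetchTopics, refreshSubscription) are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"regexp"
	"time"
)

// subscriber and fetchTopics are hypothetical stand-ins; the real sink package
// wires the equivalents through SinkConfig (see initSink above).
type subscriber interface {
	SubscribeTopics(topics []string) error
}

type printSubscriber struct{}

func (printSubscriber) SubscribeTopics(topics []string) error {
	fmt.Println("subscribing to", topics)
	return nil
}

// fetchTopics stands in for listing broker topics or namespaces; here it is static.
func fetchTopics(ctx context.Context) ([]string, error) {
	return []string{"om_default_events", "om_team_a_events", "unrelated_topic"}, nil
}

// refreshSubscription filters topics with the configured pattern and updates the consumer.
func refreshSubscription(ctx context.Context, re *regexp.Regexp, sub subscriber) error {
	topics, err := fetchTopics(ctx)
	if err != nil {
		return err
	}

	matched := make([]string, 0, len(topics))
	for _, t := range topics {
		if re.MatchString(t) {
			matched = append(matched, t)
		}
	}

	return sub.SubscribeTopics(matched)
}

func main() {
	const (
		namespaceRefetch        = 15 * time.Second
		namespaceRefetchTimeout = 9 * time.Second
	)

	re := regexp.MustCompile(`^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$`)
	sub := printSubscriber{}

	ticker := time.NewTicker(namespaceRefetch)
	defer ticker.Stop()

	for range ticker.C {
		// Bound each refresh so it cannot overrun the next tick.
		ctx, cancel := context.WithTimeout(context.Background(), namespaceRefetchTimeout)
		if err := refreshSubscription(ctx, re, sub); err != nil {
			fmt.Println("refresh failed:", err)
		}
		cancel()
	}
}
```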
1 change: 1 addition & 0 deletions e2e/config.yaml
@@ -17,6 +17,7 @@ sink:
minCommitCount: 500
maxCommitWait: 1s
namespaceRefetch: 1s
namespaceRefetchTimeout: 1s
dedupe:
enabled: true
driver: redis
6 changes: 5 additions & 1 deletion openmeter/ingest/kafkaingest/collector.go
@@ -17,6 +17,10 @@ import (
kafkastats "github.com/openmeterio/openmeter/pkg/kafka/metrics/stats"
)

const (
HeaderKeyNamespace = "namespace"
)

// Collector is a receiver of events that handles sending those events to a downstream Kafka broker.
type Collector struct {
Producer *kafka.Producer
@@ -86,7 +90,7 @@ func (s Collector) Ingest(ctx context.Context, namespace string, ev event.Event)
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Timestamp: ev.Time(),
Headers: []kafka.Header{
{Key: "namespace", Value: []byte(namespace)},
{Key: HeaderKeyNamespace, Value: []byte(namespace)},
{Key: "specversion", Value: []byte(ev.SpecVersion())},
{Key: "ingested_at", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
},
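Promoting the literal "namespace" header key to the exported HeaderKeyNamespace constant gives the consuming side a shared identifier to look up. The sketch below shows what such a lookup might look like; the helper is illustrative, not part of this PR, and it assumes the confluent-kafka-go v2 kafka.Message/kafka.Header types that the collector's Headers slice suggests.

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// HeaderKeyNamespace mirrors the constant exported by the kafkaingest collector above,
// redeclared here to keep the sketch self-contained.
const HeaderKeyNamespace = "namespace"

// namespaceFromMessage is an illustrative helper: it returns the namespace header, if present.
func namespaceFromMessage(msg *kafka.Message) (string, bool) {
	for _, h := range msg.Headers {
		if h.Key == HeaderKeyNamespace {
			return string(h.Value), true
		}
	}
	return "", false
}

func main() {
	msg := &kafka.Message{
		Headers: []kafka.Header{
			{Key: HeaderKeyNamespace, Value: []byte("default")},
			{Key: "specversion", Value: []byte("1.0")},
		},
	}

	if ns, ok := namespaceFromMessage(msg); ok {
		fmt.Println("namespace:", ns)
	}
}
```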