Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): raw event connector #1720

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
248bc2a
feat(sink): meter event
hekike Oct 19, 2024
9bcf353
fix(storage): meter id
hekike Oct 19, 2024
4a83e6d
feat(query): meter
hekike Oct 19, 2024
eea1d46
feat(meter): query
hekike Oct 19, 2024
59fbd59
feat(streaming): unique aggregation
hekike Oct 20, 2024
a38468b
feat(streaming): raw events connector
hekike Oct 21, 2024
6bc872c
feat(streaming): add back mv
hekike Oct 21, 2024
ec443fa
feat(connector): isnert
hekike Oct 21, 2024
97920f6
fix(raw): query
hekike Oct 21, 2024
40c1f8a
feat(server): use parsed
hekike Oct 21, 2024
b5ff8e1
fix(app): wire
hekike Oct 21, 2024
b1b7ade
fix(connector): query without window
hekike Oct 21, 2024
8406261
fix(meter): parse string number
hekike Oct 21, 2024
7d3b8a1
test(meter): parse numeric string value
hekike Oct 21, 2024
be77d88
fix(connector): round windows
hekike Oct 21, 2024
f8c0b5b
refactor(streaming): reuse raw event connctor
hekike Oct 21, 2024
39741ae
feat(streaming): add query raw events config
hekike Oct 21, 2024
d56a0c6
feat(config): streaming engine
hekike Oct 21, 2024
ef3d896
feat(config): unify streaming
hekike Oct 21, 2024
9ca22ab
refactor(wire): rename factory
hekike Oct 21, 2024
d121fa2
test(config): fix
hekike Oct 21, 2024
080da75
refactor(connector): gci
hekike Oct 21, 2024
96ce765
refactor(connector): gci
hekike Oct 21, 2024
6c00d3f
test(connector): insert meter event
hekike Oct 21, 2024
c125900
feat(entitlement): use streaming connector factiry
hekike Oct 21, 2024
b82e021
fix(streaming): query raw
hekike Oct 22, 2024
a9866d5
fix(connector): table order
hekike Oct 22, 2024
cbf6404
feat(connector): delete meter parse connector
hekike Oct 23, 2024
10df0e4
feat(connector): remove meter change
hekike Oct 23, 2024
7a1e5cc
refactor(connector): clickhouse pkg
hekike Oct 23, 2024
8317009
feat(streaming): make event table name configurable
hekike Oct 23, 2024
7a5fd36
test(config): fix
hekike Oct 23, 2024
279cf6d
refactor(connector): remove unused field
hekike Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions app/common/openmeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/openmeterio/openmeter/openmeter/namespace"
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler"
"github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/driver/noop"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
Expand All @@ -34,25 +36,56 @@ func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository {
return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter]))
}

func NewClickHouseStreamingConnector(
func NewStreamingConnector(
ctx context.Context,
conf config.AggregationConfiguration,
clickHouse clickhouse.Conn,
meterRepository meter.Repository,
logger *slog.Logger,
) (*clickhouse_connector.ClickhouseConnector, error) {
streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
PopulateMeter: conf.PopulateMeter,
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
) (streaming.Connector, error) {
var (
connector streaming.Connector
err error
)

switch conf.Engine {
case config.AggregationEngineClickHouseRaw:
connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse raw engine: %w", err)
}

case config.AggregationEngineClickHouseMV:
connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{
ClickHouse: clickHouse,
Database: conf.ClickHouse.Database,
EventsTableName: conf.EventsTableName,
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,

Meters: meterRepository,
PopulateMeter: conf.PopulateMeter,
CreateOrReplaceMeter: conf.CreateOrReplaceMeter,
QueryRawEvents: conf.QueryRawEvents,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse mv engine: %w", err)
}
default:
return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine)
}

return streamingConnector, nil
return connector, nil
}

func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) {
Expand Down Expand Up @@ -135,7 +168,7 @@ func NewKafkaNamespaceHandler(

func NewNamespaceHandlers(
kafkaHandler *kafkaingest.NamespaceHandler,
clickHouseHandler *clickhouse_connector.ClickhouseConnector,
clickHouseHandler streaming.Connector,
) []namespace.Handler {
return []namespace.Handler{
kafkaHandler,
Expand Down
5 changes: 1 addition & 4 deletions app/common/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver"
"github.com/openmeterio/openmeter/openmeter/meter"
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
"github.com/openmeterio/openmeter/openmeter/streaming"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
)
Expand Down Expand Up @@ -102,8 +100,7 @@ var OpenMeter = wire.NewSet(
NewMeterRepository,
wire.Bind(new(meter.Repository), new(*meter.InMemoryRepository)),

NewClickHouseStreamingConnector,
wire.Bind(new(streaming.Connector), new(*clickhouse_connector.ClickhouseConnector)),
NewStreamingConnector,

NewNamespacedTopicResolver,
wire.Bind(new(topicresolver.Resolver), new(*topicresolver.NamespacedTopicResolver)),
Expand Down
80 changes: 80 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,64 @@ import (
"crypto/tls"
"errors"
"fmt"
"slices"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/spf13/viper"
)

// AggregationEngine names the streaming aggregation engine selected in the
// aggregation configuration.
type AggregationEngine string

const (
	// Raw engine queries the raw events table
	AggregationEngineClickHouseRaw AggregationEngine = "clickhouse_raw"
	// MV engine maintains and queries materialized views
	AggregationEngineClickHouseMV AggregationEngine = "clickhouse_mv"
)

// Values returns every supported aggregation engine. The result does not
// depend on the receiver.
func (e AggregationEngine) Values() []AggregationEngine {
	return []AggregationEngine{AggregationEngineClickHouseRaw, AggregationEngineClickHouseMV}
}

// Validate returns an error when e is not one of the engines listed by Values.
// The error names the rejected value and the accepted ones so that a
// misconfiguration can be diagnosed from the message alone.
func (e AggregationEngine) Validate() error {
	if !slices.Contains(e.Values(), e) {
		return fmt.Errorf("invalid value %q (valid values: %v)", e, e.Values())
	}

	return nil
}

type AggregationConfiguration struct {
// Engine is the aggregation engine to use
Engine AggregationEngine
ClickHouse ClickHouseAggregationConfiguration
Comment on lines +35 to 37
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long term we should probably move the clickhouse config to top level and better organize engine configs.

This change only adds to the chaos: these options live at the aggregation level, yet they are ClickHouse-specific. :/


EventsTableName string

// Set true to have ClickHouse first store incoming inserts in an in-memory buffer
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data to be inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool

// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
InsertQuerySettings map[string]string

// Engine specific options

// Populate creates the materialized view with data from the events table
// This is not safe to use in production as it requires stopping ingestion
PopulateMeter bool
// CreateOrReplace is used to force the recreation of the materialized view
// This is not safe to use in production as it will drop the existing views
CreateOrReplaceMeter bool
// QueryRawEvents is used to query the raw events table instead of the materialized view
QueryRawEvents bool
}

// Validate validates the configuration.
Expand All @@ -26,6 +70,37 @@ func (c AggregationConfiguration) Validate() error {
return fmt.Errorf("clickhouse: %w", err)
}

if c.Engine == "" {
return errors.New("engine is required")
}

if err := c.Engine.Validate(); err != nil {
return fmt.Errorf("engine: %w", err)
}

if c.EventsTableName == "" {
return errors.New("events table is required")
}

if c.AsyncInsertWait && !c.AsyncInsert {
return errors.New("async insert wait is set but async insert is not")
}

// Validate engine specific options
if c.Engine != AggregationEngineClickHouseMV {
if c.PopulateMeter {
return errors.New("populate meter is only supported with materialized view engine")
}

if c.CreateOrReplaceMeter {
return errors.New("create or replace meter is only with materialized view engine")
}

if c.QueryRawEvents {
return errors.New("query raw events is only with materialized view engine")
}
}

return nil
}

Expand Down Expand Up @@ -100,6 +175,11 @@ func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Optio

// ConfigureAggregation configures some defaults in the Viper instance.
func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.engine", AggregationEngineClickHouseMV)
v.SetDefault("aggregation.eventsTableName", "om_events")
v.SetDefault("aggregation.asyncInsert", false)
v.SetDefault("aggregation.asyncInsertWait", false)

v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000")
v.SetDefault("aggregation.clickhouse.tls", false)
v.SetDefault("aggregation.clickhouse.database", "openmeter")
Expand Down
4 changes: 4 additions & 0 deletions app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func TestComplete(t *testing.T) {
ConnMaxLifetime: 10 * time.Minute,
BlockBufferSize: 10,
},
Engine: AggregationEngineClickHouseMV,
EventsTableName: "om_events",
AsyncInsert: false,
AsyncInsertWait: false,
},
Sink: SinkConfiguration{
GroupId: "openmeter-sink-worker",
Expand Down
4 changes: 3 additions & 1 deletion app/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SinkConfiguration struct {
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
// TODO: remove, config moved to aggregation config
// Storage configuration
Storage StorageConfiguration

Expand Down Expand Up @@ -102,7 +103,7 @@ type StorageConfiguration struct {
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediatelyy
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool
Expand Down Expand Up @@ -154,6 +155,7 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.namespaceRefetchTimeout", "10s")
v.SetDefault("sink.namespaceTopicRegexp", "^om_([A-Za-z0-9]+(?:_[A-Za-z0-9]+)*)_events$")

// TODO: remove, config moved to aggregation config
// Sink Storage
v.SetDefault("sink.storage.asyncInsert", false)
v.SetDefault("sink.storage.asyncInsertWait", false)
Expand Down
4 changes: 2 additions & 2 deletions cmd/balance-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions cmd/jobs/entitlement/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/app/common"
"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/registry"
registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder"
"github.com/openmeterio/openmeter/openmeter/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
Expand Down Expand Up @@ -50,14 +50,7 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl
return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err)
}

streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: conf.Aggregation.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
PopulateMeter: conf.Aggregation.PopulateMeter,
})
streamingConnector, err := common.NewStreamingConnector(ctx, conf.Aggregation, clickHouseClient, meterRepository, logger)
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/notification-service/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func main() {
})

for _, meter := range conf.Meters {
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), meter)
err := app.StreamingConnector.CreateMeter(ctx, app.NamespaceManager.GetDefaultNamespace(), *meter)
if err != nil {
slog.Warn("failed to initialize meter", "error", err)
os.Exit(1)
Expand Down
6 changes: 3 additions & 3 deletions cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading