Skip to content

Commit

Permalink
Merge pull request #66 from xataio/add-full-support-for-elasticsearch
Browse files Browse the repository at this point in the history
Improve support for elasticsearch replication
  • Loading branch information
eminano authored Aug 30, 2024
2 parents ef950ec + e4a016e commit bab0a8e
Show file tree
Hide file tree
Showing 29 changed files with 1,584 additions and 725 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ One of exponential/constant backoff policies can be provided for the Kafka commi

| Environment Variable | Default | Required | Description |
| ------------------------------------------------------------ | ------- | -------- | -------------------------------------------------------------------------------------------------------------- |
| PGSTREAM_SEARCH_STORE_URL | N/A | Yes | URL for the search store to connect to. |
| PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). |
| PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. |
| PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. |
| PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the search batch indexer for inflight batches. |
Expand Down
12 changes: 7 additions & 5 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
kafkacheckpoint "github.com/xataio/pgstream/pkg/wal/checkpointer/kafka"
kafkaprocessor "github.com/xataio/pgstream/pkg/wal/processor/kafka"
"github.com/xataio/pgstream/pkg/wal/processor/search"
"github.com/xataio/pgstream/pkg/wal/processor/search/opensearch"
"github.com/xataio/pgstream/pkg/wal/processor/search/store"
"github.com/xataio/pgstream/pkg/wal/processor/translator"
"github.com/xataio/pgstream/pkg/wal/processor/webhook/notifier"
"github.com/xataio/pgstream/pkg/wal/processor/webhook/subscription/server"
Expand Down Expand Up @@ -148,8 +148,9 @@ func parseKafkaWriterConfig(kafkaServers []string, kafkaTopic string) *kafkaproc
}

func parseSearchProcessorConfig() *stream.SearchProcessorConfig {
searchStore := viper.GetString("PGSTREAM_SEARCH_STORE_URL")
if searchStore == "" {
opensearchStore := viper.GetString("PGSTREAM_OPENSEARCH_STORE_URL")
elasticsearchStore := viper.GetString("PGSTREAM_ELASTICSEARCH_STORE_URL")
if opensearchStore == "" && elasticsearchStore == "" {
return nil
}

Expand All @@ -160,8 +161,9 @@ func parseSearchProcessorConfig() *stream.SearchProcessorConfig {
MaxQueueBytes: viper.GetInt64("PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES"),
CleanupBackoff: parseBackoffConfig("PGSTREAM_SEARCH_INDEXER_CLEANUP"),
},
Store: opensearch.Config{
URL: searchStore,
Store: store.Config{
OpenSearchURL: opensearchStore,
ElasticsearchURL: elasticsearchStore,
},
Retrier: search.StoreRetryConfig{
Backoff: parseBackoffConfig("PGSTREAM_SEARCH_STORE"),
Expand Down
37 changes: 18 additions & 19 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.2

require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/elastic/go-elasticsearch/v8 v8.0.0-20210311100734-5d6b0c808457
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/go-logr/zerologr v1.2.3
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/google/go-cmp v0.6.0
Expand All @@ -13,14 +13,16 @@ require (
github.com/jackc/pgx/v5 v5.6.0
github.com/labstack/echo/v4 v4.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/opensearch-project/opensearch-go v1.1.0
github.com/pterm/pterm v0.12.79
github.com/rs/xid v1.5.0
github.com/rs/zerolog v1.32.0
github.com/segmentio/kafka-go v0.4.47
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.31.0
github.com/testcontainers/testcontainers-go v0.33.0
github.com/testcontainers/testcontainers-go/modules/elasticsearch v0.33.0
github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0
github.com/testcontainers/testcontainers-go/modules/opensearch v0.31.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.31.0
Expand All @@ -34,25 +36,25 @@ require (
atomicgo.dev/schedule v0.1.0 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/containerd/console v1.0.3 // indirect
github.com/containerd/containerd v1.7.15 // indirect
github.com/containerd/containerd v1.7.18 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/docker v25.0.6+incompatible // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gookit/color v1.5.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand All @@ -63,7 +65,7 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
Expand All @@ -72,6 +74,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
Expand Down Expand Up @@ -106,18 +109,14 @@ require (
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit bab0a8e

Please sign in to comment.