-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Pubsub sink #79
Conversation
This change introduces a new config flag PUBLISHER_TYPE This is used to initialise & configure amongst available publishers. (currently kafka & pubsub). Defaults to "kafka"
This commit changes the metrics exported by pubsub publisher to be semantically compatiable with metrics exported by kafka publisher.
There was a bug in kafka publisher, where when a static topic was specified as the topic format, it would produce the result "static-format%!(EXTRA string=event-type)" This is caused by fmt.Sprintf's behaviour of adding any unspecified operands to the output.
with: | ||
go-version: "1.18" | ||
- name: setup integration environment | ||
uses: ./.github/workflows/integration-test-setup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turtleDev Let's inline this. Any reason to split this workflow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This action contains the common steps necessary for running integration tests for kafka and pubsub.
This was introduced to reduce redundant workflow steps within our testing pipelines.
app/server.go
Outdated
} | ||
|
||
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collection.CollectRequest, workerPool *worker.Pool, kp *publisher.Kafka) { | ||
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collection.CollectRequest, workerPool *worker.Pool, kp Publisher) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turtleDev Let's update short var kp
as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed to pub
.
app/server.go
Outdated
"os" | ||
"os/signal" | ||
"runtime" | ||
"syscall" | ||
"time" | ||
|
||
"cloud.google.com/go/pubsub" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turtleDev what is the need of this dependency here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
publisher.NewPubSub()
doesn't create the pubsub client by itself. It leaves that responsiblity to the client code (in this case, initPublisher()
)
publisher/pubsub.go
Outdated
@@ -0,0 +1,234 @@ | |||
package publisher |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turtleDev let's create a dedicated package for each publisher within publisher folder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
This PR adds support for GCP PubSub Sink (#77) to Raccoon.
Configuration
This change introduces the following new configurations:
PUBLISHER_TYPE
configures which publisher to use.kafka
,pubsub
kafka
PUBLISHER_PUBSUB_PROJECT_ID
project ID of the target GCP project. (required)PUBLISHER_PUBSUB_CREDENTIALS
(required, but can also alternatively useGOOGLE_APPLICATION_CREDENTIALS
)PUBLISHER_PUBSUB_TOPIC_AUTOCREATE
controls whether unknown topics should be created (defaults tofalse
)PUBLISHER_PUBSUB_TOPIC_RETENTION_MS
defines the message retention policy for topics created by Raccon. Valid values must be between 10 minutes and 31 days (see docs)PUBLISHER_PUBSUB_PUBLISH_DELAY_THRESHOLD_MS
controls the time delay before messages are published (defaults to 10ms)PUBLISHER_PUBSUB_PUBLISH_COUNT_THRESHOLD
specifies the internal buffer length (defaults to 100 messages)PUBLISHER_PUBSUB_PUBLISH_BYTE_THRESHOLD
specifies the internal buffer size (defaults to ~1MiB)PUBLISHER_PUBSUB_PUBLISH_TIMEOUT_MS
specifies how long raccoon tries to publish a batch of messages before giving up. (defaults to 1 minute)