From 7eabfc888a5f570d175fe5bb153ba3792eb7cd52 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Tue, 22 Dec 2020 12:51:00 +0100 Subject: [PATCH] [Filebeat] Add pubsub_alternative_host to gcp pubsub input (#23215) * Add pubsub_alternative_host to gcp pubsub input * Apply suggestions * Add changelog entry * Add new option comment * Make error more descriptive and reorder imports (cherry picked from commit 545598f48058408996529af3252b1b28d6dbf53d) --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/gcppubsub/config.go | 3 +++ x-pack/filebeat/input/gcppubsub/input.go | 33 ++++++++++++++++------- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a1c4cbf0e84..49d3c5ff12b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -535,6 +535,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Misp improvements: Migration to httpjson v2 config, pagination and deduplication ID {pull}23070[23070] - Add Google Workspace module and mark Gsuite module as deprecated {pull}22950[22950] - Mark m365 defender, defender atp, okta and google workspace modules as GA {pull}23113[23113] +- Added `alternative_host` option to google pubsub input {pull}23215[23215] *Heartbeat* diff --git a/x-pack/filebeat/input/gcppubsub/config.go b/x-pack/filebeat/input/gcppubsub/config.go index fdb4c30dfee..ec49368c0d9 100644 --- a/x-pack/filebeat/input/gcppubsub/config.go +++ b/x-pack/filebeat/input/gcppubsub/config.go @@ -36,6 +36,9 @@ type config struct { // JSON blob containing authentication credentials and key. CredentialsJSON []byte `config:"credentials_json"` + + // Overrides the default Pub/Sub service address and disables TLS. For testing. + AlternativeHost string `config:"alternative_host"` } func (c *config) Validate() error { diff --git a/x-pack/filebeat/input/gcppubsub/input.go b/x-pack/filebeat/input/gcppubsub/input.go index fe15808450d..dd8a60d8502 100644 --- a/x-pack/filebeat/input/gcppubsub/input.go +++ b/x-pack/filebeat/input/gcppubsub/input.go @@ -8,12 +8,14 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "sync" "time" "cloud.google.com/go/pubsub" "github.com/pkg/errors" "google.golang.org/api/option" + "google.golang.org/grpc" "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" @@ -147,15 +149,7 @@ func (in *pubsubInput) run() error { ctx, cancel := context.WithCancel(in.workerCtx) defer cancel() - // Make pubsub client. - opts := []option.ClientOption{option.WithUserAgent(useragent.UserAgent("Filebeat", false))} - if in.CredentialsFile != "" { - opts = append(opts, option.WithCredentialsFile(in.CredentialsFile)) - } else if len(in.CredentialsJSON) > 0 { - option.WithCredentialsJSON(in.CredentialsJSON) - } - - client, err := pubsub.NewClient(ctx, in.ProjectID, opts...) + client, err := in.newPubsubClient(ctx) if err != nil { return err } @@ -250,3 +244,24 @@ func (in *pubsubInput) getOrCreateSubscription(ctx context.Context, client *pubs return nil, errors.New("no subscription exists and 'subscription.create' is not enabled") } + +func (in *pubsubInput) newPubsubClient(ctx context.Context) (*pubsub.Client, error) { + opts := []option.ClientOption{option.WithUserAgent(useragent.UserAgent("Filebeat", false))} + + if in.AlternativeHost != "" { + // this will be typically set because we want to point the input to a testing pubsub emulator + conn, err := grpc.Dial(in.AlternativeHost, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("cannot connect to alternative host %q: %w", in.AlternativeHost, err) + } + opts = append(opts, option.WithGRPCConn(conn), option.WithTelemetryDisabled()) + } + + if in.CredentialsFile != "" { + opts = append(opts, option.WithCredentialsFile(in.CredentialsFile)) + } else if len(in.CredentialsJSON) > 0 { + opts = append(opts, option.WithCredentialsJSON(in.CredentialsJSON)) + } + + return pubsub.NewClient(ctx, in.ProjectID, opts...) +}