Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gcppubsub output #13

Merged
merged 3 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions command/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
)

type logRunner struct {
logger *zap.SugaredLogger
cmd *cobra.Command
out *output.Options
pcapFile string
logger *zap.SugaredLogger
cmd *cobra.Command
out *output.Options
}

func newLogRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
Expand Down
7 changes: 3 additions & 4 deletions command/pcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ import (
)

type pcapRunner struct {
logger *zap.SugaredLogger
cmd *cobra.Command
out *output.Options
pcapFile string
logger *zap.SugaredLogger
cmd *cobra.Command
out *output.Options
}

func newPCAPRunner(options *output.Options, logger *zap.Logger) *cobra.Command {
Expand Down
7 changes: 7 additions & 0 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/andrewkroh/stream/pkg/output"

// Register outputs.
_ "github.com/andrewkroh/stream/pkg/output/gcppubsub"
_ "github.com/andrewkroh/stream/pkg/output/tcp"
_ "github.com/andrewkroh/stream/pkg/output/tls"
_ "github.com/andrewkroh/stream/pkg/output/udp"
Expand Down Expand Up @@ -65,6 +66,12 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVar(&opts.WebhookOptions.Password, "webhook-password", "", "webhook password for basic authentication")
rootCmd.PersistentFlags().StringVar(&opts.WebhookOptions.Username, "webhook-username", "", "webhook username for basic authentication")

// GCP Pubsub output flags.
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Project, "gcppubsub-project", "test", "GCP Pubsub project name")
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Topic, "gcppubsub-topic", "topic", "GCP Pubsub topic name")
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Subscription, "gcppubsub-subscription", "subscription", "GCP Pubsub subscription name")
rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag")

// Sub-commands.
rootCmd.AddCommand(newLogRunner(&opts, logger))
rootCmd.AddCommand(newPCAPRunner(&opts, logger))
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/andrewkroh/stream
go 1.15

require (
cloud.google.com/go/pubsub v1.0.1
github.com/elastic/go-concert v0.0.4
github.com/google/gopacket v1.1.19
github.com/spf13/cobra v1.1.1
Expand All @@ -12,4 +13,5 @@ require (
go.uber.org/zap v1.10.0
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
google.golang.org/api v0.13.0
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSR
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY=
cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
cloud.google.com/go v0.46.3 h1:AVXDdKsrtX33oR9fbCMu/+c1o8Ofjq6Ku/MInaLVg5Y=
cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/pubsub v1.0.1 h1:W9tAK3E57P75u0XLLR82LZyw8VpAnhmyTOxW9qzmyj8=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
Expand Down Expand Up @@ -55,6 +57,7 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
Expand All @@ -68,6 +71,7 @@ github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OI
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
Expand All @@ -88,6 +92,7 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
Expand Down Expand Up @@ -177,6 +182,7 @@ github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec/go.mod h1:Wp40HwmjM59Fk
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down Expand Up @@ -230,11 +236,13 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -252,6 +260,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
Expand Down Expand Up @@ -285,6 +294,7 @@ google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEt
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.9.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
google.golang.org/api v0.13.0 h1:Q3Ui3V3/CVinFWFiW39Iw0kMuVrRzYX0wN6OPFp0lTA=
google.golang.org/api v0.13.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsbkAI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand All @@ -298,9 +308,11 @@ google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a h1:Ob5/580gVHBJZgXnff1cZDbG+xLtMVE5mDRTe+nIsX4=
google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
184 changes: 184 additions & 0 deletions pkg/output/gcppubsub/gcppubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package gcppubsub

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"

"cloud.google.com/go/pubsub"
"google.golang.org/api/iterator"

"github.com/andrewkroh/stream/pkg/output"
)

func init() {
output.Register("gcppubsub", New)
}

type Output struct {
opts *output.Options
client *pubsub.Client
cancelFunc func()
}

func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("emulator address is required")
}

os.Setenv("PUBSUB_EMULATOR_HOST", opts.Addr)

ctx, cancel := context.WithCancel(context.Background())
client, err := pubsub.NewClient(ctx, opts.GCPPubsubOptions.Project)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create client: %v", err)
}

return &Output{opts: opts, client: client, cancelFunc: cancel}, nil
}

func (o *Output) DialContext(ctx context.Context) error {
// Disable HTTP keep-alives to ensure no extra goroutines hang around.
httpClient := http.Client{Transport: &http.Transport{DisableKeepAlives: true}}

// Sanity check the emulator.
resp, err := httpClient.Get("http://" + o.opts.Addr)
if err != nil {
return err
}
_, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
marc-gr marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %v", resp.StatusCode)
}

if o.opts.GCPPubsubOptions.Clear {
if err := o.clear(); err != nil {
return err
}
}

if err := o.createTopic(); err != nil {
return err
}

if err := o.createSubscription(); err != nil {
return err
}

return nil
}

func (o *Output) Close() error {
o.client.Topic(o.opts.GCPPubsubOptions.Topic).Stop()
o.cancelFunc()
return nil
}

func (o *Output) Write(b []byte) (int, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

topic := o.client.Topic(o.opts.GCPPubsubOptions.Topic)
result := topic.Publish(ctx, &pubsub.Message{Data: b})

// Wait for message to publish and get assigned ID.
if _, err := result.Get(ctx); err != nil {
return 0, err
}

return len(b), nil
}

func (o *Output) clear() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Clear all topics.
topics := o.client.Topics(ctx)
for {
topic, err := topics.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
if err = topic.Delete(ctx); err != nil {
return fmt.Errorf("failed to delete topic %v: %v", topic.ID(), err)
}
}

// Clear all subscriptions.
subs := o.client.Subscriptions(ctx)
for {
sub, err := subs.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}

if err = sub.Delete(ctx); err != nil {
return fmt.Errorf("failed to delete subscription %v: %v", sub.ID(), err)
}
}

return nil
}

func (o *Output) createTopic() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

topic := o.client.Topic(o.opts.GCPPubsubOptions.Topic)
exists, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("failed to check if topic exists: %v", err)
}

if !exists {
if _, err := o.client.CreateTopic(ctx, o.opts.GCPPubsubOptions.Topic); err != nil {
return fmt.Errorf("failed to create the topic: %v", err)
}
}

return nil
}

func (o *Output) createSubscription() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sub := o.client.Subscription(o.opts.GCPPubsubOptions.Subscription)
exists, err := sub.Exists(ctx)
if err != nil {
return fmt.Errorf("failed to check if sub exists: %v", err)
}

if !exists {
_, err := o.client.CreateSubscription(
ctx,
o.opts.GCPPubsubOptions.Subscription,
pubsub.SubscriptionConfig{
Topic: o.client.Topic(o.opts.GCPPubsubOptions.Topic),
},
)
if err != nil {
return fmt.Errorf("failed to create subscription: %v", err)
}
}

return nil
}
8 changes: 8 additions & 0 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Options struct {
RateLimit int // UDP rate limit in bytes.

WebhookOptions
GCPPubsubOptions
}

type WebhookOptions struct {
Expand All @@ -24,3 +25,10 @@ type WebhookOptions struct {
Username string // Basic auth username.
Password string // Basic auth password.
}

type GCPPubsubOptions struct {
Project string // Project name.
Topic string // Topic name. Will create it if not exists.
Subscription string // Subscription name. Will create it if not exists.
Clear bool // Clear will clear all topics and subscriptions before running.
}