Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafka sink #3

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

sink "github.com/mr-karan/nomad-events-sink/internal/sinks"
"github.com/mr-karan/nomad-events-sink/pkg/stream"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slog"
)

type Opts struct {
Expand All @@ -17,7 +17,7 @@ type Opts struct {
// App is the global container that holds
// objects of various routines that run on boot.
type App struct {
log *logrus.Logger
log *slog.Logger
stream *stream.Stream
sink sink.Sink
opts Opts
Expand All @@ -40,7 +40,7 @@ func (app *App) Start(ctx context.Context) {
// from last event which is processed.
err := app.stream.InitIndex(ctx)
if err != nil {
app.log.WithError(err).Fatal("error initialising index store")
app.log.Error("error initialising index store", "error", err)
}

for _, t := range app.opts.topics {
Expand All @@ -50,7 +50,7 @@ func (app *App) Start(ctx context.Context) {
defer wg.Done()
// Subscribe to events.
if err := app.stream.Subscribe(ctx, topic, app.opts.maxReconnectAttempts); err != nil {
app.log.WithField("topic", topic).WithError(err).Fatal("error subscribing to events")
app.log.Error("error subscribing to events", "error", err, "topic", topic)
}
}()
}
Expand Down
37 changes: 20 additions & 17 deletions cmd/app/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ import (
sink "github.com/mr-karan/nomad-events-sink/internal/sinks"
"github.com/mr-karan/nomad-events-sink/internal/sinks/provider"
"github.com/mr-karan/nomad-events-sink/pkg/stream"
"github.com/sirupsen/logrus"
flag "github.com/spf13/pflag"
"golang.org/x/exp/slog"
)

// initLogger initializes logger.
func initLogger(ko *koanf.Koanf) *logrus.Logger {
logger := logrus.New()
logger.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
DisableLevelTruncation: true,
})
if ko.String("app.log") == "debug" {
logger.SetLevel(logrus.DebugLevel)
// initLogger initialies a logger.
func initLogger(lvl string) *slog.Logger {
opts := slog.HandlerOptions{
AddSource: true,
Level: slog.LevelInfo,
}
if lvl == "debug" {
opts.Level = slog.LevelDebug
}
return logger

return slog.New(opts.NewTextHandler(os.Stdout))
}

// initConfig loads config to `ko`
Expand Down Expand Up @@ -74,7 +74,7 @@ func initConfig(cfgDefault string, envPrefix string) (*koanf.Koanf, error) {
return ko, nil
}

func initSink(ko *koanf.Koanf, log *logrus.Logger) sink.Sink {
func initSink(ko *koanf.Koanf, log *slog.Logger) sink.Sink {
// Initialise HTTP Provider.
http, err := provider.NewHTTP(
provider.HTTPOpts{
Expand All @@ -87,7 +87,8 @@ func initSink(ko *koanf.Koanf, log *logrus.Logger) sink.Sink {
HealthCheckStatus: ko.Int("sinks.http.healthcheck.status"),
})
if err != nil {
log.WithError(err).Fatal("error initialising http sink provider")
log.Error("error initialising http sink provider", "error", err)
exit()
}

sink := sink.New([]provider.Provider{http}, sink.Opts{
Expand All @@ -98,20 +99,22 @@ func initSink(ko *koanf.Koanf, log *logrus.Logger) sink.Sink {
Log: log,
})
if err != nil {
log.WithError(err).Fatal("error initialising sink")
log.Error("error initialising sink", "error", err)
exit()
}
return sink
}

func initStream(ctx context.Context, ko *koanf.Koanf, log *logrus.Logger, cb stream.CallbackFunc) *stream.Stream {
func initStream(ctx context.Context, ko *koanf.Koanf, log *slog.Logger, cb stream.CallbackFunc) *stream.Stream {
s, err := stream.New(
ko.String("app.data_dir"),
ko.Duration("app.commit_index_interval"),
cb,
true,
log,
)
if err != nil {
log.WithError(err).Fatal("error initialising stream")
log.Error("error initialising stream", "error", err)
exit()
}

return s
Expand Down
5 changes: 3 additions & 2 deletions cmd/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
var (
// Version of the build. This is injected at build-time.
buildString = "unknown"
exit = func() { os.Exit(1) }
)

func main() {
Expand All @@ -28,7 +29,7 @@ func main() {
}

var (
log = initLogger(ko)
log = initLogger(ko.String("app.log"))
sink = initSink(ko, log)
stream = initStream(ctx, ko, log, func(e api.Event, meta stream.Meta) {
sink.Add(e)
Expand All @@ -45,6 +46,6 @@ func main() {
}

// Start an instance of app.
app.log.WithField("version", buildString).Info("booting nomad events collector")
app.log.Info("booting nomad events collector", "version", buildString)
app.Start(ctx)
}
3 changes: 1 addition & 2 deletions config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[app]
log = "debug" # `debug` for verbose logs. `info` otherwise.
data_dir = "./data/events" # Directory to store `index.json` file which is used to set index offset when listening for events.
commit_index_interval = "30s" # Interval at which `index.json` gets stored in `data_dir`.
commit_index_interval = "5s" # Interval at which `index.json` gets stored in `data_dir`.

[stream]
topics = ["Deployment", "Allocation", "Evaluation", "Job", "Node"] # Topics to subscribe events for.
Expand All @@ -18,7 +18,6 @@ events_count = 5 # If a batch has more events than `events_counts`, it is flushe
root_url = "http://localhost:3333" # HTTP server URL to `POST` events data to.
timeout = "7s" # Timeout for the ingestion request.
max_idle_conns = 100 # Number of keep-alive connections to keep in pool.

[sinks.http.healthcheck]
enabled = false # Abort if the upstream is unhealthy. This check is performed only during start of program.
url = "http://localhost:3333" # Ping endpoint for the HTTP provider.
Expand Down
24 changes: 13 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
module github.com/mr-karan/nomad-events-sink

go 1.18
go 1.20

require (
github.com/hashicorp/nomad/api v0.0.0-20211103234928-04cab9dbecd3
github.com/knadh/koanf v1.3.2
github.com/sirupsen/logrus v1.8.1
github.com/hashicorp/nomad/api v0.0.0-20230331162532-78c90c57c530
github.com/knadh/koanf v1.5.0
github.com/spf13/pflag v1.0.5
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
)

require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/hashicorp/cronexpr v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/cronexpr v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
golang.org/x/sys v0.6.0 // indirect
)
Loading