Skip to content

Commit

Permalink
feat: Raccoon CLI (#92)
Browse files Browse the repository at this point in the history
* cmd: add basic CLI harness

* cmd: wip on server flags

* cmd: server: bind flags for event acknowledgement

* cmd: server: add kafka client flags

* cli: remove usused comments

* cmd: simplify boolean flags

* cmd: fix --event.ack parsing

* config: kafka: make bootstrap server config required

* cmd: fix duration flags not propagating values to config

* cmd: chore: allow flags on root command

* ci: update tests to use raccoon cli

* ci: fix compose not starting raccoon server

* chore: update config tests

* wip: migration to salt/config

* config: replace use of time.Duration values with ints

This makes it easier to parse data from config files and cli flags

* config: WIP validation

* config: refactor validation + add fallbacks

* docs: kinesis: mark kinesis credentials file required

* config: purge utils

* chore: tidy deps

* config: chore: use consolidate config values

* wip: config: salt/config integration

* config: refactor config to comply with salt/config

* config: revamp error validation

* config: fix defaults

* config: fix tags on publisher config

* config: purge tests

* config: add default for server.websocket.conn.group.default

* docs: fix ping/pong and write timeout defaults

The docs incorrectly specified their values in seconds, when in
reality raccoon expects them to be milliseconds

* config: move WORKER_KAFKA_DELIVERY_CHANNEL_SIZE to kafka publisher

It's now called PUBLISHER_KAFKA_DELIVERY_CHANNEL_SIZE

* cmd: server: add flag for config file

* config: update cors to use comma separated values

* ci: add testing yaml config

* chore: add sample raccoon config

* cli: integrate salt/cmdx

* config: make kafka config type-concrete

* misc: fix panic on server shutdown when prometheus metrics are enabled

* ci: update test environment

* docker: make image act as a CLI

* chore: removed unused configs

* config: kafka: omit empty configs

* cmd: add dynamically generated cli flags

* cmd: remove duration flag parser

raccoon now uses integer values to represent most duration
parameters.

* cmd: server: simplify flag parsing
  • Loading branch information
turtleDev authored Aug 27, 2024
1 parent 9d8f2c1 commit f04adb7
Show file tree
Hide file tree
Showing 45 changed files with 2,334 additions and 834 deletions.
42 changes: 0 additions & 42 deletions .env.sample

This file was deleted.

41 changes: 0 additions & 41 deletions .env.test

This file was deleted.

4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ jobs:
go-version: "1.22.4"
- name: Checkout code
uses: actions/checkout@v2
- name: Initialise test config
run: cp .env.test .env
- name: Start raccoon
run: make docker-run
- name: Run tests
Expand Down Expand Up @@ -47,6 +45,8 @@ jobs:
- uses: actions/checkout@v2
- name: Build
run: make build
- name: Smoke test
run: ./raccoon
benchmark:
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ RUN make build
FROM debian:bookworm-slim
WORKDIR /app
COPY --from=0 /app/raccoon ./raccoon
CMD ["./raccoon"]
ENTRYPOINT [ "/app/raccoon" ]
2 changes: 1 addition & 1 deletion Dockerfile.release
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM debian:bookworm-slim
WORKDIR /app
COPY raccoon .
CMD ["./raccoon"]
ENTRYPOINT [ "/app/raccoon" ]
62 changes: 37 additions & 25 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ type Publisher interface {

// StartServer starts the server
func StartServer(ctx context.Context, cancel context.CancelFunc) {
bufferChannel := make(chan collector.CollectRequest, config.Worker.ChannelSize)
bufferChannel := make(chan collector.CollectRequest, config.Worker.Buffer.ChannelSize)
httpServices := services.Create(bufferChannel)
logger.Info("Start Server -->")
httpServices.Start(ctx, cancel)
logger.Infof("Start publisher --> %s", config.Publisher)
logger.Infof("Start publisher --> %s", config.Publisher.Type)
publisher, err := initPublisher()
if err != nil {
logger.Errorf("Error creating %q publisher: %v\n", config.Publisher, err)
logger.Errorf("Error creating %q publisher: %v\n", config.Publisher.Type, err)
logger.Info("Exiting server")
os.Exit(0)
}

logger.Info("Start worker -->")
workerPool := worker.CreateWorkerPool(config.Worker.WorkersPoolSize, bufferChannel, config.Worker.DeliveryChannelSize, publisher)
workerPool := worker.CreateWorkerPool(config.Worker.PoolSize, bufferChannel, publisher)
workerPool.StartWorkers()

go reportProcMetrics()
Expand All @@ -58,18 +58,19 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) {
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collector.CollectRequest, workerPool *worker.Pool, pub Publisher) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
workerFlushTimeout := time.Duration(config.Worker.Buffer.FlushTimeoutMS) * time.Millisecond
for {
sig := <-signalChan
switch sig {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
logger.Info(fmt.Sprintf("[App.Server] Received a signal %s", sig))
httpServices.Shutdown(ctx)
logger.Info("Server shutdown all the listeners")
timedOut := workerPool.FlushWithTimeOut(config.Worker.WorkerFlushTimeout)
timedOut := workerPool.FlushWithTimeOut(workerFlushTimeout)
if timedOut {
logger.Info(fmt.Sprintf("WorkerPool flush timedout %t", timedOut))
}
flushInterval := config.PublisherKafka.FlushInterval
flushInterval := config.Publisher.Kafka.FlushInterval
logger.Infof("Closing %q producer\n", pub.Name())
logger.Info(fmt.Sprintf("Wait %d ms for all messages to be delivered", flushInterval))
eventsInProducer := 0
Expand All @@ -93,9 +94,9 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
fmt.Sprintf("%s_messages_delivered_total", pub.Name()),
int64(eventsInChannel+eventsInProducer),
map[string]string{
"success": "false",
"conn_group": "NA",
"event_type": "NA",
"topic": "NA",
},
)
logger.Info("Exiting server")
Expand All @@ -108,7 +109,8 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices

func reportProcMetrics() {
m := &runtime.MemStats{}
for range time.Tick(config.MetricInfo.RuntimeStatsRecordInterval) {
reportInterval := time.Duration(config.Metric.RuntimeStatsRecordIntervalMS) * time.Millisecond
for range time.Tick(reportInterval) {
metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), map[string]string{})
runtime.ReadMemStats(m)
metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, map[string]string{})
Expand All @@ -123,50 +125,60 @@ func reportProcMetrics() {
}

func initPublisher() (Publisher, error) {
switch config.Publisher {
switch config.Publisher.Type {
case "kafka":
return kafka.New()
case "pubsub":
client, err := pubsubsdk.NewClient(
context.Background(),
config.PublisherPubSub.ProjectId,
option.WithCredentialsFile(config.PublisherPubSub.CredentialsFile),
config.Publisher.PubSub.ProjectId,
option.WithCredentialsFile(config.Publisher.PubSub.CredentialsFile),
)
if err != nil {
return nil, fmt.Errorf("error creating pubsub client: %w", err)
}
var (
topicRetention = time.Duration(config.Publisher.PubSub.TopicRetentionPeriodMS) * time.Millisecond
delayThreshold = time.Duration(config.Publisher.PubSub.PublishDelayThresholdMS) * time.Millisecond
publishTimeout = time.Duration(config.Publisher.PubSub.PublishTimeoutMS) * time.Millisecond
)
return pubsub.New(
client,
pubsub.WithTopicFormat(config.EventDistribution.PublisherPattern),
pubsub.WithTopicAutocreate(config.PublisherPubSub.TopicAutoCreate),
pubsub.WithTopicRetention(config.PublisherPubSub.TopicRetentionPeriod),
pubsub.WithDelayThreshold(config.PublisherPubSub.PublishDelayThreshold),
pubsub.WithCountThreshold(config.PublisherPubSub.PublishCountThreshold),
pubsub.WithByteThreshold(config.PublisherPubSub.PublishByteThreshold),
pubsub.WithTimeout(config.PublisherPubSub.PublishTimeout),
pubsub.WithTopicFormat(config.Event.DistributionPublisherPattern),
pubsub.WithTopicAutocreate(config.Publisher.PubSub.TopicAutoCreate),
pubsub.WithTopicRetention(topicRetention),
pubsub.WithDelayThreshold(delayThreshold),
pubsub.WithCountThreshold(config.Publisher.PubSub.PublishCountThreshold),
pubsub.WithByteThreshold(config.Publisher.PubSub.PublishByteThreshold),
pubsub.WithTimeout(publishTimeout),
)
case "kinesis":
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion(config.PublisherKinesis.Region),
awsconfig.WithRegion(config.Publisher.Kinesis.Region),
awsconfig.WithSharedConfigFiles(
[]string{config.PublisherKinesis.CredentialsFile},
[]string{config.Publisher.Kinesis.CredentialsFile},
),
)
if err != nil {
return nil, fmt.Errorf("error locating aws credentials: %w", err)
}
conf := config.PublisherKinesis
var (
conf = config.Publisher.Kinesis
publishTimeout = time.Duration(conf.PublishTimeoutMS) * time.Millisecond
probeInterval = time.Duration(conf.StreamProbeIntervalMS) * time.Millisecond
)

return kinesis.New(
kinesissdk.NewFromConfig(cfg),
kinesis.WithStreamPattern(config.EventDistribution.PublisherPattern),
kinesis.WithStreamPattern(config.Event.DistributionPublisherPattern),
kinesis.WithStreamAutocreate(conf.StreamAutoCreate),
kinesis.WithStreamMode(types.StreamMode(conf.StreamMode)),
kinesis.WithShards(conf.DefaultShards),
kinesis.WithPublishTimeout(conf.PublishTimeout),
kinesis.WithStreamProbleInterval(conf.StreamProbeInterval),
kinesis.WithPublishTimeout(publishTimeout),
kinesis.WithStreamProbleInterval(probeInterval),
)
default:
return nil, fmt.Errorf("unknown publisher: %v", config.Publisher)
return nil, fmt.Errorf("unknown publisher: %v", config.Publisher.Type)
}
}
43 changes: 43 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package cmd

import (
"github.com/MakeNowJust/heredoc"
"github.com/raystack/salt/cmdx"
"github.com/spf13/cobra"
)

func New() *cobra.Command {
root := &cobra.Command{
Use: "raccoon",
Short: "Scalable event ingestion tool",
SilenceUsage: true,
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
Args: cobra.NoArgs,
Long: heredoc.Doc(`
Raccoon is a high-throughput, low-latency service to collect
events in real-time from your web, mobile apps, and services
using multiple network protocols.`),
Run: func(cmd *cobra.Command, args []string) {
cmd.Help()
},
Annotations: map[string]string{
"group": "core",
"help:learn": heredoc.Doc(`
Use 'raccoon <command> --help' for more information about a command.
Read the manual at https://raystack.github.io/raccoon/
`),
"help:feedback": heredoc.Doc(`
Open an issue here https://github.com/raystack/raccoon/issues
`),
},
}

cmdx.SetHelp(root)
root.AddCommand(cmdx.SetCompletionCmd("raccoon"))
root.AddCommand(cmdx.SetRefCmd(root))

root.AddCommand(serverCommand())
return root
}
90 changes: 90 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package cmd

import (
"fmt"
"reflect"
"strconv"

"github.com/raystack/raccoon/app"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
"github.com/raystack/raccoon/middleware"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

func serverCommand() *cobra.Command {
var configFile = "config.yaml"
command := &cobra.Command{
Use: "server",
Short: "Start raccoon server",
RunE: func(cmd *cobra.Command, args []string) error {
err := config.Load(configFile)
if err != nil {
return err
}
middleware.Load()
metrics.Setup()
defer metrics.Close()
logger.SetLevel(config.Log.Level)
return app.Run()
},
}
command.Flags().SortFlags = false
command.Flags().StringVarP(&configFile, "config", "c", configFile, "path to config file")
for _, cfg := range config.Walk() {
bindFlag(command.Flags(), cfg.Ref, cfg.Meta)
}
return command
}

func bindFlag(flag *pflag.FlagSet, ref any, meta reflect.StructField) {

flagName := meta.Tag.Get("cmdx")
desc := meta.Tag.Get("desc")

switch v := ref.(type) {
case *config.AckType:
flag.Var(ackTypeFlag{v}, flagName, desc)
case *string:
flag.StringVar(v, flagName, *v, desc)
case *int:
flag.IntVar(v, flagName, *v, desc)
case *int64:
flag.Int64Var(v, flagName, *v, desc)
case *uint32:
flag.Uint32Var(v, flagName, *v, desc)
case *bool:
flag.BoolVar(v, flagName, *v, desc)
case *[]string:
flag.StringSliceVar(v, flagName, *v, desc)
default:
msg := fmt.Sprintf("unsupport flag of type %T", ref)
panic(msg)
}
}

type ackTypeFlag struct {
value *config.AckType
}

func (af ackTypeFlag) String() string {
if af.value == nil {
return "0"
}
return fmt.Sprintf("%d", *af.value)
}

func (af ackTypeFlag) Set(raw string) error {
v, err := strconv.ParseInt(raw, 10, 0)
if err != nil {
return fmt.Errorf("error parsing bool: %w", err)
}
*af.value = config.AckType(v)
return nil
}

func (af ackTypeFlag) Type() string {
return "int"
}
Loading

0 comments on commit f04adb7

Please sign in to comment.