Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Raccoon CLI #92

Merged
merged 44 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
0680791
cmd: add basic CLI harness
turtleDev Aug 1, 2024
dcf9d90
cmd: wip on server flags
turtleDev Aug 5, 2024
17cfb24
cmd: server: bind flags for event acknowledgement
turtleDev Aug 5, 2024
a80e037
cmd: server: add kafka client flags
turtleDev Aug 6, 2024
c393338
cli: remove usused comments
turtleDev Aug 6, 2024
c3a481e
cmd: simplify boolean flags
turtleDev Aug 6, 2024
1d9eea7
cmd: fix --event.ack parsing
turtleDev Aug 6, 2024
36fa95b
config: kafka: make bootstrap server config required
turtleDev Aug 6, 2024
3c19fe0
cmd: fix duration flags not propagating values to config
turtleDev Aug 6, 2024
a123ad6
cmd: chore: allow flags on root command
turtleDev Aug 6, 2024
ceda933
ci: update tests to use raccoon cli
turtleDev Aug 6, 2024
98bed57
ci: fix compose not starting raccoon server
turtleDev Aug 6, 2024
d8d5029
chore: update config tests
turtleDev Aug 6, 2024
a8625be
wip: migration to salt/config
turtleDev Aug 14, 2024
9be8641
config: replace use of time.Duration values with ints
turtleDev Aug 14, 2024
6839586
config: WIP validation
turtleDev Aug 14, 2024
c7a392c
config: refactor validation + add fallbacks
turtleDev Aug 14, 2024
4138a3f
docs: kinesis: mark kinesis credentials file required
turtleDev Aug 14, 2024
bc7de1f
config: purge utils
turtleDev Aug 14, 2024
6e5b674
chore: tidy deps
turtleDev Aug 14, 2024
5f925d6
config: chore: use consolidate config values
turtleDev Aug 14, 2024
f42f4bc
wip: config: salt/config integration
turtleDev Aug 14, 2024
c47efbb
config: refactor config to comply with salt/config
turtleDev Aug 15, 2024
9527560
config: revamp error validation
turtleDev Aug 15, 2024
aaaf3bf
config: fix defaults
turtleDev Aug 15, 2024
7442e94
config: fix tags on publisher config
turtleDev Aug 15, 2024
42d9fe3
config: purge tests
turtleDev Aug 15, 2024
881d56a
config: add default for server.websocket.conn.group.default
turtleDev Aug 15, 2024
2c58e3e
docs: fix ping/pong and write timeout defaults
turtleDev Aug 16, 2024
bdd7eff
config: move WORKER_KAFKA_DELIVERY_CHANNEL_SIZE to kafka publisher
turtleDev Aug 16, 2024
da3e91d
cmd: server: add flag for config file
turtleDev Aug 16, 2024
93e95ba
config: update cors to use comma separated values
turtleDev Aug 17, 2024
04a4d02
ci: add testing yaml config
turtleDev Aug 17, 2024
8d39df8
chore: add sample raccoon config
turtleDev Aug 17, 2024
1c128d8
cli: integrate salt/cmdx
turtleDev Aug 17, 2024
d55de64
config: make kafka config type-concrete
turtleDev Aug 17, 2024
9f70396
misc: fix panic on server shutdown when prometheus metrics are enabled
turtleDev Aug 17, 2024
5c8f661
ci: update test environment
turtleDev Aug 17, 2024
5e43b94
docker: make image act as a CLI
turtleDev Aug 18, 2024
a44ba4c
chore: removed unused configs
turtleDev Aug 18, 2024
d828316
config: kafka: omit empty configs
turtleDev Aug 18, 2024
db00cf5
cmd: add dynamically generated cli flags
turtleDev Aug 18, 2024
7f14b58
cmd: remove duration flag parser
turtleDev Aug 18, 2024
3a499e7
cmd: server: simplify flag parsing
turtleDev Aug 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 0 additions & 42 deletions .env.sample

This file was deleted.

41 changes: 0 additions & 41 deletions .env.test

This file was deleted.

4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ jobs:
go-version: "1.22.4"
- name: Checkout code
uses: actions/checkout@v2
- name: Initialise test config
run: cp .env.test .env
- name: Start raccoon
run: make docker-run
- name: Run tests
Expand Down Expand Up @@ -47,6 +45,8 @@ jobs:
- uses: actions/checkout@v2
- name: Build
run: make build
- name: Smoke test
run: ./raccoon
benchmark:
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ RUN make build
FROM debian:bookworm-slim
WORKDIR /app
COPY --from=0 /app/raccoon ./raccoon
CMD ["./raccoon"]
ENTRYPOINT [ "/app/raccoon" ]
2 changes: 1 addition & 1 deletion Dockerfile.release
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM debian:bookworm-slim
WORKDIR /app
COPY raccoon .
CMD ["./raccoon"]
ENTRYPOINT [ "/app/raccoon" ]
62 changes: 37 additions & 25 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ type Publisher interface {

// StartServer starts the server
func StartServer(ctx context.Context, cancel context.CancelFunc) {
bufferChannel := make(chan collector.CollectRequest, config.Worker.ChannelSize)
bufferChannel := make(chan collector.CollectRequest, config.Worker.Buffer.ChannelSize)
httpServices := services.Create(bufferChannel)
logger.Info("Start Server -->")
httpServices.Start(ctx, cancel)
logger.Infof("Start publisher --> %s", config.Publisher)
logger.Infof("Start publisher --> %s", config.Publisher.Type)
publisher, err := initPublisher()
if err != nil {
logger.Errorf("Error creating %q publisher: %v\n", config.Publisher, err)
logger.Errorf("Error creating %q publisher: %v\n", config.Publisher.Type, err)
logger.Info("Exiting server")
os.Exit(0)
}

logger.Info("Start worker -->")
workerPool := worker.CreateWorkerPool(config.Worker.WorkersPoolSize, bufferChannel, config.Worker.DeliveryChannelSize, publisher)
workerPool := worker.CreateWorkerPool(config.Worker.PoolSize, bufferChannel, publisher)
workerPool.StartWorkers()

go reportProcMetrics()
Expand All @@ -58,18 +58,19 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) {
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collector.CollectRequest, workerPool *worker.Pool, pub Publisher) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
workerFlushTimeout := time.Duration(config.Worker.Buffer.FlushTimeoutMS) * time.Millisecond
for {
sig := <-signalChan
switch sig {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
logger.Info(fmt.Sprintf("[App.Server] Received a signal %s", sig))
httpServices.Shutdown(ctx)
logger.Info("Server shutdown all the listeners")
timedOut := workerPool.FlushWithTimeOut(config.Worker.WorkerFlushTimeout)
timedOut := workerPool.FlushWithTimeOut(workerFlushTimeout)
if timedOut {
logger.Info(fmt.Sprintf("WorkerPool flush timedout %t", timedOut))
}
flushInterval := config.PublisherKafka.FlushInterval
flushInterval := config.Publisher.Kafka.FlushInterval
logger.Infof("Closing %q producer\n", pub.Name())
logger.Info(fmt.Sprintf("Wait %d ms for all messages to be delivered", flushInterval))
eventsInProducer := 0
Expand All @@ -93,9 +94,9 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
fmt.Sprintf("%s_messages_delivered_total", pub.Name()),
int64(eventsInChannel+eventsInProducer),
map[string]string{
"success": "false",
"conn_group": "NA",
"event_type": "NA",
"topic": "NA",
},
)
logger.Info("Exiting server")
Expand All @@ -108,7 +109,8 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices

func reportProcMetrics() {
m := &runtime.MemStats{}
for range time.Tick(config.MetricInfo.RuntimeStatsRecordInterval) {
reportInterval := time.Duration(config.Metric.RuntimeStatsRecordIntervalMS) * time.Millisecond
for range time.Tick(reportInterval) {
metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), map[string]string{})
runtime.ReadMemStats(m)
metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, map[string]string{})
Expand All @@ -123,50 +125,60 @@ func reportProcMetrics() {
}

func initPublisher() (Publisher, error) {
switch config.Publisher {
switch config.Publisher.Type {
case "kafka":
return kafka.New()
case "pubsub":
client, err := pubsubsdk.NewClient(
context.Background(),
config.PublisherPubSub.ProjectId,
option.WithCredentialsFile(config.PublisherPubSub.CredentialsFile),
config.Publisher.PubSub.ProjectId,
option.WithCredentialsFile(config.Publisher.PubSub.CredentialsFile),
)
if err != nil {
return nil, fmt.Errorf("error creating pubsub client: %w", err)
}
var (
topicRetention = time.Duration(config.Publisher.PubSub.TopicRetentionPeriodMS) * time.Millisecond
delayThreshold = time.Duration(config.Publisher.PubSub.PublishDelayThresholdMS) * time.Millisecond
publishTimeout = time.Duration(config.Publisher.PubSub.PublishTimeoutMS) * time.Millisecond
)
return pubsub.New(
client,
pubsub.WithTopicFormat(config.EventDistribution.PublisherPattern),
pubsub.WithTopicAutocreate(config.PublisherPubSub.TopicAutoCreate),
pubsub.WithTopicRetention(config.PublisherPubSub.TopicRetentionPeriod),
pubsub.WithDelayThreshold(config.PublisherPubSub.PublishDelayThreshold),
pubsub.WithCountThreshold(config.PublisherPubSub.PublishCountThreshold),
pubsub.WithByteThreshold(config.PublisherPubSub.PublishByteThreshold),
pubsub.WithTimeout(config.PublisherPubSub.PublishTimeout),
pubsub.WithTopicFormat(config.Event.DistributionPublisherPattern),
pubsub.WithTopicAutocreate(config.Publisher.PubSub.TopicAutoCreate),
pubsub.WithTopicRetention(topicRetention),
pubsub.WithDelayThreshold(delayThreshold),
pubsub.WithCountThreshold(config.Publisher.PubSub.PublishCountThreshold),
pubsub.WithByteThreshold(config.Publisher.PubSub.PublishByteThreshold),
pubsub.WithTimeout(publishTimeout),
)
case "kinesis":
cfg, err := awsconfig.LoadDefaultConfig(
context.Background(),
awsconfig.WithRegion(config.PublisherKinesis.Region),
awsconfig.WithRegion(config.Publisher.Kinesis.Region),
awsconfig.WithSharedConfigFiles(
[]string{config.PublisherKinesis.CredentialsFile},
[]string{config.Publisher.Kinesis.CredentialsFile},
),
)
if err != nil {
return nil, fmt.Errorf("error locating aws credentials: %w", err)
}
conf := config.PublisherKinesis
var (
conf = config.Publisher.Kinesis
publishTimeout = time.Duration(conf.PublishTimeoutMS) * time.Millisecond
probeInterval = time.Duration(conf.StreamProbeIntervalMS) * time.Millisecond
)

return kinesis.New(
kinesissdk.NewFromConfig(cfg),
kinesis.WithStreamPattern(config.EventDistribution.PublisherPattern),
kinesis.WithStreamPattern(config.Event.DistributionPublisherPattern),
kinesis.WithStreamAutocreate(conf.StreamAutoCreate),
kinesis.WithStreamMode(types.StreamMode(conf.StreamMode)),
kinesis.WithShards(conf.DefaultShards),
kinesis.WithPublishTimeout(conf.PublishTimeout),
kinesis.WithStreamProbleInterval(conf.StreamProbeInterval),
kinesis.WithPublishTimeout(publishTimeout),
kinesis.WithStreamProbleInterval(probeInterval),
)
default:
return nil, fmt.Errorf("unknown publisher: %v", config.Publisher)
return nil, fmt.Errorf("unknown publisher: %v", config.Publisher.Type)
}
}
43 changes: 43 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package cmd

import (
"github.com/MakeNowJust/heredoc"
"github.com/raystack/salt/cmdx"
"github.com/spf13/cobra"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turtleDev Let's use salt/cmdx for cli.

)

func New() *cobra.Command {
root := &cobra.Command{
Use: "raccoon",
Short: "Scalable event ingestion tool",
SilenceUsage: true,
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
Args: cobra.NoArgs,
Long: heredoc.Doc(`
Raccoon is a high-throughput, low-latency service to collect
events in real-time from your web, mobile apps, and services
using multiple network protocols.`),
Run: func(cmd *cobra.Command, args []string) {
cmd.Help()
},
Annotations: map[string]string{
"group": "core",
"help:learn": heredoc.Doc(`
Use 'raccoon <command> --help' for more information about a command.
Read the manual at https://raystack.github.io/raccoon/
`),
"help:feedback": heredoc.Doc(`
Open an issue here https://github.com/raystack/raccoon/issues
`),
},
}

cmdx.SetHelp(root)
root.AddCommand(cmdx.SetCompletionCmd("raccoon"))
root.AddCommand(cmdx.SetRefCmd(root))

root.AddCommand(serverCommand())
return root
}
90 changes: 90 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package cmd

import (
"fmt"
"reflect"
"strconv"

"github.com/raystack/raccoon/app"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
"github.com/raystack/raccoon/middleware"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

func serverCommand() *cobra.Command {
var configFile = "config.yaml"
command := &cobra.Command{
Use: "server",
Short: "Start raccoon server",
RunE: func(cmd *cobra.Command, args []string) error {
err := config.Load(configFile)
if err != nil {
return err
}
middleware.Load()
metrics.Setup()
defer metrics.Close()
logger.SetLevel(config.Log.Level)
return app.Run()
},
}
command.Flags().SortFlags = false
command.Flags().StringVarP(&configFile, "config", "c", configFile, "path to config file")
for _, cfg := range config.Walk() {
bindFlag(command.Flags(), cfg.Ref, cfg.Meta)
}
return command
}

func bindFlag(flag *pflag.FlagSet, ref any, meta reflect.StructField) {

flagName := meta.Tag.Get("cmdx")
desc := meta.Tag.Get("desc")

switch v := ref.(type) {
case *config.AckType:
flag.Var(ackTypeFlag{v}, flagName, desc)
case *string:
flag.StringVar(v, flagName, *v, desc)
case *int:
flag.IntVar(v, flagName, *v, desc)
case *int64:
flag.Int64Var(v, flagName, *v, desc)
case *uint32:
flag.Uint32Var(v, flagName, *v, desc)
case *bool:
flag.BoolVar(v, flagName, *v, desc)
case *[]string:
flag.StringSliceVar(v, flagName, *v, desc)
default:
msg := fmt.Sprintf("unsupport flag of type %T", ref)
panic(msg)
}
}

type ackTypeFlag struct {
value *config.AckType
}

func (af ackTypeFlag) String() string {
if af.value == nil {
return "0"
}
return fmt.Sprintf("%d", *af.value)
}

func (af ackTypeFlag) Set(raw string) error {
v, err := strconv.ParseInt(raw, 10, 0)
if err != nil {
return fmt.Errorf("error parsing bool: %w", err)
}
*af.value = config.AckType(v)
return nil
}

func (af ackTypeFlag) Type() string {
return "int"
}
Loading
Loading