Skip to content

Commit

Permalink
feat(ingest): better batching
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmalkmus committed Aug 20, 2024
1 parent 321ac3d commit 5a1d546
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ issues:
- linters:
- staticcheck
text: "SA1019: client.Datasets.QueryLegacy"
- linters:
- staticcheck
text: "SA1019: client.QueryLegacy"
- linters:
- staticcheck
text: 'SA1019: "github.com/axiomhq/axiom-go/axiom/querylegacy"'
65 changes: 45 additions & 20 deletions internal/cmd/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ type options struct {
// Delimiter that separates CSV fields.
Delimiter string
// FlushEvery flushes the ingestion buffer after the specified duration. It
// is only valid when ingesting a stream of newline delimited JSON objects
// of unknown length.
// is only valid when ingesting batchable data, e.g. newline delimited JSON
// and CSV (with field names explicitly set) data that is not encoded.
FlushEvery time.Duration
// BatchSize to aim for when ingesting batchable data.
BatchSize uint
// ContentType of the data to ingest.
ContentType axiom.ContentType
contentType string // for the flag value
Expand All @@ -81,7 +83,7 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
}

cmd := &cobra.Command{
Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [--flush-every <duration>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]]",
Use: "ingest <dataset-name> [(-f|--file) <filename> [ ...]] [--timestamp-field <timestamp-field>] [--timestamp-format <timestamp-format>] [(-d|--delimiter <delimiter>] [--flush-every <duration>] [(-b|--batch-size <batch-size>] [(-t|--content-type <content-type>] [(-e|--content-encoding <content-encoding>] [(-l|--label) <key>:<value> [ ...]] [--csv-fields <field> [ ...]] [--continue-on-error <TRUE|FALSE>]",
Short: "Ingest structured data",
Long: heredoc.Doc(`
Ingest structured data into an Axiom dataset.
Expand Down Expand Up @@ -193,15 +195,22 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
if err := complete(cmd.Context(), opts); err != nil {
return err
}
return run(cmd.Context(), opts, cmd.Flag("flush-every").Changed, cmd.Flag("csv-fields").Changed)
return run(
cmd.Context(),
opts,
cmd.Flag("flush-every").Changed,
cmd.Flag("batch-size").Changed,
cmd.Flag("csv-fields").Changed,
)
},
}

cmd.Flags().StringSliceVarP(&opts.Filenames, "file", "f", nil, "File(s) to ingest (- to read from stdin). If stdin is a pipe the default value is -, otherwise this is a required parameter")
cmd.Flags().StringVar(&opts.TimestampField, "timestamp-field", "", "Field to take the ingestion time from (defaults to _time)")
cmd.Flags().StringVar(&opts.TimestampFormat, "timestamp-format", "", "Format used in the the timestamp field. Default uses a heuristic parser. Must be expressed using the reference time 'Mon Jan 2 15:04:05 -0700 MST 2006'")
cmd.Flags().StringVarP(&opts.Delimiter, "delimiter", "d", "", "Delimiter that separates CSV fields (only valid when input is CSV")
cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second, "Buffer flush interval for newline delimited JSON streams of unknown length")
cmd.Flags().DurationVar(&opts.FlushEvery, "flush-every", time.Second*5, "Buffer flush interval for batchable data")
cmd.Flags().UintVarP(&opts.BatchSize, "batch-size", "b", 10_000, "Batch size to aim for")
cmd.Flags().StringVarP(&opts.contentType, "content-type", "t", "", "Content type of the data to ingest (will auto-detect if not set, must be set if content encoding is set and content type is not identity)")
cmd.Flags().StringVarP(&opts.contentEncoding, "content-encoding", "e", axiom.Identity.String(), "Content encoding of the data to ingest")
cmd.Flags().StringSliceVarP(&opts.labels, "label", "l", nil, "Labels to attach to the ingested events, server side")
Expand All @@ -212,9 +221,11 @@ func NewCmd(f *cmdutil.Factory) *cobra.Command {
_ = cmd.RegisterFlagCompletionFunc("timestamp-format", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("delimiter", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("flush-every", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("batch-size", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("content-type", contentTypeCompletion)
_ = cmd.RegisterFlagCompletionFunc("content-encoding", contentEncodingCompletion)
_ = cmd.RegisterFlagCompletionFunc("label", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("csv-fields", cmdutil.NoCompletion)
_ = cmd.RegisterFlagCompletionFunc("continue-on-error", cmdutil.NoCompletion)

if opts.IO.IsStdinTTY() {
Expand Down Expand Up @@ -265,7 +276,7 @@ func complete(ctx context.Context, opts *options) error {
}, &opts.Dataset, opts.IO.SurveyIO())
}

func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) error {
func run(ctx context.Context, opts *options, flushEverySet, batchSizeSet, csvFieldsSet bool) error {
client, err := opts.Client(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -305,20 +316,23 @@ func run(ctx context.Context, opts *options, flushEverySet, csvFieldsSet bool) e
typ = opts.ContentType
}

if flushEverySet && typ != axiom.NDJSON {
return cmdutil.NewFlagErrorf("--flush-every not valid when content type is not newline delimited JSON")
}
if opts.Delimiter != "" && typ != axiom.CSV {
return cmdutil.NewFlagErrorf("--delimier/-d not valid when content type is not CSV")
}

var (
batchable = typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)
batchable = (typ == axiom.NDJSON || (typ == axiom.CSV && csvFieldsSet)) &&
opts.ContentEncoding == axiom.Identity
ingestRes *ingest.Status
)
if filename == "stdin" && batchable && opts.ContentEncoding == axiom.Identity {
if batchable {
ingestRes, err = ingestEvery(ctx, client, r, typ, opts)
} else {
if flushEverySet {
return cmdutil.NewFlagErrorf("--flush-every not valid when data is not batchable")
} else if batchSizeSet {
return cmdutil.NewFlagErrorf("--batch-size not valid when data is not batchable")
}
ingestRes, err = ingestReader(ctx, client, r, typ, opts)
}

Expand Down Expand Up @@ -375,17 +389,16 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axi
defer t.Stop()

readers := make(chan io.Reader)

go func() {
defer close(readers)

// Add first reader.
pr, pw := io.Pipe()
readers <- pr

// Start with a 64 byte buffer, check up until 1 MB per line.
// Start with a 1 KB buffer, check up until 1 MB per line.
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 64), 1024*1024)
scanner.Buffer(make([]byte, 1024), 1024*1024)
scanner.Split(splitLinesMulti)

// We need to scan in a go func to make sure we don't block on
Expand Down Expand Up @@ -414,23 +427,35 @@ func ingestEvery(ctx context.Context, client *axiom.Client, r io.Reader, typ axi
}
}()

var lineCount uint
flushBatch := func() {
if err := pw.Close(); err != nil {
return
}

pr, pw = io.Pipe()
readers <- pr

lineCount = 0
t.Reset(opts.FlushEvery)
}
for {
select {
case <-ctx.Done():
_ = pw.CloseWithError(ctx.Err())
return
case <-t.C:
if err := pw.Close(); err != nil {
return
flushBatch()
case line := <-lines:
if lineCount >= opts.BatchSize {
flushBatch()
}

pr, pw = io.Pipe()
readers <- pr
case line := <-lines:
if _, err := pw.Write(line); err != nil {
_ = pw.CloseWithError(err)
return
}
lineCount++
case <-done:
_ = pw.Close()
return
Expand Down Expand Up @@ -483,7 +508,7 @@ func ingestReader(ctx context.Context, client *axiom.Client, r io.Reader, typ ax
ingestOptions = append(ingestOptions, opts.Labels...)
ingestOptions = append(ingestOptions, opts.CSVFields...)

res, err := client.Datasets.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
res, err := client.Ingest(ctx, opts.Dataset, r, typ, enc, ingestOptions...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func run(ctx context.Context, opts *options) error {
progStop := opts.IO.StartActivityIndicator()
defer progStop()

res, err := client.Datasets.Query(ctx, opts.Query,
res, err := client.Query(ctx, opts.Query,
query.SetStartTime(opts.startTime),
query.SetEndTime(opts.endTime),
)
Expand Down

0 comments on commit 5a1d546

Please sign in to comment.