Skip to content

Commit

Permalink
feat(stdlib): add experimental.preview function (#4725)
Browse files Browse the repository at this point in the history
The preview function will limit the number of rows and the number of
tables that can be returned by a transformation. It will discard any
remaining tables/rows.
  • Loading branch information
jsternberg authored May 10, 2022
1 parent 53d8de8 commit 9d27bb9
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 1 deletion.
3 changes: 2 additions & 1 deletion libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ var sourceHashes = map[string]string{
"stdlib/experimental/csv/csv.flux": "94ae72b5fb50b8d65e9b384aebaa890f04bc914bb344ea751a64f729b5c849fe",
"stdlib/experimental/distinct_test.flux": "ebc7f2bf1bc49266a8809f8a3e5cffe5467ac2774c14d40e50c85d14f8c6cd87",
"stdlib/experimental/durations_test.flux": "6394bbfc88ab51a42cfee4a0f1e6ed327b860662922dd99aa6f99ebda5b167ee",
"stdlib/experimental/experimental.flux": "20b6d5ca5adb23de2318b2d1ebe75ac8742dcdd6928a0015e3234310d491b8c3",
"stdlib/experimental/experimental.flux": "b542cd7dbba4f35a1da937f377a26c3fd5704f8304dda676758691084b7dc498",
"stdlib/experimental/experimental_test.flux": "206cbb54d35da80a6736dec990219f5bbabcdc3c65fbb9d1041058c8aae3450d",
"stdlib/experimental/fill_test.flux": "adc4f95f746ccdaf4a0843633753e723a4524ea0afbfb2cd6b70726d855ebfb8",
"stdlib/experimental/first_test.flux": "3dffabf42ff18aa9a6515010e3496f13a44e0429c38636f6962361dbc8693c3f",
Expand Down Expand Up @@ -200,6 +200,7 @@ var sourceHashes = map[string]string{
"stdlib/experimental/oee/apq_test.flux": "6a64721efd123bbfd40ca90cbebc7cfd2dccbd651da5e1ee4d33e0833256be60",
"stdlib/experimental/oee/computeapq_test.flux": "a5069718fb11cee0ffc8b345ead279ccb7f4bcc4644ffdc8d641c310be1969ec",
"stdlib/experimental/oee/oee.flux": "30d6ce4144d1425cb9b294bd17fd73395d2a7c87b86c089aa89e73969a5b98f3",
"stdlib/experimental/preview_test.flux": "cca570d25b17ed201a0ecc7ebf9e547ccff2aa0814a3ac49f12faa938cbdaf73",
"stdlib/experimental/prometheus/prometheus.flux": "e0b3df509c8f522edee1081e5e3907ce9628f783d33b47c2e2d8ffb766f3f948",
"stdlib/experimental/prometheus/prometheus_histogramQuantile_test.flux": "001fc6f403990519d3c8797ce960c8f64d9292db95fb8461cb3f91fcbe627ebb",
"stdlib/experimental/quantile_test.flux": "794f9902574b9fa5c1eae6ff2d48a9425ca3ba228692d8e6052c673d6418428b",
Expand Down
30 changes: 30 additions & 0 deletions stdlib/experimental/experimental.flux
Original file line number Diff line number Diff line change
Expand Up @@ -1252,3 +1252,33 @@ builtin histogram : (
bins: [float],
?normalize: bool,
) => stream[{T with _value: float, le: float}]

// preview limits the number of rows and tables in the stream.
//
// Included group keys are not deterministic and depend on the order
// that the engine sends them.
//
// ## Parameters
// - nrows: Maximum number of rows per table to return. Default is `5`.
//
// - ntables: Maximum number of tables to return.
// Default is `5`.
//
// - tables: Input data. Default is piped-forward data (`<-`).
//
// ## Examples
//
// ### Preview data output
// ```
// import "experimental"
// import "sampledata"
//
// sampledata.int()
// |> experimental.preview()
// ```
//
// ## Metadata
// introduced: NEXT
// tags: transformations
//
builtin preview : (<-tables: stream[A], ?nrows: int, ?ntables: int) => stream[A] where A: Record
143 changes: 143 additions & 0 deletions stdlib/experimental/preview.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package experimental

import (
"fmt"

"github.com/apache/arrow/go/v7/arrow/memory"
"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/table"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
)

// PreviewKind is the name under which experimental.preview is registered as a
// function value, a procedure spec and a transformation.
const PreviewKind = "experimental.preview"

// PreviewOpSpec is the operation spec produced for a call to
// experimental.preview.
type PreviewOpSpec struct {
	// NRows is the maximum number of rows kept per table.
	NRows int64
	// NTables is the maximum number of tables kept.
	NTables int64
}

// init registers experimental.preview with the runtime (as a package value),
// the planner (as a procedure spec) and the execution engine (as a
// transformation).
func init() {
	sig := runtime.MustLookupBuiltinType("experimental", "preview")
	fn := flux.MustValue(flux.FunctionValue(PreviewKind, createPreviewOpSpec, sig))

	runtime.RegisterPackageValue("experimental", "preview", fn)
	plan.RegisterProcedureSpec(PreviewKind, newPreviewProcedure, PreviewKind)
	execute.RegisterTransformation(PreviewKind, createPreviewTransformation)
}

// createPreviewOpSpec builds the operation spec for experimental.preview from
// the call arguments.
//
// The input stream found in args is registered as this operation's parent.
// Both nrows and ntables are optional and default to 5 when not supplied.
//
// It returns an error if the parent cannot be added, if either argument
// cannot be read as an int, or if nrows is negative.
func createPreviewOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
	if err := a.AddParentFromArgs(args); err != nil {
		return nil, err
	}

	// Start from the defaults and override only what the caller supplied.
	spec := &PreviewOpSpec{NRows: 5, NTables: 5}

	if nrows, ok, err := args.GetInt("nrows"); err != nil {
		return nil, err
	} else if ok {
		// The transformation uses the remaining row budget as the end index
		// when slicing a chunk, so a negative budget is never a valid input.
		// Reject it here instead of failing while the query is executing.
		if nrows < 0 {
			return nil, fmt.Errorf("nrows must be non-negative, got %d", nrows)
		}
		spec.NRows = nrows
	}

	// NOTE(review): a negative ntables is passed through unchanged. The
	// transformation only stops when its table counter is exactly zero, so a
	// negative value currently behaves as "no table limit". Confirm whether
	// that should be rejected as well.
	if ntables, ok, err := args.GetInt("ntables"); err != nil {
		return nil, err
	} else if ok {
		spec.NTables = ntables
	}
	return spec, nil
}

// Kind reports the operation kind of this spec, which is always PreviewKind.
func (s *PreviewOpSpec) Kind() flux.OperationKind {
	return flux.OperationKind(PreviewKind)
}

// PreviewProcedureSpec is the planner-level spec for experimental.preview.
// It carries the same limits as PreviewOpSpec and embeds plan.DefaultCost.
type PreviewProcedureSpec struct {
	plan.DefaultCost

	// NRows is the maximum number of rows kept per table.
	NRows int64
	// NTables is the maximum number of tables kept.
	NTables int64
}

// newPreviewProcedure converts a *PreviewOpSpec into the equivalent
// *PreviewProcedureSpec, copying both limits. Any other spec type yields an
// error naming the type that was received.
func newPreviewProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
	switch op := qs.(type) {
	case *PreviewOpSpec:
		return &PreviewProcedureSpec{NRows: op.NRows, NTables: op.NTables}, nil
	default:
		return nil, fmt.Errorf("invalid spec type %T", qs)
	}
}

// Kind reports the procedure kind of this spec, which is always PreviewKind.
func (s *PreviewProcedureSpec) Kind() plan.ProcedureKind {
	return plan.ProcedureKind(PreviewKind)
}
// Copy returns a pointer to a new PreviewProcedureSpec holding a value copy
// of the receiver.
func (s *PreviewProcedureSpec) Copy() plan.ProcedureSpec {
	dup := new(PreviewProcedureSpec)
	*dup = *s
	return dup
}

// createPreviewTransformation is the factory registered with the execution
// engine. It requires spec to be a *PreviewProcedureSpec and builds the
// transformation with the allocator obtained from a; any other spec type
// yields an error naming the type that was received.
func createPreviewTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error) {
	if previewSpec, isPreview := spec.(*PreviewProcedureSpec); isPreview {
		return NewPreviewTransformation(id, previewSpec, a.Allocator())
	}
	return nil, nil, fmt.Errorf("invalid spec type %T", spec)
}

// previewTransformation holds the limits applied by experimental.preview.
type previewTransformation struct {
	// nrows is the row budget given to each newly admitted table.
	nrows int64
	// ntables is the number of tables that may still be admitted; Process
	// decrements it each time it admits a new table.
	ntables int64
}

// NewPreviewTransformation builds a preview transformation from the limits in
// spec and wraps it with execute.NewNarrowStateTransformation, returning
// whatever that constructor returns.
func NewPreviewTransformation(id execute.DatasetID, spec *PreviewProcedureSpec, mem memory.Allocator) (execute.Transformation, execute.Dataset, error) {
	return execute.NewNarrowStateTransformation(id, &previewTransformation{
		nrows:   spec.NRows,
		ntables: spec.NTables,
	}, mem)
}

// Process forwards rows from chunk to d until the row budget for the chunk's
// table is used up.
//
// state is expected to be the int64 row budget returned by an earlier call
// (presumably tracked per group key by the narrow state transformation —
// confirm). Any other value, including nil, is treated as the first chunk of
// a table that has not been admitted yet: if no table slots remain the chunk
// is dropped and no state is recorded, otherwise one slot is consumed and the
// budget starts at nrows.
//
// The returned state is always an int64 so that the type assertion at the top
// of the next call succeeds. The boolean result reports whether the returned
// state should be stored.
func (t *previewTransformation) Process(chunk table.Chunk, state interface{}, d *execute.TransportDataset, mem memory.Allocator) (interface{}, bool, error) {
	n, ok := state.(int64)
	if !ok {
		if t.ntables == 0 {
			return nil, false, nil
		}
		t.ntables--
		n = t.nrows
	}

	// The whole chunk fits in the remaining budget: pass it through as-is.
	if int64(chunk.Len()) <= n {
		// Take an extra reference before handing the chunk downstream
		// (presumably the caller releases its own reference — confirm).
		chunk.Retain()
		if err := d.Process(chunk); err != nil {
			return nil, false, err
		}
		n -= int64(chunk.Len())
		return n, true, nil
	}

	// The chunk is larger than the remaining budget: forward only the first n
	// rows of every column.
	buffer := arrow.TableBuffer{
		GroupKey: chunk.Key(),
		Columns:  chunk.Cols(),
		Values:   make([]array.Array, chunk.NCols()),
	}
	for i := range chunk.Cols() {
		buffer.Values[i] = arrow.Slice(chunk.Values(i), 0, n)
	}

	out := table.ChunkFromBuffer(buffer)
	if err := d.Process(out); err != nil {
		return nil, false, err
	}
	// The budget is exhausted. This must be an int64: a bare 0 would be
	// stored as an int, the assertion above would then fail on the next chunk
	// of this table, and that chunk would be admitted as a brand-new table
	// with a fresh budget.
	return int64(0), true, nil
}

// Close does nothing and always returns nil.
func (t *previewTransformation) Close() error { return nil }
85 changes: 85 additions & 0 deletions stdlib/experimental/preview_test.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package experimental_test


import "array"
import "experimental"
import "testing"

// inData is the shared fixture: nine rows with the same _time, three rows for
// each t0 value ("a", "b", "c"), grouped by t0.
inData =
array.from(
rows: [
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 1.0},
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 2.0},
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 3.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 4.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 5.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 6.0},
{_time: 2022-05-09T00:00:00Z, t0: "c", _value: 7.0},
{_time: 2022-05-09T00:00:00Z, t0: "c", _value: 8.0},
{_time: 2022-05-09T00:00:00Z, t0: "c", _value: 9.0},
],
)
|> group(columns: ["t0"])

// basic applies explicit limits (nrows: 2, ntables: 2) to inData and expects
// the first two rows of the "a" and "b" groups.
testcase basic {
want =
array.from(
rows: [
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 1.0},
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 2.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 4.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 5.0},
],
)
|> group(columns: ["t0"])

got =
inData
|> experimental.preview(nrows: 2, ntables: 2)

testing.diff(got, want) |> yield()
}

// multi_buffer regroups inData with group() (no columns) before calling
// preview with its default limits, and expects only the first five rows.
testcase multi_buffer {
want =
array.from(
rows: [
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 1.0},
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 2.0},
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 3.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 4.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 5.0},
],
)

got =
inData
|> group()
|> experimental.preview()

testing.diff(got, want) |> yield()
}

// small calls preview with its default limits on inData, whose three groups
// of three rows each are within those limits, and expects every row back.
testcase small {
want =
array.from(
rows: [
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 1.0},
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 2.0},
{_time: 2022-05-09T00:00:00Z, t0: "a", _value: 3.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 4.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 5.0},
{_time: 2022-05-09T00:00:00Z, t0: "b", _value: 6.0},
{_time: 2022-05-09T00:00:00Z, t0: "c", _value: 7.0},
{_time: 2022-05-09T00:00:00Z, t0: "c", _value: 8.0},
{_time: 2022-05-09T00:00:00Z, t0: "c", _value: 9.0},
],
)
|> group(columns: ["t0"])

got =
inData
|> experimental.preview()

testing.diff(got, want) |> yield()
}

0 comments on commit 9d27bb9

Please sign in to comment.