Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logging #7

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions duck/data/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ const metadataKeyRefID = "refId" // added to the table metadata
// All fields of a Frame must be of the same length or an error is returned.
func MarshalArrow(f *data.Frame) (*arrow.Schema, error) {
if _, err := f.RowLen(); err != nil {
logger.Error("failed to get row length", "error", err)
return nil, err
}

arrowFields, err := buildArrowFields(f)
if err != nil {
logger.Error("failed to build arrow fields", "error", err)
return nil, err
}

schema, err := buildArrowSchema(f, arrowFields)
if err != nil {
logger.Error("failed to build arrow schema", "error", err)
return nil, err
}

Expand All @@ -48,6 +51,7 @@ func buildArrowFields(f *data.Frame) ([]arrow.Field, error) {

if field.Labels != nil {
if fieldMeta[metadataKeyLabels], err = toJSONString(field.Labels); err != nil {
logger.Error("failed to serialize labels", "error", err)
return nil, err
}
}
Expand All @@ -56,6 +60,7 @@ func buildArrowFields(f *data.Frame) ([]arrow.Field, error) {
if field.Config != nil {
str, err := toJSONString(field.Config)
if err != nil {
logger.Error("failed to serialize field config", "error", err)
return nil, err
}
fieldMeta[metadataKeyConfig] = str
Expand All @@ -81,6 +86,7 @@ func buildArrowSchema(f *data.Frame, fs []arrow.Field) (*arrow.Schema, error) {
if f.Meta != nil {
str, err := toJSONString(f.Meta)
if err != nil {
logger.Error("failed to serialize frame meta", "error", err)
return nil, err
}
tableMetaMap["meta"] = str
Expand Down Expand Up @@ -168,6 +174,7 @@ func fieldToArrow(f *data.Field) (arrow.DataType, bool, error) {
return &arrow.BinaryType{}, true, nil

default:
logger.Error("unsupported type for conversion to arrow", "type", f.Type())
return nil, false, fmt.Errorf("unsupported type for conversion to arrow: %T", f.Type())
}
}
Expand All @@ -177,6 +184,7 @@ func fieldToArrow(f *data.Field) (arrow.DataType, bool, error) {
func toJSONString(val interface{}) (string, error) {
b, err := json.Marshal(val)
if err != nil {
logger.Error("failed to marshal value to json", "error", err)
return "", err
}
return string(b), nil
Expand Down
1 change: 1 addition & 0 deletions duck/data/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const layout = "2006-01-02 15:04:05-07"
func parseDate(s string) (time.Time, error) {
t, err := time.Parse(layout, s)
if err != nil {
logger.Error("failed to parse time", "error", err)
return t, err
}
return t.UTC(), nil
Expand Down
18 changes: 17 additions & 1 deletion duck/data/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import (
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/pqarrow"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
)

var logger = log.DefaultLogger

func ToParquet(frames []*data.Frame, chunk int) (map[string]string, error) {
dirs := map[string]string{}
frameIndex := framesByRef(frames)
Expand All @@ -38,6 +41,7 @@ func ToParquet(frames []*data.Frame, chunk int) (map[string]string, error) {

dir, err := os.MkdirTemp("", "duck")
if err != nil {
logger.Error("failed to create temp dir", "error", err)
return nil, err
}

Expand All @@ -47,6 +51,7 @@ func ToParquet(frames []*data.Frame, chunk int) (map[string]string, error) {

schema, err := MarshalArrow(frame)
if err != nil {
logger.Error("failed to marshal arrow schema", "error", err)
return nil, err
}

Expand All @@ -64,10 +69,14 @@ func ToParquet(frames []*data.Frame, chunk int) (map[string]string, error) {
defer wg.Done()
raw, err := json.Marshal(chunk)
if err != nil {
logger.Error("failed to marshal chunk", "error", err)
return err
}
name := fmt.Sprintf("%s%d", frame.RefID, idx)
_, _, err = write(dir, name, schema, raw)
if err != nil {
logger.Error("failed to write parquet file", "error", err)
}
return err
}(chunk, i)
}
Expand All @@ -78,6 +87,7 @@ func ToParquet(frames []*data.Frame, chunk int) (map[string]string, error) {
}()

for err := range errCh {
logger.Error("failed to write chunk", "error", err)
return nil, err
}

Expand All @@ -86,12 +96,14 @@ func ToParquet(frames []*data.Frame, chunk int) (map[string]string, error) {

raw, err := json.Marshal(data)
if err != nil {
logger.Error("parquet failed to marshal frame data to raw data", "error", err)
return nil, err
}

name := fmt.Sprintf("%s%d", frame.RefID, i)
_, _, err = write(dir, name, schema, raw)
if err != nil {
logger.Error("parquet failed to write parquet file", "error", err)
return nil, err
}
}
Expand All @@ -116,6 +128,7 @@ func write(dir string, name string, schema *arrow.Schema, jsonData []byte) (stri
filename := path.Join(dir, name+".parquet")
output, err := os.Create(filename)
if err != nil {
logger.Error("failed to create parquet file", "file", filename, "error", err)
return "", "", err
}

Expand All @@ -124,22 +137,25 @@ func write(dir string, name string, schema *arrow.Schema, jsonData []byte) (stri
writerProps := parquet.NewWriterProperties()
writer, err := pqarrow.NewFileWriter(schema, output, writerProps, pqarrow.DefaultWriterProps())
if err != nil {
logger.Error("failed to create parquet writer", "error", err)
return "", "", err
}
r := bytes.NewReader(jsonData)
record, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, r)
if err != nil {
logger.Error("failed to create record from json", "error", err)
return "", "", err
}

err = writer.Write(record)
if err != nil {
logger.Error("failed to write record", "error", err)
return "", "", err
}

err = writer.Close()
if err != nil {
fmt.Println("failed to close writer")
logger.Error("failed to close writer", "error", err)
return dir, output.Name(), nil
}
return dir, output.Name(), nil
Expand Down
1 change: 1 addition & 0 deletions duck/data/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestWrite(t *testing.T) {
}

func TestRead(t *testing.T) {
t.Skip() // need parquet file to test
fmt.Println("test")
var b bytes.Buffer
b.Write([]byte(".mode json \n"))
Expand Down
13 changes: 12 additions & 1 deletion duck/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"os/exec"
"strings"

"github.com/grafana/grafana-plugin-sdk-go/backend/log"
sdk "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/framestruct"
"github.com/scottlepp/go-duck/duck/data"
)

var logger = log.DefaultLogger

type DuckDB struct {
Name string
Mode string
Expand Down Expand Up @@ -77,9 +80,11 @@ func (d *DuckDB) RunCommands(commands []string) (string, error) {
err := cmd.Run()
if err != nil {
message := err.Error() + stderr.String()
logger.Error("error running command", "cmd", b.String(), "message", message, "error", err)
return "", errors.New(message)
}
if stderr.String() != "" {
logger.Error("error running command", "cmd", b.String(), "error", stderr.String())
return "", errors.New(stderr.String())
}

Expand All @@ -103,32 +108,36 @@ func (d *DuckDB) Query(query string) (string, error) {
func (d *DuckDB) QueryFrames(name string, query string, frames []*sdk.Frame) (string, error) {
dirs, err := data.ToParquet(frames, d.Chunk)
if err != nil {
logger.Error("error converting to parquet", "error", err)
return "", err
}

defer func() {
for _, dir := range dirs {
err := os.RemoveAll(dir)
if err != nil {
fmt.Println("failed to remove parquet files")
logger.Error("failed to remove parquet files", "error", err)
}
}
}()

commands := []string{}
created := map[string]bool{}
logger.Debug("starting to create views from frames", "frames", len(frames))
for _, frame := range frames {
if created[frame.RefID] {
continue
}
cmd := fmt.Sprintf("CREATE VIEW %s AS (SELECT * from '%s/*.parquet');", frame.RefID, dirs[frame.RefID])
logger.Debug("creating view", "cmd", cmd)
commands = append(commands, cmd)
created[frame.RefID] = true
}

commands = append(commands, query)
res, err := d.RunCommands(commands)
if err != nil {
logger.Error("error running commands", "error", err)
return "", err
}
return res, nil
Expand Down Expand Up @@ -172,11 +181,13 @@ func resultsToFrame(name string, res string, f *sdk.Frame, frames []*sdk.Frame)
var results []map[string]any
err := json.Unmarshal([]byte(res), &results)
if err != nil {
logger.Error("error unmarshalling results", "error", err)
return err
}
converters := data.Converters(frames)
resultsFrame, err := framestruct.ToDataFrame(name, results, converters...)
if err != nil {
logger.Error("error converting results to frame", "error", err)
return err
}

Expand Down
29 changes: 17 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ go 1.21
require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v15 v15.0.0
github.com/apache/arrow/go/v15 v15.0.2
github.com/cheekybits/genny v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -26,14 +25,14 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand All @@ -43,11 +42,17 @@ require (
github.com/apache/thrift v0.17.0 // indirect
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/golang/snappy v0.0.4 // indirect
github.com/grafana/grafana-plugin-sdk-go v0.212.0
github.com/grafana/grafana-plugin-sdk-go v0.231.0
github.com/klauspost/compress v1.16.7 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
)

require github.com/rivo/uniseg v0.1.0 // indirect
require (
github.com/fatih/color v1.15.0 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/rivo/uniseg v0.1.0 // indirect
)
Loading