Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support interface value converted to parquet #4

Merged
merged 5 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,12 @@ func (table boolFuncTable) MinMaxSize(minVal interface{}, maxVal interface{}, va
type int32FuncTable struct{}

func (_ int32FuncTable) LessThan(a interface{}, b interface{}) bool {
return a.(int32) < b.(int32)
switch a.(type) {
case int32, int16, int8:
return a.(int32) < b.(int32)
default:
return int32(a.(uint32)) < int32(b.(uint32))
}
}

func (table int32FuncTable) MinMaxSize(minVal interface{}, maxVal interface{}, val interface{}) (interface{}, interface{}, int32) {
Expand All @@ -820,7 +825,12 @@ func (table int32FuncTable) MinMaxSize(minVal interface{}, maxVal interface{}, v
type uint32FuncTable struct{}

func (_ uint32FuncTable) LessThan(a interface{}, b interface{}) bool {
return uint32(a.(int32)) < uint32(b.(int32))
switch a.(type) {
case int32, int16, int8:
return uint32(a.(int32)) < uint32(b.(int32))
default:
return a.(uint32) < b.(uint32)
}
}

func (table uint32FuncTable) MinMaxSize(minVal interface{}, maxVal interface{}, val interface{}) (interface{}, interface{}, int32) {
Expand All @@ -830,7 +840,12 @@ func (table uint32FuncTable) MinMaxSize(minVal interface{}, maxVal interface{},
type int64FuncTable struct{}

func (_ int64FuncTable) LessThan(a interface{}, b interface{}) bool {
return a.(int64) < b.(int64)
switch a.(type) {
case int64:
return a.(int64) < b.(int64)
default:
return int64(a.(uint64)) < int64(b.(uint64))
}
}

func (table int64FuncTable) MinMaxSize(minVal interface{}, maxVal interface{}, val interface{}) (interface{}, interface{}, int32) {
Expand All @@ -840,7 +855,12 @@ func (table int64FuncTable) MinMaxSize(minVal interface{}, maxVal interface{}, v
type uint64FuncTable struct{}

func (_ uint64FuncTable) LessThan(a interface{}, b interface{}) bool {
return uint64(a.(int64)) < uint64(b.(int64))
switch a.(type) {
case uint64:
return a.(uint64) < b.(uint64)
default:
return uint64(a.(int64)) < uint64(b.(int64))
}
}

func (table uint64FuncTable) MinMaxSize(minVal interface{}, maxVal interface{}, val interface{}) (interface{}, interface{}, int32) {
Expand Down
16 changes: 8 additions & 8 deletions common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ func TestCmp(t *testing.T) {
{"int_8 2", int32(1), int32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_INT_16), true},
{"int_8 3", int32(1), int32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_INT_32), true},
{"int_8 4", int64(1), int64(2), parquet.TypePtr(parquet.Type_INT64), parquet.ConvertedTypePtr(parquet.ConvertedType_INT_64), true},

{"uint_8 1", int32(1), int32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_8), true},
{"uint_8 2", int32(1), int32(-2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_8), true},
{"uint_8 3", int32(-1), int32(-2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_8), false},
{"uint_8 4", int32(-2), int32(-1), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_8), true},
{"uint_16 1", int32(1), int32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_16), true},
{"uint_16 2", int32(1), int32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_32), true},
{"uint_16 3", int64(1), int64(2), parquet.TypePtr(parquet.Type_INT64), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_64), true},
{"int_8 5", int32(1), int32(-2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_INT_8), false},
{"int_8 6", int32(-1), int32(-2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_INT_8), false},
{"int_8 7", int32(-2), int32(-1), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_INT_8), true},

{"uint_8 1", uint32(1), uint32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_8), true},
{"uint_16 1", uint32(1), uint32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_16), true},
{"uint_16 2", uint32(1), uint32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_32), true},
{"uint_16 3", uint64(1), uint64(2), parquet.TypePtr(parquet.Type_INT64), parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_64), true},

{"date 1", int32(1), int32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_DATE), true},
{"time_millis 1", int32(1), int32(2), parquet.TypePtr(parquet.Type_INT32), parquet.ConvertedTypePtr(parquet.ConvertedType_TIME_MILLIS), true},
Expand Down
23 changes: 21 additions & 2 deletions encoding/binarywrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@ import (

//LittleEndian

func GetUint32(n interface{}) uint32 {
switch n.(type) {
case int, int8, int16, int32:
return uint32(n.(int32))
default:
return n.(uint32)
}
}

func BinaryWriteINT32(w io.Writer, nums []interface{}) {
buf := make([]byte, len(nums)*4)
for i, n := range nums {
v := uint32(n.(int32))
v := GetUint32(n)
buf[i*4+0] = byte(v)
buf[i*4+1] = byte(v >> 8)
buf[i*4+2] = byte(v >> 16)
Expand All @@ -19,10 +28,20 @@ func BinaryWriteINT32(w io.Writer, nums []interface{}) {
w.Write(buf)
}

func GetUint64(n interface{}) uint64 {
switch n.(type) {
case int, int8, int16, int32, int64:
return uint64(n.(int64))
default:
return n.(uint64)
}

}

func BinaryWriteINT64(w io.Writer, nums []interface{}) {
buf := make([]byte, len(nums)*8)
for i, n := range nums {
v := uint64(n.(int64))
v := GetUint64(n)
buf[i*8+0] = byte(v)
buf[i*8+1] = byte(v >> 8)
buf[i*8+2] = byte(v >> 16)
Expand Down
54 changes: 50 additions & 4 deletions example/proto_write.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"fmt"
"log"

"github.com/AppliedIntuition/parquet-go/writer"
Expand Down Expand Up @@ -46,29 +47,74 @@ type ProtoMessage struct {
Status JobStatus
IntVal int32
}
type TestInterface interface {
foo()
}

type TestInterfaceImpl1 struct {
Bar string
}

type TestInterfaceImpl2 struct {
Test string
NestedInterface TestInterface
}

func (t *TestInterfaceImpl1) foo() {
fmt.Println(t.Bar)
}

func (t *TestInterfaceImpl2) foo() {
fmt.Print(t.Test)
}

type TestInterfaceStruct struct {
Val TestInterface
NestedVal TestInterface
Arr [][]TestInterface
Message ProtoMessage
UintVal uint
UintVal32 uint32
UintVal64 uint64
}

func main() {
protoMessages := []interface{}{
protoMessages := []ProtoMessage{
ProtoMessage{Timestamp: timestamppb.Timestamp{Seconds: 1, Nanos: 1000000}, Status: JobStatus_RUNNING, IntVal: 1},
ProtoMessage{Timestamp: timestamppb.Timestamp{Seconds: 2, Nanos: 1000000}, Status: JobStatus_ENQUEUED, IntVal: 2},
ProtoMessage{Timestamp: timestamppb.Timestamp{Seconds: 3, Nanos: 1000000}, Status: JobStatus_COMPLETED, IntVal: 3},
ProtoMessage{Timestamp: timestamppb.Timestamp{Seconds: 4, Nanos: 1000000}, Status: JobStatus_ERRORED, IntVal: 4},
ProtoMessage{Timestamp: timestamppb.Timestamp{Seconds: 5, Nanos: 1000000}, Status: JobStatus_CANCELLED, IntVal: 5},
ProtoMessage{Timestamp: timestamppb.Timestamp{Seconds: 6, Nanos: 1000000}, Status: JobStatus_UPSTREAM_NOT_PROCESSED, IntVal: 6},
}
impl2 := TestInterfaceImpl2{
Test: "test",
NestedInterface: &TestInterfaceImpl1{Bar: "bar1"},
}
impl1 := TestInterfaceImpl1{Bar: "bar2"}

vals := make([]TestInterfaceStruct, 6)
for index, message := range protoMessages {
vals[index] = TestInterfaceStruct{
Val: &impl1,
NestedVal: &impl2,
Arr: [][]TestInterface{{&impl2, &impl2}, {&impl2}},
Message: message,
}
}

fw, err := local.NewLocalFileWriter("output/proto_message.parquet")
if err != nil {
log.Println("Can't create file", err)
return
}
pw, err := writer.NewParquetWriterFromProto(fw, new(ProtoMessage), 1)
pw, err := writer.NewParquetWriterFromProto(fw, &vals[0], 1)
if err != nil {
log.Println("Can't create parquet writer", err)
return
}
for _, message := range protoMessages {
if err = pw.Write(message); err != nil {
for _, val := range vals {
if err = pw.Write(val); err != nil {
log.Println("Write error", err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion example/testelement_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
log.Println("Can't create file", err)
return
}
schemaHandler, err := schema.NewSchemaHandlerFromProtoStruct(new(testNestedElem))
schemaHandler, err := schema.NewSchemaHandlerFromStruct(new(testNestedElem))
if err != nil {
log.Println("failed to create the schema handler: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/klauspost/compress v1.16.7
github.com/pierrec/lz4/v4 v4.1.15
github.com/stretchr/testify v1.8.0
github.com/xitongsys/parquet-go v1.6.3-0.20231102094431-8ca067b2bd32 // indirect
github.com/xitongsys/parquet-go v1.6.3-0.20231102094431-8ca067b2bd32
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
google.golang.org/protobuf v1.32.0 // indirect
google.golang.org/protobuf v1.32.0
)
Loading