Skip to content

Commit

Permalink
ARROW-17475: [Go] Function interface and Registry impl (apache#13924)
Browse files Browse the repository at this point in the history
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored and zagto committed Oct 7, 2022
1 parent ad9a63c commit c0ea0e1
Show file tree
Hide file tree
Showing 8 changed files with 547 additions and 0 deletions.
1 change: 1 addition & 0 deletions dev/release/rat_exclude_files.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ go/arrow/cdata/test/go.sum
go/arrow/unionmode_string.go
go/arrow/compute/go.sum
go/arrow/compute/datumkind_string.go
go/arrow/compute/funckind_string.go
go/*.tmpldata
go/*.s
go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
Expand Down
29 changes: 29 additions & 0 deletions go/arrow/compute/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package compute is a native-go implementation of an Acero-like
// arrow compute engine.
//
// While consumers of Arrow that are able to use CGO could utilize the
// C Data API (using the cdata package) and could link against the
// acero library directly, there are consumers who cannot use CGO. This
// is an attempt to provide for those users, and in general create a
// native-go arrow compute engine.
//
// Everything in this package should be considered Experimental for now.
package compute

//go:generate stringer -type=FuncKind -linecomment
4 changes: 4 additions & 0 deletions go/arrow/compute/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ type FunctionOptionsEqual interface {
Equals(FunctionOptions) bool
}

type FunctionOptionsCloneable interface {
Clone() FunctionOptions
}

type MakeStructOptions struct {
FieldNames []string `compute:"field_names"`
FieldNullability []bool `compute:"field_nullability"`
Expand Down
27 changes: 27 additions & 0 deletions go/arrow/compute/funckind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions go/arrow/compute/functions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compute

import (
"context"
)

type Function interface {
Name() string
Kind() FuncKind
Arity() Arity
Doc() FunctionDoc
NumKernels() int
Execute(context.Context, FunctionOptions, ...Datum) (Datum, error)
DefaultOptions() FunctionOptions
Validate() error
}

type Arity struct {
NArgs int
IsVarArgs bool
}

func Nullary() Arity { return Arity{0, false} }
func Unary() Arity { return Arity{1, false} }
func Binary() Arity { return Arity{2, false} }
func Ternary() Arity { return Arity{3, false} }
func VarArgs(minArgs int) Arity { return Arity{minArgs, true} }

type FunctionDoc struct {
Summary string
Description string
ArgNames []string
OptionsClass string
OptionsRequired bool
}

var EmptyFuncDoc FunctionDoc

type FuncKind int8

const (
FuncScalar FuncKind = iota // Scalar
FuncVector // Vector
FuncScalarAgg // ScalarAggregate
FuncHashAgg // HashAggregate
FuncMeta // Meta
)
46 changes: 46 additions & 0 deletions go/arrow/compute/functions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compute_test

import (
"testing"

"github.com/apache/arrow/go/v10/arrow/compute"
"github.com/stretchr/testify/assert"
)

func TestArityBasics(t *testing.T) {
nullary := compute.Nullary()
assert.Equal(t, 0, nullary.NArgs)
assert.False(t, nullary.IsVarArgs)

unary := compute.Unary()
assert.Equal(t, 1, unary.NArgs)
assert.False(t, unary.IsVarArgs)

binary := compute.Binary()
assert.Equal(t, 2, binary.NArgs)
assert.False(t, binary.IsVarArgs)

ternary := compute.Ternary()
assert.Equal(t, 3, ternary.NArgs)
assert.False(t, ternary.IsVarArgs)

varargs := compute.VarArgs(2)
assert.Equal(t, 2, varargs.NArgs)
assert.True(t, varargs.IsVarArgs)
}
201 changes: 201 additions & 0 deletions go/arrow/compute/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compute

import (
"sync"

"github.com/apache/arrow/go/v10/arrow/internal/debug"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)

type FunctionRegistry interface {
CanAddFunction(fn Function, allowOverwrite bool) bool
AddFunction(fn Function, allowOverwrite bool) bool
CanAddAlias(target, source string) bool
AddAlias(target, source string) bool
GetFunction(name string) (Function, bool)
GetFunctionNames() []string
NumFunctions() int

canAddFuncName(string, bool) bool
}

var (
registry FunctionRegistry
once sync.Once
)

func GetFunctionRegistry() FunctionRegistry {
once.Do(func() {
registry = NewRegistry()
// initialize the others
})
return registry
}

func NewRegistry() FunctionRegistry {
return &funcRegistry{
nameToFunction: make(map[string]Function)}
}

func NewChildRegistry(parent FunctionRegistry) FunctionRegistry {
return &funcRegistry{
parent: parent.(*funcRegistry),
nameToFunction: make(map[string]Function)}
}

type funcRegistry struct {
parent *funcRegistry

mx sync.RWMutex
nameToFunction map[string]Function
}

func (reg *funcRegistry) getLocker(add bool) sync.Locker {
if add {
return &reg.mx
}
return reg.mx.RLocker()
}

func (reg *funcRegistry) CanAddFunction(fn Function, allowOverwrite bool) bool {
if reg.parent != nil && !reg.parent.CanAddFunction(fn, allowOverwrite) {
return false
}

return reg.doAddFunction(fn, allowOverwrite, false)
}

func (reg *funcRegistry) AddFunction(fn Function, allowOverwrite bool) bool {
if reg.parent != nil && !reg.parent.CanAddFunction(fn, allowOverwrite) {
return false
}

return reg.doAddFunction(fn, allowOverwrite, true)
}

func (reg *funcRegistry) CanAddAlias(target, source string) bool {
if reg.parent != nil && !reg.parent.canAddFuncName(target, false) {
return false
}
return reg.doAddAlias(target, source, false)
}

func (reg *funcRegistry) AddAlias(target, source string) bool {
if reg.parent != nil && !reg.parent.canAddFuncName(target, false) {
return false
}

return reg.doAddAlias(target, source, true)
}

func (reg *funcRegistry) GetFunction(name string) (Function, bool) {
reg.mx.RLock()
defer reg.mx.RUnlock()

if fn, ok := reg.nameToFunction[name]; ok {
return fn, ok
}

if reg.parent != nil {
return reg.parent.GetFunction(name)
}

return nil, false
}

func (reg *funcRegistry) GetFunctionNames() (out []string) {
if reg.parent != nil {
out = reg.parent.GetFunctionNames()
} else {
out = make([]string, 0, len(reg.nameToFunction))
}
reg.mx.RLock()
defer reg.mx.RUnlock()

out = append(out, maps.Keys(reg.nameToFunction)...)
slices.Sort(out)
return
}

func (reg *funcRegistry) NumFunctions() (n int) {
if reg.parent != nil {
n = reg.parent.NumFunctions()
}
reg.mx.RLock()
defer reg.mx.RUnlock()
return n + len(reg.nameToFunction)
}

func (reg *funcRegistry) canAddFuncName(name string, allowOverwrite bool) bool {
if reg.parent != nil {
reg.parent.mx.RLock()
defer reg.parent.mx.RUnlock()

if !reg.parent.canAddFuncName(name, allowOverwrite) {
return false
}
}
if !allowOverwrite {
_, ok := reg.nameToFunction[name]
return !ok
}
return true
}

func (reg *funcRegistry) doAddFunction(fn Function, allowOverwrite bool, add bool) bool {
debug.Assert(fn.Validate() == nil, "invalid function")

lk := reg.getLocker(add)
lk.Lock()
defer lk.Unlock()

name := fn.Name()
if !reg.canAddFuncName(name, allowOverwrite) {
return false
}

if add {
reg.nameToFunction[name] = fn
}
return true
}

func (reg *funcRegistry) doAddAlias(target, source string, add bool) bool {
// source name must exist in the registry or the parent
// check outside the mutex, in case GetFunction has a mutex
// acquisition
fn, ok := reg.GetFunction(source)
if !ok {
return false
}

lk := reg.getLocker(add)
lk.Lock()
defer lk.Unlock()

if !reg.canAddFuncName(target, false) {
return false
}

if add {
reg.nameToFunction[target] = fn
}
return true
}
Loading

0 comments on commit c0ea0e1

Please sign in to comment.