Skip to content

Commit

Permalink
refactor: Add NewFluxAllocator as an injection point for the GcAllocator
Browse files Browse the repository at this point in the history
This removes GcAllocator from all tests but still provides a way to inject it temporarily
for testing.
  • Loading branch information
Markus Westerlind committed Apr 7, 2022
1 parent 6ba7340 commit f18e259
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 32 deletions.
2 changes: 1 addition & 1 deletion array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestString(t *testing.T) {
}

func TestNewStringFromBinaryArray(t *testing.T) {
alloc := &fluxmemory.GcAllocator{ResourceAllocator: &fluxmemory.ResourceAllocator{}}
alloc := fluxmemory.NewFluxAllocator(nil)
// Need to use the Apache binary builder to be able to create an actual
// Arrow Binary array.
sb := apachearray.NewBinaryBuilder(alloc, array.StringType)
Expand Down
4 changes: 2 additions & 2 deletions compiler/vectorized_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestVectorizedFns(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
checked := arrow.NewCheckedAllocator(memory.DefaultAllocator)
mem := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{Allocator: checked}}
mem := memory.NewFluxAllocator(checked)

pkg, err := runtime.AnalyzeSource(tc.fn)
if err != nil {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestVectorizedFns(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}

want := vectorizedObjectFromMap(tc.want, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
want := vectorizedObjectFromMap(tc.want, memory.NewFluxAllocator(nil))
if !cmp.Equal(want, got, CmpOptions...) {
t.Errorf("unexpected value -want/+got\n%s", cmp.Diff(want, got, CmpOptions...))
}
Expand Down
12 changes: 3 additions & 9 deletions execute/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ func TestTransportDataset_Process(t *testing.T) {
dataset.AddTransformation(transport)

mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
Allocator: mem,
}}
alloc := memory.NewFluxAllocator(mem)

defer func() {
alloc.GC()
Expand Down Expand Up @@ -101,9 +99,7 @@ func TestTransportDataset_AddTransformation(t *testing.T) {

mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
Allocator: mem,
}}
alloc := memory.NewFluxAllocator(mem)

defer func() {
alloc.GC()
Expand Down Expand Up @@ -206,9 +202,7 @@ func TestTransportDataset_MultipleDownstream(t *testing.T) {

mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
Allocator: mem,
}}
alloc := memory.NewFluxAllocator(mem)

defer func() {
alloc.GC()
Expand Down
4 changes: 2 additions & 2 deletions execute/executetest/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (tt TableTest) run(t *testing.T, name string, f func(tt *tableTest)) {
TableTest: tt,
t: t,
logger: zaptest.NewLogger(t),
alloc: &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}},
alloc: memory.NewFluxAllocator(nil),
})
})
}
Expand All @@ -675,7 +675,7 @@ type tableTest struct {
TableTest
t *testing.T
logger *zap.Logger
alloc *memory.GcAllocator
alloc memory.FluxAllocator
}

func (tt *tableTest) do(f func(tbl flux.Table) error) {
Expand Down
10 changes: 5 additions & 5 deletions execute/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestColListTable(t *testing.T) {

func TestColListTable_AppendNil(t *testing.T) {
key := execute.NewGroupKey(nil, nil)
tb := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
tb := execute.NewColListTableBuilder(key, memory.NewFluxAllocator(nil))

// Add a column for the value.
idx, _ := tb.AddCol(flux.ColMeta{
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestColListTable_AppendNil(t *testing.T) {

func TestColListTable_SetNil(t *testing.T) {
key := execute.NewGroupKey(nil, nil)
tb := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
tb := execute.NewColListTableBuilder(key, memory.NewFluxAllocator(nil))

// Add a column for the value.
idx, _ := tb.AddCol(flux.ColMeta{
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestColListTable_SetNil(t *testing.T) {
}

func TestCopyTable(t *testing.T) {
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}}
alloc := memory.NewFluxAllocator(nil)

input, err := gen.Input(context.Background(), gen.Schema{
Tags: []gen.Tag{
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestColListTableBuilder_AppendValues(t *testing.T) {
name: "Strings",
typ: flux.TString,
values: func() array.Array {
b := arrow.NewStringBuilder(&memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
b := arrow.NewStringBuilder(memory.NewFluxAllocator(nil))
b.Append("a")
b.Append("d")
b.AppendNull()
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestColListTableBuilder_AppendValues(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
key := execute.NewGroupKey(nil, nil)
b := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
b := execute.NewColListTableBuilder(key, memory.NewFluxAllocator(nil))
if _, err := b.AddCol(flux.ColMeta{Label: "_value", Type: tt.typ}); err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion lang/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (p *Program) Start(ctx context.Context, alloc memory.Allocator) (flux.Query
}
}

var statsAlloc memory.StatsAllocator
var statsAlloc memory.FluxAllocator
if feature.SetfinalizerMemoryTracking().Enabled(ctx) {
statsAlloc = &memory.GcAllocator{ResourceAllocator: resourceAlloc}
} else {
Expand Down
2 changes: 1 addition & 1 deletion lang/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type query struct {
ctx context.Context
results chan flux.Result
stats flux.Statistics
alloc memory.StatsAllocator
alloc memory.FluxAllocator
span opentracing.Span
cancel func()
err error
Expand Down
19 changes: 18 additions & 1 deletion memory/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ type Allocator interface {
Account(size int) error
}

type StatsAllocator interface {
type FluxAllocator interface {
Allocator

Allocated() int64
MaxAllocated() int64
TotalAllocated() int64

GC()
}

// GoAllocator implements a version of the allocator that uses native Go
Expand Down Expand Up @@ -72,6 +75,17 @@ type ResourceAllocator struct {
Allocator memory.Allocator
}

func NewFluxAllocator(allocator memory.Allocator) FluxAllocator {
// Avoid nesting multiple ResourceAllocator
resourceAlloc, ok := allocator.(*ResourceAllocator)
if !ok {
resourceAlloc = &ResourceAllocator{
Allocator: allocator,
}
}
return resourceAlloc
}

// Allocate will ensure that the requested memory is available and
// record that it is in use.
func (a *ResourceAllocator) Allocate(size int) []byte {
Expand Down Expand Up @@ -239,6 +253,9 @@ func (a *ResourceAllocator) requestMemory(allocated, want int64) error {
}, codes.ResourceExhausted)
}

func (mem *ResourceAllocator) GC() {
}

// allocator returns the underlying memory.Allocator that should be used.
func (a *ResourceAllocator) allocator() memory.Allocator {
if a.Allocator == nil {
Expand Down
18 changes: 9 additions & 9 deletions memory/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestAllocator_Allocate(t *testing.T) {
mem := arrowmemory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

allocator := &memory.GcAllocator{&memory.ResourceAllocator{Allocator: mem}}
allocator := memory.NewFluxAllocator(mem)
b := allocator.Allocate(64)

assert.Equal(t, 64, mem.CurrentAlloc(), "unexpected memory allocation.")
Expand All @@ -42,7 +42,7 @@ func TestAllocator_Reallocate(t *testing.T) {
mem := arrowmemory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

allocator := &memory.GcAllocator{&memory.ResourceAllocator{Allocator: mem}}
allocator := memory.NewFluxAllocator(mem)
b := allocator.Allocate(64)

assert.Equal(t, 64, mem.CurrentAlloc(), "unexpected memory allocation.")
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestAllocator_Reallocate(t *testing.T) {
}

func TestAllocator_MaxAfterFree(t *testing.T) {
allocator := &memory.GcAllocator{&memory.ResourceAllocator{}}
allocator := memory.NewFluxAllocator(nil)
if err := allocator.Account(64); err != nil {
t.Fatalf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestAllocator_MaxAfterFree(t *testing.T) {

func TestAllocator_Limit(t *testing.T) {
maxLimit := int64(64)
allocator := &memory.GcAllocator{&memory.ResourceAllocator{Limit: &maxLimit}}
allocator := memory.NewFluxAllocator(&memory.ResourceAllocator{Limit: &maxLimit})
if err := allocator.Account(64); err != nil {
t.Fatalf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestAllocator_Limit(t *testing.T) {
}

func TestAllocator_Free(t *testing.T) {
allocator := &memory.GcAllocator{&memory.ResourceAllocator{}}
allocator := memory.NewFluxAllocator(nil)
if err := allocator.Account(64); err != nil {
t.Fatalf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -238,10 +238,10 @@ func TestAllocator_RequestMemory(t *testing.T) {

// Set the Limit to 64 and allocate 32 bytes of it.
// This should not request more memory from the manager.
allocator := &memory.GcAllocator{&memory.ResourceAllocator{
allocator := &memory.ResourceAllocator{
Limit: func(v int64) *int64 { return &v }(64),
Manager: manager,
}}
}
if err := allocator.Account(32); err != nil {
t.Fatalf("unexpected error: %s", err)
}
Expand Down Expand Up @@ -313,10 +313,10 @@ func TestAllocator_RequestMemory_Concurrently(t *testing.T) {

// Set the Limit to 64 and allocate 32 bytes of it.
// This should not request more memory from the manager.
allocator := &memory.GcAllocator{&memory.ResourceAllocator{
allocator := &memory.ResourceAllocator{
Limit: func(v int64) *int64 { return &v }(0),
Manager: manager,
}}
}
var wg sync.WaitGroup
for i := 0; i < 128; i++ {
wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion values/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestVectorTypes(t *testing.T) {
},
}
for _, tc := range testCases {
mem := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}}
mem := memory.NewFluxAllocator(nil)
got := NewVectorFromElements(mem, tc.input...)

if !got.ElementType().Equal(tc.wantType) {
Expand Down

0 comments on commit f18e259

Please sign in to comment.