Skip to content

Commit

Permalink
refactor: Add NewFluxAllocator as an injection point for the GcAlloca…
Browse files Browse the repository at this point in the history
…tor (#4651)

* refactor: Remove unused code

* refactor: Add NewFluxAllocator as an injection point for the GcAllocator

This removes GcAllocator from all tests but still provides a way to inject it temporarily
for testing.

* refactor: Address review comments

* chore: Address review comments

* chore: Address review comments
  • Loading branch information
Markus Westerlind authored Apr 13, 2022
1 parent e237147 commit b48a01b
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 114 deletions.
23 changes: 1 addition & 22 deletions array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,27 +121,8 @@ func TestString(t *testing.T) {
}
}

// A is an empty struct used only as a method receiver; its pointer type
// declares the method C.
type A struct {
}

// B embeds A and has no fields of its own. Its pointer type declares its own
// method C in addition to the one promoted from the embedded A.
type B struct {
A
}

// C is a single-method interface. Both *A and *B declare a C() string method
// in this file.
type C interface {
C() string
}

// C returns the constant string "A". The receiver is unused.
func (*A) C() string {
return "A"
}

// C returns the constant string "B". The receiver is unused. Because it is
// declared directly on *B, it is selected ahead of the C method promoted from
// the embedded A.
func (*B) C() string {
return "B"
}

func TestNewStringFromBinaryArray(t *testing.T) {
alloc := &fluxmemory.GcAllocator{ResourceAllocator: &fluxmemory.ResourceAllocator{}}
alloc := fluxmemory.NewResourceAllocator(nil)
// Need to use the Apache binary builder to be able to create an actual
// Arrow Binary array.
sb := apachearray.NewBinaryBuilder(alloc, array.StringType)
Expand All @@ -164,8 +145,6 @@ func TestNewStringFromBinaryArray(t *testing.T) {
a.Release()
s.Release()

alloc.GC()

if want, got := int64(0), alloc.Allocated(); want != got {
t.Errorf("epxected allocated to be %v, was %v", want, got)
}
Expand Down
5 changes: 2 additions & 3 deletions compiler/vectorized_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestVectorizedFns(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
checked := arrow.NewCheckedAllocator(memory.DefaultAllocator)
mem := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{Allocator: checked}}
mem := memory.NewResourceAllocator(checked)

ctx := context.Background()
ctx = feature.Inject(
Expand Down Expand Up @@ -151,15 +151,14 @@ func TestVectorizedFns(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}

want := vectorizedObjectFromMap(tc.want, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
want := vectorizedObjectFromMap(tc.want, memory.NewResourceAllocator(nil))
if !cmp.Equal(want, got, CmpOptions...) {
t.Errorf("unexpected value -want/+got\n%s", cmp.Diff(want, got, CmpOptions...))
}

got.Release()
input.Release()

mem.GC()
checked.AssertSize(t, 0)
})
}
Expand Down
15 changes: 3 additions & 12 deletions execute/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,9 @@ func TestTransportDataset_Process(t *testing.T) {
dataset.AddTransformation(transport)

mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
Allocator: mem,
}}
alloc := memory.NewResourceAllocator(mem)

defer func() {
alloc.GC()
mem.AssertSize(t, 0)
}()

Expand Down Expand Up @@ -101,12 +98,9 @@ func TestTransportDataset_AddTransformation(t *testing.T) {

mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
Allocator: mem,
}}
alloc := memory.NewResourceAllocator(mem)

defer func() {
alloc.GC()
mem.AssertSize(t, 0)
}()

Expand Down Expand Up @@ -206,12 +200,9 @@ func TestTransportDataset_MultipleDownstream(t *testing.T) {

mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
Allocator: mem,
}}
alloc := memory.NewResourceAllocator(mem)

defer func() {
alloc.GC()
mem.AssertSize(t, 0)
}()

Expand Down
6 changes: 2 additions & 4 deletions execute/executetest/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (tt TableTest) run(t *testing.T, name string, f func(tt *tableTest)) {
TableTest: tt,
t: t,
logger: zaptest.NewLogger(t),
alloc: &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}},
alloc: memory.NewResourceAllocator(nil),
})
})
}
Expand All @@ -675,7 +675,7 @@ type tableTest struct {
TableTest
t *testing.T
logger *zap.Logger
alloc *memory.GcAllocator
alloc *memory.ResourceAllocator
}

func (tt *tableTest) do(f func(tbl flux.Table) error) {
Expand Down Expand Up @@ -710,8 +710,6 @@ func (tt *tableTest) finish(allocatorUsed bool) {
}
}

tt.alloc.GC()

// Verify that all memory is correctly released if we use the table properly.
if got := tt.alloc.Allocated(); got != 0 {
tt.t.Errorf("caught memory leak: %d bytes were not released", got)
Expand Down
11 changes: 5 additions & 6 deletions execute/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestColListTable(t *testing.T) {

func TestColListTable_AppendNil(t *testing.T) {
key := execute.NewGroupKey(nil, nil)
tb := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
tb := execute.NewColListTableBuilder(key, memory.NewResourceAllocator(nil))

// Add a column for the value.
idx, _ := tb.AddCol(flux.ColMeta{
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestColListTable_AppendNil(t *testing.T) {

func TestColListTable_SetNil(t *testing.T) {
key := execute.NewGroupKey(nil, nil)
tb := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
tb := execute.NewColListTableBuilder(key, memory.NewResourceAllocator(nil))

// Add a column for the value.
idx, _ := tb.AddCol(flux.ColMeta{
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestColListTable_SetNil(t *testing.T) {
}

func TestCopyTable(t *testing.T) {
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}}
alloc := memory.NewResourceAllocator(nil)

input, err := gen.Input(context.Background(), gen.Schema{
Tags: []gen.Tag{
Expand Down Expand Up @@ -363,7 +363,6 @@ func TestCopyTable(t *testing.T) {
buf.Done()
}

alloc.GC()
// Ensure there has been no memory leak.
if got, want := alloc.Allocated(), int64(0); got != want {
t.Errorf("memory leak -want/+got:\n\t- %d\n\t+ %d", want, got)
Expand Down Expand Up @@ -450,7 +449,7 @@ func TestColListTableBuilder_AppendValues(t *testing.T) {
name: "Strings",
typ: flux.TString,
values: func() array.Array {
b := arrow.NewStringBuilder(&memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
b := arrow.NewStringBuilder(memory.NewResourceAllocator(nil))
b.Append("a")
b.Append("d")
b.AppendNull()
Expand Down Expand Up @@ -518,7 +517,7 @@ func TestColListTableBuilder_AppendValues(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
key := execute.NewGroupKey(nil, nil)
b := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
b := execute.NewColListTableBuilder(key, memory.NewResourceAllocator(nil))
if _, err := b.AddCol(flux.ColMeta{Label: "_value", Type: tt.typ}); err != nil {
t.Fatal(err)
}
Expand Down
16 changes: 8 additions & 8 deletions lang/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,19 +288,19 @@ func (p *Program) Start(ctx context.Context, alloc memory.Allocator) (flux.Query
}
}

var statsAlloc memory.StatsAllocator
if feature.SetfinalizerMemoryTracking().Enabled(ctx) {
statsAlloc = &memory.GcAllocator{ResourceAllocator: resourceAlloc}
alloc = memory.NewGcAllocator(resourceAlloc)
} else {
statsAlloc = resourceAlloc
alloc = resourceAlloc
}

q := &query{
ctx: cctx,
results: results,
alloc: statsAlloc,
span: s,
cancel: cancel,
ctx: cctx,
results: results,
allocatorStats: resourceAlloc,
alloc: alloc,
span: s,
cancel: cancel,
stats: flux.Statistics{
Metadata: make(metadata.Metadata),
},
Expand Down
21 changes: 11 additions & 10 deletions lang/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (

// query implements the flux.Query interface.
type query struct {
ctx context.Context
results chan flux.Result
stats flux.Statistics
alloc memory.StatsAllocator
span opentracing.Span
cancel func()
err error
wg sync.WaitGroup
ctx context.Context
results chan flux.Result
stats flux.Statistics
allocatorStats *memory.ResourceAllocator
alloc memory.Allocator
span opentracing.Span
cancel func()
err error
wg sync.WaitGroup
}

func (q *query) Results() <-chan flux.Result {
Expand All @@ -29,8 +30,8 @@ func (q *query) Results() <-chan flux.Result {
func (q *query) Done() {
q.cancel()
q.wg.Wait()
q.stats.MaxAllocated = q.alloc.MaxAllocated()
q.stats.TotalAllocated = q.alloc.TotalAllocated()
q.stats.MaxAllocated = q.allocatorStats.MaxAllocated()
q.stats.TotalAllocated = q.allocatorStats.TotalAllocated()
if q.span != nil {
q.span.Finish()
q.span = nil
Expand Down
53 changes: 18 additions & 35 deletions memory/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ type Allocator interface {
Account(size int) error
}

// StatsAllocator is an Allocator that additionally exposes allocation
// statistics through MaxAllocated and TotalAllocated.
type StatsAllocator interface {
Allocator

// MaxAllocated reports an int64 statistic; presumably the peak number of
// bytes allocated at any one time — confirm against the implementation.
MaxAllocated() int64
// TotalAllocated reports an int64 statistic; presumably the cumulative
// number of bytes allocated — confirm against the implementation.
TotalAllocated() int64
}

// GoAllocator implements a version of the allocator that uses native Go
// slices. It does not track or limit the amount of memory that is used.
type GoAllocator struct {
Expand Down Expand Up @@ -72,6 +65,12 @@ type ResourceAllocator struct {
Allocator memory.Allocator
}

// NewResourceAllocator returns a pointer to a new ResourceAllocator whose
// Allocator field is set to allocator. Every other field is left at its zero
// value.
func NewResourceAllocator(allocator memory.Allocator) *ResourceAllocator {
	ra := new(ResourceAllocator)
	ra.Allocator = allocator
	return ra
}

// Allocate will ensure that the requested memory is available and
// record that it is in use.
func (a *ResourceAllocator) Allocate(size int) []byte {
Expand Down Expand Up @@ -248,19 +247,25 @@ func (a *ResourceAllocator) allocator() memory.Allocator {
}

// GcAllocator is an allocator that, in Allocate and Reallocate, attaches a
// runtime finalizer to each non-empty buffer it hands out. When such a buffer
// is garbage collected the finalizer passes it to the underlying allocator's
// Free.
type GcAllocator struct {
*ResourceAllocator
// mem is the allocator that requests are delegated to.
mem Allocator
}

// NewGcAllocator returns a pointer to a new GcAllocator that delegates to mem.
func NewGcAllocator(mem Allocator) *GcAllocator {
	gc := new(GcAllocator)
	gc.mem = mem
	return gc
}

func (a *GcAllocator) Allocate(size int) []byte {
bs := a.ResourceAllocator.Allocate(size)
bs := a.mem.Allocate(size)
if len(bs) > 0 {
l := len(bs)
runtime.SetFinalizer(&bs[0], func(b *byte) {
if l != 0 {
// Allows us to reconstruct the slice without creating a circular dependency between
// the finalizer and the slice
bs2 := unsafe.Slice(b, l)
a.ResourceAllocator.Free(bs2)
a.mem.Free(bs2)
l = 0
}
})
Expand All @@ -269,7 +274,7 @@ func (a *GcAllocator) Allocate(size int) []byte {
}

func (a *GcAllocator) Reallocate(size int, b []byte) []byte {
bs := a.ResourceAllocator.Reallocate(size, b)
bs := a.mem.Reallocate(size, b)

// If the reallocation extended `b`, the previous finalizer is still attached
if len(bs) > 0 {
Expand All @@ -280,7 +285,7 @@ func (a *GcAllocator) Reallocate(size int, b []byte) []byte {
// Allows us to reconstruct the slice without creating a circular dependency between
// the finalizer and the slice
bs2 := unsafe.Slice(b, l)
a.ResourceAllocator.Free(bs2)
a.mem.Free(bs2)
l = 0
}
})
Expand All @@ -293,29 +298,7 @@ func (a *GcAllocator) Free(b []byte) {

}

// Account forwards the request to account for size bytes to the embedded
// ResourceAllocator and returns its error unchanged.
func (a *GcAllocator) Account(size int) error {
	return a.ResourceAllocator.Account(size)
}

// MaxAllocated returns the value reported by the embedded ResourceAllocator's
// MaxAllocated method.
func (a *GcAllocator) MaxAllocated() int64 {
return a.ResourceAllocator.MaxAllocated()
}

// TotalAllocated returns the value reported by the embedded
// ResourceAllocator's TotalAllocated method.
func (a *GcAllocator) TotalAllocated() int64 {
return a.ResourceAllocator.TotalAllocated()
}

// GC runs the garbage collector repeatedly so that "all" unreachable memory
// gets collected. It stops as soon as the allocated byte count observed after
// a collection equals the count recorded on the previous round, or after 30
// rounds, whichever comes first.
func (mem *GcAllocator) GC() {
	const maxRounds = 30

	var last int64
	for round := 0; round < maxRounds; round++ {
		// A single collection does not clear all unreachable memory, so keep
		// collecting until the allocated count stops changing.
		runtime.GC()
		if mem.Allocated() == last {
			return
		}
		last = mem.Allocated()
	}
}
// Account forwards the request to account for size bytes to the wrapped
// allocator and returns its error unchanged.
func (a *GcAllocator) Account(size int) error {
	return a.mem.Account(size)
}

// Manager will manage the memory allowed for the Allocator.
// The Allocator may use the Manager to request additional memory or to
Expand Down
Loading

0 comments on commit b48a01b

Please sign in to comment.