Skip to content

Commit

Permalink
Merge pull request #322 from TileDB-Inc/smr/sc-49723/add-array-consol…
Browse files Browse the repository at this point in the history
…idate-fragments

Add binding for tiledb_array_consolidate_fragments.
  • Loading branch information
shaunrd0 authored Jul 19, 2024
2 parents a26aa12 + 40f7cbf commit 8fda6ba
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 16 deletions.
8 changes: 2 additions & 6 deletions array.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,12 +1098,8 @@ func DeleteFragmentsList(tdbCtx *Context, uri string, fragmentURIs []string) err
curi := C.CString(uri)
defer C.free(unsafe.Pointer(curi))

var list []*C.char
for _, furi := range fragmentURIs {
cfuri := C.CString(furi)
defer C.free(unsafe.Pointer(cfuri))
list = append(list, cfuri)
}
list, freeMemory := cStringArray(fragmentURIs)
defer freeMemory()

ret := C.tiledb_array_delete_fragments_list(tdbCtx.tiledbContext, curi, (**C.char)(unsafe.Pointer(&list[0])), C.size_t(len(list)))
if ret != C.TILEDB_OK {
Expand Down
29 changes: 28 additions & 1 deletion array_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package tiledb
#include <stdlib.h>
*/
import "C"
import "fmt"
import (
"fmt"
"runtime"
"unsafe"
)

// ConsolidationPlan is a consolidation plan for array
type ConsolidationPlan struct {
Expand Down Expand Up @@ -94,3 +98,26 @@ func (cp *ConsolidationPlan) DumpJSON() (string, error) {

return json, nil
}

// ConsolidateFragments consolidates an explicit list of fragments in an array into a single fragment.
// You must first finalize all queries to the array before consolidation can
// begin (as consolidation temporarily acquires an exclusive lock on the array).
func (a *Array) ConsolidateFragments(config *Config, fragmentList []string) error {
if config == nil {
return fmt.Errorf("Config must not be nil for Consolidate")
}

curi := C.CString(a.uri)
defer C.free(unsafe.Pointer(curi))

list, freeMemory := cStringArray(fragmentList)
defer freeMemory()

ret := C.tiledb_array_consolidate_fragments(a.context.tiledbContext, curi, (**C.char)(slicePtr(list)), C.uint64_t(len(list)), config.tiledbConfig)
if ret != C.TILEDB_OK {
return fmt.Errorf("Error consolidating tiledb array fragment list: %s", a.context.LastError())
}

runtime.KeepAlive(config)
return nil
}
84 changes: 75 additions & 9 deletions array_experimental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/stretchr/testify/require"
)

func TestGetConsolidationPlan(t *testing.T) {
// Create an 1d array
func create1DTestArray(t *testing.T) *Array {
// Create a 1d array

// Create configuration
config, err := NewConfig()
Expand Down Expand Up @@ -63,10 +63,12 @@ func TestGetConsolidationPlan(t *testing.T) {
err = array.Create(arraySchema)
require.NoError(t, err)

// Write to array
return array
}

func write1DTestArray(t *testing.T, array *Array, data []int32) {
// Open array for writing
err = array.Open(TILEDB_WRITE)
err := array.Open(TILEDB_WRITE)
require.NoError(t, err)

// Create subarray
Expand All @@ -76,15 +78,14 @@ func TestGetConsolidationPlan(t *testing.T) {
require.NoError(t, err)

// Create write query
query, err := NewQuery(context, array)
query, err := NewQuery(array.context, array)
require.NoError(t, err)
assert.NotNil(t, query)
err = query.SetSubarray(subarray)
require.NoError(t, err)

// Initialize the data buffer
bufferV := []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
_, err = query.SetDataBuffer("v", bufferV)
_, err = query.SetDataBuffer("v", data)
require.NoError(t, err)

// Submit write query
Expand All @@ -99,6 +100,12 @@ func TestGetConsolidationPlan(t *testing.T) {
// close array
err = array.Close()
require.NoError(t, err)
}

func TestGetConsolidationPlan(t *testing.T) {
array := create1DTestArray(t)

write1DTestArray(t, array, []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})

checkConsolidationPlan := func(t *testing.T, cplan *ConsolidationPlan) {
numNodes, err := cplan.NumNodes()
Expand All @@ -113,15 +120,15 @@ func TestGetConsolidationPlan(t *testing.T) {
require.NoError(t, err)

// fragment uris in the plan are relative
fullPath := filepath.Join(tmpArrayPath, "__fragments", fragmentURI)
fullPath := filepath.Join(array.uri, "__fragments", fragmentURI)
_, err = os.Stat(fullPath)
require.NoError(t, err)
}
}

tdbCtx, err := NewContext(nil)
require.NoError(t, err)
arr, err := NewArray(tdbCtx, tmpArrayPath)
arr, err := NewArray(tdbCtx, array.uri)
require.NoError(t, err)
require.NoError(t, arr.Open(TILEDB_READ))
t.Cleanup(func() { arr.Close() })
Expand All @@ -131,3 +138,62 @@ func TestGetConsolidationPlan(t *testing.T) {

checkConsolidationPlan(t, cplan)
}

func TestConsolidateFragments(t *testing.T) {
// The test is skipped pending a core release for 2.25.0 that includes this fix:
// https://github.com/TileDB-Inc/TileDB/pull/5135
t.Skip("Skipping fragment list consolidation SC-51140")

array := create1DTestArray(t)

numFrags := 5
for i := 0; i < numFrags; i++ {
write1DTestArray(t, array, []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
}

fragmentInfo, err := NewFragmentInfo(array.context, array.uri)
require.NoError(t, err)

err = fragmentInfo.Load()
require.NoError(t, err)

fragInfoNum, err := fragmentInfo.GetFragmentNum()
require.NoError(t, err)
require.EqualValues(t, numFrags, fragInfoNum)
fragUris := make([]string, numFrags)
for i := 0; i < numFrags; i++ {
uri, err := fragmentInfo.GetFragmentURI(uint32(i))
require.NoError(t, err)
fragUris[i] = uri
}

// Default consolidation mode is 'fragments'.
config, err := array.context.Config()
require.NoError(t, err)

err = array.ConsolidateFragments(config, fragUris)
require.NoError(t, err)

// Check that the new consolidated fragment was created.
err = fragmentInfo.Load()
require.NoError(t, err)
fragInfoNum, err = fragmentInfo.GetFragmentNum()
require.NoError(t, err)
fragToVacuumNum, err := fragmentInfo.GetToVacuumNum()
require.NoError(t, err)
require.EqualValues(t, numFrags, fragToVacuumNum)
require.Equal(t, uint32(1), fragInfoNum)

err = array.Vacuum(config)
require.NoError(t, err)

// Check for one fragment after vacuum.
err = fragmentInfo.Load()
require.NoError(t, err)
fragInfoNum, err = fragmentInfo.GetFragmentNum()
require.NoError(t, err)
fragToVacuumNum, err = fragmentInfo.GetToVacuumNum()
require.NoError(t, err)
require.Equal(t, uint32(1), fragInfoNum)
require.Equal(t, uint32(0), fragToVacuumNum)
}
19 changes: 19 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package tiledb

/*
#include <stdlib.h>
*/
import "C"
import (
"unsafe"
)
Expand All @@ -18,3 +22,18 @@ type scalarType interface {
func slicePtr[T any](slc []T) unsafe.Pointer {
return unsafe.Pointer(unsafe.SliceData(slc))
}

// cStringArray takes an array of Go strings and converts it to an array of CStrings.
// The function returned should be deferred by the caller to free allocated memory.
func cStringArray(stringList []string) ([]*C.char, func()) {
list := make([]*C.char, len(stringList))
for i, str := range stringList {
list[i] = C.CString(str)
}

return list, func() {
for _, str := range list {
C.free(unsafe.Pointer(str))
}
}
}

0 comments on commit 8fda6ba

Please sign in to comment.