Skip to content

Commit

Permalink
[C++, python] Optimizing indexer for panda create and pyarrow lookup
Browse files Browse the repository at this point in the history
-Removed std::vector based lookup function to speedup panda's lookup
-Add a speiclized lookup for pyarrow

Signed-off-by: Behnam Robatmili <[email protected]>
  • Loading branch information
beroy committed Feb 27, 2024
1 parent e74810a commit 3ac83b3
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 40 deletions.
139 changes: 99 additions & 40 deletions apis/python/src/tiledbsoma/reindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/

#include <tiledbsoma/reindexer/reindexer.h>

#include <tiledbsoma/utils/carrow.h>
#include "common.h"

#define DENUM(x) .value(#x, TILEDB_##x)
Expand All @@ -40,6 +40,94 @@ namespace libtiledbsomacpp {
namespace py = pybind11;
using namespace py::literals;
using namespace tiledbsoma;
#include <iostream>

py::array_t<int64_t> get_indexer_py_arrow(
IntIndexer& indexer, py::object py_arrow_array);
/***
* Handle general lookup for Re-indexer
* @param indexer reference to the indexer
* @param lookups input values to be looked up
* @return looked up values
*/
py::array_t<int64_t> get_indexer_general(
IntIndexer& indexer, py::array_t<int64_t> lookups) {
auto input_buffer = lookups.request();
int64_t* input_ptr = static_cast<int64_t*>(input_buffer.ptr);
size_t size = input_buffer.shape[0];
auto results = py::array_t<int64_t>(size);
auto results_buffer = results.request();
size_t results_size = results_buffer.shape[0];
int64_t* results_ptr = static_cast<int64_t*>(results_buffer.ptr);
indexer.lookup(input_ptr, results_ptr, size);
return results;
}

/***
* Helper function to provide data and schema for an arrow object
* @param object python object
* @param arrow_array extracted array data
* @param arrow_schema extracted array schema
*/
//
void extract_py_array_schema(
const pybind11::handle object,
ArrowArray& arrow_array,
ArrowSchema& arrow_schema) {
uintptr_t arrow_schema_ptr = (uintptr_t)(&arrow_schema);
uintptr_t arrow_array_ptr = (uintptr_t)(&arrow_array);

// Using array._export_to_c to get arrow array and schema.
object.attr("_export_to_c")(arrow_array_ptr, arrow_schema_ptr);
}

/***
* Handle pyarrow-based lookup for Re-indexer
* @param indexer reference to the indexer
* @py_arrow_array pyarrow inputs to be looked up
* @return looked up values
*/
py::array_t<int64_t> get_indexer_py_arrow(
IntIndexer& indexer, py::object py_arrow_array) {
// Handle the general case.
if (!py::hasattr(py_arrow_array, "_export_to_c")) {
return get_indexer_general(indexer, py_arrow_array);
}
py::list array_chunks;
if (py::hasattr(py_arrow_array, "chunks")) {
array_chunks = py_arrow_array.attr("chunks").cast<py::list>();
} else {
array_chunks.append(py_arrow_array);
}

// Calculate the total size of the input chunked array.
int total_size = 0;
for (const pybind11::handle array : array_chunks) {
ArrowSchema arrow_schema;
ArrowArray arrow_array;
extract_py_array_schema(array, arrow_array, arrow_schema);
total_size += arrow_array.length;
}

// Allocate the output
auto results = py::array_t<int64_t>(total_size);
auto results_buffer = results.request();
size_t results_size = results_buffer.shape[0];
int64_t* results_ptr = static_cast<int64_t*>(results_buffer.ptr);

// Write output (one chunk at a time)
int write_offset = 0;
for (const pybind11::handle array : array_chunks) {
ArrowSchema arrow_schema;
ArrowArray arrow_array;
extract_py_array_schema(array, arrow_array, arrow_schema);
auto input_ptr = (int64_t*)arrow_array.buffers[1];
indexer.lookup(
input_ptr, results_ptr + write_offset, arrow_array.length);
write_offset += arrow_array.length;
}
return results;
}

void load_reindexer(py::module& m) {
// Efficient C++ re-indexing (aka hashing unique key values to an index
Expand All @@ -57,48 +145,19 @@ void load_reindexer(py::module& m) {
size_t length = buffer.shape[0];
indexer.map_locations(keys.data(), keys.size(), num_threads);
})
.def(
"map_locations",
[](IntIndexer& indexer,
std::vector<int64_t> keys,
int num_threads) {
indexer.map_locations(keys.data(), keys.size(), num_threads);
})
// Perform lookup for a large input array of keys and return the looked
// up value array (passing ownership from C++ to python)
// Perform lookup for a large input array of keys and writes the
// looked up values into previously allocated array (works for the
// cases in which python and R pre-allocate the array)
.def(
"get_indexer",
[](IntIndexer& indexer, py::array_t<int64_t> lookups) {
auto input_buffer = lookups.request();
int64_t* input_ptr = static_cast<int64_t*>(input_buffer.ptr);
size_t size = input_buffer.shape[0];
auto results = py::array_t<int64_t>(size);
auto results_buffer = results.request();
size_t results_size = results_buffer.shape[0];

int64_t* results_ptr = static_cast<int64_t*>(
results_buffer.ptr);

indexer.lookup(input_ptr, results_ptr, size);
return results;
return get_indexer_general(indexer, lookups);
})
// Perform lookup for a large input array of keys and writes the looked
// up values into previously allocated array (works for the cases in
// which python and R pre-allocate the array)
.def(
"get_indexer",
[](IntIndexer& indexer,
py::array_t<int64_t> lookups,
py::array_t<int64_t>& results) {
auto input_buffer = lookups.request();
int64_t* input_ptr = static_cast<int64_t*>(input_buffer.ptr);
size_t size = input_buffer.shape[0];

auto results_buffer = results.request();
int64_t* results_ptr = static_cast<int64_t*>(
results_buffer.ptr);
size_t results_size = input_buffer.shape[0];
indexer.lookup(input_ptr, input_ptr, size);
});
// If the input is not arrow (does not have _export_to_c attribute),
// it will be handled using a general input method.
.def("get_indexer", [](IntIndexer& indexer, py::object py_arrow_array) {
return get_indexer_py_arrow(indexer, py_arrow_array);
});
}

} // namespace libtiledbsomacpp
24 changes: 24 additions & 0 deletions apis/python/tests/test_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa
import pytest

from tiledbsoma._index_util import tiledbsoma_build_index
Expand Down Expand Up @@ -61,6 +62,29 @@ def test_duplicate_key_indexer_error(
],
),
(list(range(1, 10000)), list(range(1, 10000))),
(np.array(range(1, 10000)), np.array(range(1, 10000))),
(pa.array(range(1, 10000)), pa.array(range(1, 10000))),
(pd.array(range(1, 10000)), pd.array(range(1, 10000))),
(
pa.chunked_array(
[
list(range(1, 10000)),
list(range(10000, 20000)),
list(range(30000, 40000)),
]
),
pa.chunked_array(
[
list(range(1, 10000)),
list(range(10000, 20000)),
list(range(30000, 40000)),
]
),
),
(
pd.Series(list(range(1, 10000)), copy=False),
pd.Series(list(range(1, 10000)), copy=False),
),
],
)
def test_indexer(keys: np.array, lookups: np.array):
Expand Down
92 changes: 92 additions & 0 deletions apis/python/tests/test_indexer_dtatye_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from time import perf_counter

import numpy as np
import pandas as pd
import pyarrow as pa

from tiledbsoma._index_util import tiledbsoma_build_index
from tiledbsoma.options import SOMATileDBContext
from tiledbsoma.options._soma_tiledb_context import _validate_soma_tiledb_context


def build(keys, pandas):
context = _validate_soma_tiledb_context(SOMATileDBContext())
if pandas:
indexer = pd.Index(keys)
indexer.get_indexer([1])
else:
perf_counter()
indexer = tiledbsoma_build_index(keys, context=context)
return indexer


def lookup(indexer, lookups):
results = indexer.get_indexer(lookups)
return results


def run(data_type, keys, lookups, pandas):
start_time = perf_counter()
indexer = build(keys, pandas)
build_time = perf_counter()
lookup(indexer, lookups)
lookup_time = perf_counter()
if pandas:
name = "pandas"
else:
name = "reindexer"
print(
f"{data_type}: Setup time: {name}: {build_time - start_time}: Lookup time: {lookup_time - build_time}"
)


def indexer_test_build(data_type, keys, lookups):
run(data_type, keys, lookups, False)
run(data_type, keys, lookups, True)


def main():
# Creating keys and lookup values of different types (np.array, pa.array, pa.chunked_array, pa.array, pd.Series,
# pd.array and python list) and try the re-indexer and pandas.Index
keys = np.array(list(range(1, 100000000)), dtype=np.int64)
lookups = np.random.randint(0, 1000, 1000000000)
indexer_test_build("np.array", keys, lookups)

keys = pa.array(list(range(1, 100000000)))
lookups = pa.array(np.random.randint(0, 1000, 1000000000).astype(np.int64))
indexer_test_build("pa.array", keys, lookups)

keys = pa.chunked_array(
[
list(range(1, 10000000)),
list(range(10000000, 20000000)),
list(range(20000000, 30000000)),
list(range(30000000, 40000000)),
list(range(40000000, 50000000)),
list(range(50000000, 6000000)),
list(range(60000000, 70000000)),
list(range(70000000, 80000000)),
list(range(80000000, 90000000)),
list(range(90000000, 100000000)),
]
)
lookups = []
for x in range(10):
lookups.append(list(np.random.randint(0, 1000, 100000000, dtype=np.int64)))
lookups = pa.chunked_array(lookups)
indexer_test_build("pa.chunked_array", keys, lookups)

keys = pd.Series(list(range(1, 100000000)))
lookups = pd.Series(np.random.randint(0, 1000, 1000000000))
indexer_test_build("pd.Series", keys, lookups)

keys = pd.array(list(range(1, 100000000)))
lookups = pd.array(np.random.randint(0, 1000, 1000000000, dtype=np.int64))
indexer_test_build("pd.array", keys, lookups)

keys = list(range(1, 100000000))
lookups = list(np.random.randint(0, 1000, 1000000000))
# indexer_test_build("python list", keys, lookups)


main()
5 changes: 5 additions & 0 deletions libtiledbsoma/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ install(FILES
DESTINATION "include/tiledbsoma/reindexer/"
)

install(FILES
${CMAKE_CURRENT_SOURCE_DIR}/utils/carrow.h
DESTINATION "include/tiledbsoma/utils/"
)

install(FILES
${CMAKE_CURRENT_SOURCE_DIR}/tiledbsoma/tiledbsoma
DESTINATION "include/tiledbsoma"
Expand Down

0 comments on commit 3ac83b3

Please sign in to comment.