Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update branch-21.10 with changes from rapidsai fork #1601

Open
wants to merge 42 commits into
base: branch-21.10
Choose a base branch
from
Open
Changes from 5 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
28535f3
fixed error where the ucx mem handle was getting freed twice because …
May 19, 2021
2a6977d
Merge pull request #1545 from williamBlazing/fix/ucx_mem_handle
wmalpica May 19, 2021
713b628
fixed parquet metadata for decimal
wmalpica Aug 3, 2021
68fed9b
changed warning to error for when creating a table based off of a cud…
wmalpica Aug 3, 2021
daa1385
[feature/decimal_support_using_float] cast decimal to float64 for par…
gcca Aug 3, 2021
8d6a324
[feature/decimal_support_using_float] cast decimal to float64 for orc
gcca Aug 3, 2021
9f75314
[feature/decimal_support_using_float] check float64 in schema for orc
gcca Aug 3, 2021
0a81e20
[feature/decimal_support_using_float] update changelog
gcca Aug 4, 2021
c3ab090
[feature/decimal_support_using_float] fix for check style
gcca Aug 4, 2021
9826a15
Merge pull request #1589 from gcca/feature/decimal_support_using_float
wmalpica Aug 4, 2021
8c783b9
Add argument thrust-ignore-cub-version-check
romulo-auccapuclla Aug 5, 2021
ddba597
Update changelog
romulo-auccapuclla Aug 5, 2021
6d9d3cb
Merge pull request #1590 from romulo-auccapuclla/fix/thrust-ignore-cu…
wmalpica Aug 5, 2021
e37e4bd
pre-release changes for 21.08
wmalpica Aug 13, 2021
1e59abc
Merge pull request #1592 from wmalpica/prerelease-21.08
wmalpica Aug 13, 2021
a34a089
Pin spdlog to match RAPIDS
raydouglass Aug 18, 2021
924605b
DOC Update changelog
raydouglass Aug 18, 2021
e90a645
Merge pull request #1595 from raydouglass/spdlog1.9
wmalpica Aug 18, 2021
de3b0e8
Restrict google-cloud-cpp version
raydouglass Aug 19, 2021
9efa6dd
DOC Update changelog
raydouglass Aug 19, 2021
bb48f0c
Ensure librmm comes from rapidai channel
raydouglass Aug 20, 2021
cc513aa
Fix dependencies.sh to match meta.yaml
raydouglass Aug 20, 2021
75398f7
Merge pull request #1597 from raydouglass/google-cloud-cpp-version
wmalpica Aug 20, 2021
98d8bab
Update README.md
wmalpica Aug 30, 2021
34b119e
Merge remote-tracking branch 'rapidsai/branch-21.08' into branch-21.10
raydouglass Sep 13, 2021
53f6490
remove empty file
trxcllnt Sep 18, 2021
3918ef2
remove stream param from make_strings_column
trxcllnt Sep 18, 2021
a56f634
vendor the old strings -> dtypes logic that was removed from cudf::io…
trxcllnt Sep 18, 2021
150baec
update groupby and rolling aggregation types and factory functions
trxcllnt Sep 18, 2021
cd271f5
use data_from_unique_ptr instead of the removed Table.from_unique_ptr
trxcllnt Sep 20, 2021
54fe7cd
remove unused import
trxcllnt Sep 20, 2021
a40fb00
update cudf._lib.types import names
trxcllnt Sep 21, 2021
d17342f
use rapidsai/blazingsql-testing-files instead of BlazingDB/blazingsql…
trxcllnt Sep 22, 2021
0379e2d
spread return of data_from_unique_ptr as args list to DataFrame._from…
trxcllnt Sep 22, 2021
d0205d0
fix cudf imports
trxcllnt Sep 22, 2021
52ae55c
update for new cudf::io::table_input_metadata API
trxcllnt Sep 23, 2021
3342f29
Merge pull request #1 from trxcllnt/fix/21.10-cudf-groupby-api-change
ajschmidt8 Sep 23, 2021
47ace5e
Disable uploads to `blazing` channels
ajschmidt8 Sep 24, 2021
e7241a7
Merge pull request #2 from ajschmidt8/disable-blazing-upload
ajschmidt8 Sep 24, 2021
747a240
Merge commit '2a6977d00a095b28e39a4d047572665f08b26f29' into fix_doub…
jglaser Sep 28, 2021
a35643d
Merge pull request #1600 from BlazingDB/fix_double_free
wmalpica Sep 30, 2021
8018b82
Merge pull request #3 from BlazingDB/branch-21.08
raydouglass Oct 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 21 additions & 30 deletions engine/src/bmr/BufferProvider.cpp
Original file line number Diff line number Diff line change
@@ -26,23 +26,23 @@ void pinned_allocator::setUcpContext(ucp_context_h _context)
use_ucx = true;
}

void base_allocator::allocate(void ** ptr, std::size_t size){
do_allocate(ptr,size);
void base_allocator::allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr){
do_allocate(ptr,size,mem_handle_ptr);
}

void base_allocator::deallocate(void * ptr){
do_deallocate(ptr);
void base_allocator::deallocate(void * ptr, ucp_mem_h mem_handle){
do_deallocate(ptr,mem_handle);
}

void host_allocator::do_allocate(void ** ptr, std::size_t size){
// Allocates `size` bytes of host memory aligned to BLAZING_ALIGNMENT and stores
// the resulting address in *ptr.
//
// ptr            - out-parameter that receives the address of the new block.
// size           - number of bytes requested.
// mem_handle_ptr - accepted to match the base_allocator interface; this
//                  allocator does not read or write it.
//
// Throws std::runtime_error if aligned_alloc returns a null pointer.
void host_allocator::do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr){

	*ptr = aligned_alloc( BLAZING_ALIGNMENT, size );
	// Check the block that was just returned (*ptr). Testing `ptr` itself only
	// inspects the caller's out-parameter, which is never null at this point,
	// so a failed allocation would otherwise go unnoticed.
	if (!*ptr) {
		throw std::runtime_error("Couldn't perform host allocation.");
	}
}

void pinned_allocator::do_allocate(void ** ptr, std::size_t size){
void pinned_allocator::do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr){

// do we really want to do a host allocation instead of a device one? (have to try zero-copy later)
cudaError_t err = cudaMallocHost(ptr, size);
@@ -60,19 +60,19 @@ void pinned_allocator::do_allocate(void ** ptr, std::size_t size){
mem_map_params.length = size;
mem_map_params.flags = 0; // try UCP_MEM_MAP_NONBLOCK

ucs_status_t status = ucp_mem_map(context, &mem_map_params, &mem_handle);
ucs_status_t status = ucp_mem_map(context, &mem_map_params, mem_handle_ptr);
if (status != UCS_OK)
{
throw std::runtime_error("Error on ucp_mem_map");
}
}
}

void host_allocator::do_deallocate(void * ptr){
// Releases a host block with free(). The memory handle is part of the shared
// allocator interface but is not used by this allocator.
void host_allocator::do_deallocate(void * ptr, ucp_mem_h mem_handle)
{
	(void) mem_handle; // intentionally unused
	free(ptr);
}

void pinned_allocator::do_deallocate(void * ptr){
void pinned_allocator::do_deallocate(void * ptr, ucp_mem_h mem_handle){
if (use_ucx)
{
ucs_status_t status = ucp_mem_unmap(context, mem_handle);
@@ -91,7 +91,6 @@ void pinned_allocator::do_deallocate(void * ptr){
allocation_pool::allocation_pool(std::unique_ptr<base_allocator> allocator, std::size_t size_buffers, std::size_t num_buffers) :
num_buffers (num_buffers), buffer_size(size_buffers), allocator(std::move(allocator)) {
this->buffer_counter = 0; // this will get incremented by grow()
this->allocation_counter = 0;
this->grow();

}
@@ -120,7 +119,6 @@ std::unique_ptr<blazing_allocation_chunk> allocation_pool::get_chunk() {
}
for(auto & allocation : allocations){
if(!allocation->allocation_chunks.empty()){
this->allocation_counter++;
auto temp = std::move(allocation->allocation_chunks.top());
allocation->allocation_chunks.pop();

@@ -141,7 +139,7 @@ void allocation_pool::grow() {
allocations.back()->index = this->allocations.size() - 1;
auto last_index = allocations.size() -1;
try{
allocator->allocate((void **) &allocations[last_index]->data,num_new_buffers * buffer_size);
allocator->allocate((void **) &allocations[last_index]->data,num_new_buffers * buffer_size, &allocations[last_index]->mem_handle);
this->allocations[last_index]->total_number_of_chunks = num_new_buffers;
for (int buffer_index = 0; buffer_index < num_new_buffers; buffer_index++) {
auto buffer = std::make_unique<blazing_allocation_chunk>();
@@ -162,34 +160,32 @@ void allocation_pool::free_chunk(std::unique_ptr<blazing_allocation_chunk> buffe
std::unique_lock<std::mutex> lock(in_use_mutex);
const std::size_t idx = buffer->allocation->index;

if (idx+1 > this->allocations.size()) {
if (idx >= this->allocations.size()) {
std::shared_ptr<spdlog::logger> logger = spdlog::get("batch_logger");
if(logger){
logger->error("|||{0}|||||","free_chunk cannot delete an invalid allocation.");
}
assert(("free_chunk cannot delete an invalid allocation.", idx < this->allocations.size()));
}

buffer->allocation->allocation_chunks.push(std::move(buffer));
this->allocations.at(idx)->allocation_chunks.push(std::move(buffer));

if (idx > 0) {
if (this->allocations.at(idx)->total_number_of_chunks == this->allocations.at(idx)->allocation_chunks.size()) {
auto it = this->allocations.begin();
std::advance(it, idx);
if ((*it)->data != nullptr) {
this->allocator->deallocate((*it)->data);
this->allocations.erase(it);

if (this->allocations.at(idx)->data != nullptr) {
this->buffer_counter -= this->allocations.at(idx)->total_number_of_chunks;
this->allocator->deallocate(this->allocations.at(idx)->data, this->allocations.at(idx)->mem_handle);

// for all allocations after the pos at idx
// we need to update the allocation.index after we deleted one
for (std::size_t i = idx; i < this->allocations.size(); ++i) {
this->allocations[i]->index = this->allocations[i]->index - 1;
}

this->allocations.erase(this->allocations.begin() + idx);
}
}
}

this->allocation_counter--;
}
}


@@ -202,10 +198,9 @@ void allocation_pool::free_all() {
auto buffer = std::move(allocation->allocation_chunks.top());
allocation->allocation_chunks.pop();
}
allocator->deallocate(allocation->data);
allocator->deallocate(allocation->data, allocation->mem_handle);
}
allocations.resize(0);
this->allocation_counter = 0;
allocations.resize(0);
}
}

@@ -240,10 +235,6 @@ void empty_pools(){
buffer_providers::get_host_buffer_provider()->free_all();
buffer_providers::get_pinned_buffer_provider()->free_all();
}
std::size_t allocation_pool::get_allocated_buffers(){
return allocation_counter;
}


std::size_t allocation_pool::get_total_buffers(){
return buffer_counter;
35 changes: 9 additions & 26 deletions engine/src/bmr/BufferProvider.h
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ struct blazing_allocation{
char *data; // the pointer to the allocated memory
std::stack< std::unique_ptr<blazing_allocation_chunk> > allocation_chunks; // These are the available chunks that are part of the allocation.
allocation_pool * pool; // this is the pool that was used to make this allocation, and therefore this is what we would use to free it
ucp_mem_h mem_handle; // this is a memhandle used by UCX
};

struct blazing_allocation_chunk{
@@ -56,25 +57,20 @@ struct blazing_chunked_column_info {
class base_allocator{
public:
base_allocator() {}
void allocate(void ** ptr, std::size_t size);
void deallocate(void * ptr);

virtual ucp_mem_h getUcpMemoryHandle() const
{
throw std::runtime_error("getUcpMemoryHandle not implemented in base class");
}
void allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr);
void deallocate(void * ptr, ucp_mem_h mem_handle);

protected:
virtual void do_allocate(void ** ptr, std::size_t size) = 0;
virtual void do_deallocate(void * ptr) = 0;
virtual void do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr) = 0;
virtual void do_deallocate(void * ptr, ucp_mem_h mem_handle) = 0;
};

class host_allocator : public base_allocator {
public:
host_allocator(bool use_ucx) {}
protected:
void do_allocate(void ** ptr, std::size_t size);
void do_deallocate(void * ptr);
void do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr);
void do_deallocate(void * ptr, ucp_mem_h mem_handle);
};

class pinned_allocator : public base_allocator {
@@ -83,17 +79,11 @@ class pinned_allocator : public base_allocator {

void setUcpContext(ucp_context_h context);

virtual ucp_mem_h getUcpMemoryHandle() const
{
return mem_handle;
}

protected:
void do_allocate(void ** ptr, std::size_t size);
void do_deallocate(void * ptr);
void do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr);
void do_deallocate(void * ptr, ucp_mem_h mem_handle);
bool use_ucx;
ucp_context_h context;
ucp_mem_h mem_handle;
};

class allocation_pool {
@@ -104,11 +94,6 @@ class allocation_pool {

std::unique_ptr<blazing_allocation_chunk> get_chunk();

ucp_mem_h getUcpMemoryHandle() const
{
return allocator->getUcpMemoryHandle();
}

void free_chunk(std::unique_ptr<blazing_allocation_chunk> allocation);

std::size_t size_buffers();
@@ -131,8 +116,6 @@ class allocation_pool {

int buffer_counter;

int allocation_counter;

std::vector<std::unique_ptr<blazing_allocation> > allocations;

std::unique_ptr<base_allocator> allocator;
47 changes: 46 additions & 1 deletion engine/tests/allocation_pool/allocation_pool.cpp
Original file line number Diff line number Diff line change
@@ -97,7 +97,8 @@ TEST_F(AllocationPoolTest, mem_map_test) {
ral::memory::set_allocation_pools(size_buffers_host, num_buffers_host,
size_buffers_pinned, num_buffers_pinned, map_ucx, context);

ucp_mem_h handle = ral::memory::buffer_providers::get_pinned_buffer_provider()->getUcpMemoryHandle();
std::unique_ptr<ral::memory::blazing_allocation_chunk> allocation_chunk = ral::memory::buffer_providers::get_pinned_buffer_provider()->get_chunk();
ucp_mem_h handle = allocation_chunk->allocation->mem_handle;
ucp_mem_attr_t attr;
std::memset(&attr, 0, sizeof(ucp_mem_attr_t));
// check that it is mapped
@@ -111,3 +112,47 @@ TEST_F(AllocationPoolTest, mem_map_test) {
ASSERT_TRUE(attr.length != 0);
ral::memory::empty_pools();
}


// Exercises get_chunk()/free_chunk() on the pinned buffer provider when chunks
// are handed back in a different order than they were obtained (last batch
// first). Beyond the null-chunk checks, the test has no value assertions: it
// passes when every release completes without crashing.
TEST_F(AllocationPoolTest, get_chuck_free_chunk) {
    std::size_t size_buffers_host = 1000000;
    std::size_t num_buffers_host = 100;
    std::size_t size_buffers_pinned = 1000000;
    std::size_t num_buffers_pinned = 100;
    bool map_ucx = true;

    auto context = CreateUcpContext();
    ral::memory::set_allocation_pools(size_buffers_host, num_buffers_host,
        size_buffers_pinned, num_buffers_pinned, map_ucx, context);

    using chunk_ptr = std::unique_ptr<ral::memory::blazing_allocation_chunk>;

    // Pulls `count` chunks from the pinned provider.
    const auto take_chunks = [](std::size_t count) {
        std::vector<chunk_ptr> chunks;
        chunks.reserve(count);
        for (std::size_t i = 0; i < count; ++i) {
            chunks.push_back(ral::memory::buffer_providers::get_pinned_buffer_provider()->get_chunk());
        }
        return chunks;
    };

    // Returns every chunk to the pool recorded on its owning allocation.
    const auto release_chunks = [](std::vector<chunk_ptr> & chunks) {
        for (auto & chunk : chunks) {
            if (!chunk) {
                // Report a failed acquisition instead of dereferencing null.
                ADD_FAILURE() << "get_chunk returned a null chunk";
                continue;
            }
            // Read the pool pointer before the chunk is moved away.
            auto pool = chunk->allocation->pool;
            pool->free_chunk(std::move(chunk));
        }
    };

    // let's make some buffers: three batches, each as large as the initial
    // pinned pool (presumably forcing the pool to grow past its first
    // allocation; confirm against allocation_pool::get_chunk).
    auto raw_buffers0 = take_chunks(num_buffers_pinned);
    auto raw_buffers1 = take_chunks(num_buffers_pinned);
    auto raw_buffers2 = take_chunks(num_buffers_pinned);

    // let's free them in a different order and make sure we handle that correctly
    release_chunks(raw_buffers2);
    release_chunks(raw_buffers1);
    release_chunks(raw_buffers0);

    // Release whatever the pools still hold, as mem_map_test does, so the
    // pools do not stay allocated after this test.
    ral::memory::empty_pools();
}