Skip to content

Commit

Permalink
Add a Secondary Cache to cache gandiva object code
Browse files Browse the repository at this point in the history
Arrow gandiva has a primary cache but this cache doesn't persist across
restarts. Integrate a new API in project and filter make calls that
allow the user to specify the implementation of the secondary cache by
providing a c++ and java interface to this persistant cache.
  • Loading branch information
siddhantrao23 authored and lriggs committed Jul 21, 2023
1 parent 200ba20 commit 6fd5468
Show file tree
Hide file tree
Showing 17 changed files with 741 additions and 9 deletions.
5 changes: 5 additions & 0 deletions cpp/src/gandiva/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class Cache {
cache_.insert(cache_key, module);
}

void Clear() {
std::lock_guard<std::mutex> lock(mtx_);
cache_.clear();
}

private:
LruCache<KeyType, ValueType> cache_;
std::mutex mtx_;
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/gandiva/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ std::once_flag llvm_init_once_flag;
static bool llvm_init = false;
static llvm::StringRef cpu_name;
static llvm::SmallVector<std::string, 10> cpu_attrs;
static char* cpu_details_;

void Engine::InitOnce() {
DCHECK_EQ(llvm_init, false);
Expand All @@ -111,9 +112,20 @@ void Engine::InitOnce() {
}
ARROW_LOG(INFO) << "Detected CPU Name : " << cpu_name.str();
ARROW_LOG(INFO) << "Detected CPU Features:" << cpu_attrs_str;

std::string cpu_details = cpu_name.str();
cpu_details += cpu_attrs_str;
cpu_details_ = strdup(cpu_details.c_str());

llvm_init = true;
}

char* Engine::GetCpuIdentifier() {
DCHECK_EQ(llvm_init, true);

return cpu_details_;
}

Engine::Engine(const std::shared_ptr<Configuration>& conf,
std::unique_ptr<llvm::LLVMContext> ctx,
std::unique_ptr<llvm::ExecutionEngine> engine, llvm::Module* module,
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/gandiva/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class GANDIVA_EXPORT Engine {
/// Load the function IRs that can be accessed in the module.
Status LoadFunctionIRs();

static char* GetCpuIdentifier();

private:
Engine(const std::shared_ptr<Configuration>& conf,
std::unique_ptr<llvm::LLVMContext> ctx,
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/gandiva/expression_cache_key.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stddef.h>

#include <sstream>
#include <thread>

#include "arrow/util/hash_util.h"
Expand Down Expand Up @@ -109,6 +110,18 @@ class ExpressionCacheKey {

bool operator!=(const ExpressionCacheKey& other) const { return !(*this == other); }

std::string ToString() {
std::stringstream s;

s << schema_->ToString() << "\n";
s << mode_ << configuration_->Hash() << uniqifier_ << "\n";
for (std::string expr : expressions_as_strings_) {
s << expr << "\n";
}

return s.str();
}

private:
size_t hash_code_;
SchemaPtr schema_;
Expand Down
44 changes: 44 additions & 0 deletions cpp/src/gandiva/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ Filter::Filter(std::unique_ptr<LLVMGenerator> llvm_generator, SchemaPtr schema,

Filter::~Filter() {}

Status Filter::Make(SchemaPtr schema, ConditionPtr condition,
std::shared_ptr<Configuration> config,
std::shared_ptr<Filter>* filter) {
return Make(schema, condition, config, nullptr, filter);
}

Status Filter::Make(SchemaPtr schema, ConditionPtr condition,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<SecondaryCacheInterface> sec_cache,
std::shared_ptr<Filter>* filter) {
ARROW_RETURN_IF(schema == nullptr, Status::Invalid("Schema cannot be null"));
ARROW_RETURN_IF(condition == nullptr, Status::Invalid("Condition cannot be null"));
Expand Down Expand Up @@ -68,6 +75,20 @@ Status Filter::Make(SchemaPtr schema, ConditionPtr condition,
std::unique_ptr<LLVMGenerator> llvm_gen;
ARROW_RETURN_NOT_OK(LLVMGenerator::Make(configuration, is_cached, &llvm_gen));

if (!is_cached && sec_cache != nullptr) {
std::shared_ptr<arrow::Buffer> arrow_buffer =
sec_cache->Get(GetSecondaryCacheKey(cache_key.ToString()));
if (arrow_buffer != nullptr) {
is_cached = true;
llvm::StringRef string_buffer(reinterpret_cast<char*>(arrow_buffer->address()),
arrow_buffer->size());
std::unique_ptr<llvm::MemoryBuffer> obj_buffer =
llvm::MemoryBuffer::getMemBufferCopy(string_buffer);
std::shared_ptr<llvm::MemoryBuffer> sec_cached_obj = std::move(obj_buffer);
cache->PutObjectCode(cache_key, sec_cached_obj);
}
}

if (!is_cached) {
// Run the validation on the expression.
// Return if the expression is invalid since we will not be able to process further.
Expand All @@ -84,6 +105,15 @@ Status Filter::Make(SchemaPtr schema, ConditionPtr condition,
*filter = std::make_shared<Filter>(std::move(llvm_gen), schema, configuration);
filter->get()->SetBuiltFromCache(is_cached);

// insert into the secondary cache, if present
if (sec_cache != nullptr && is_cached == false) {
std::shared_ptr<llvm::MemoryBuffer> sec_cached_obj = cache->GetObjectCode(cache_key);
llvm::StringRef string_buffer = sec_cached_obj->getBuffer();
std::shared_ptr<arrow::Buffer> arrow_buffer =
arrow::Buffer::FromString(string_buffer.str());
sec_cache->Set(GetSecondaryCacheKey(cache_key.ToString()), arrow_buffer);
}

return Status::OK();
}

Expand Down Expand Up @@ -124,4 +154,18 @@ void Filter::SetBuiltFromCache(bool flag) { built_from_cache_ = flag; }

bool Filter::GetBuiltFromCache() { return built_from_cache_; }

// added method for testing the secondary cache
void Filter::Clear() {
std::shared_ptr<Cache<ExpressionCacheKey, std::shared_ptr<llvm::MemoryBuffer>>> cache =
LLVMGenerator::GetCache();
cache->Clear();
}

std::shared_ptr<arrow::Buffer> Filter::GetSecondaryCacheKey(std::string primaryKey) {
// compute key from primary key and cpu attributes
// cpu attributes are required as the compiled code depends on the cpu type and features
std::string key = std::string(Engine::GetCpuIdentifier()) + " | " + primaryKey;
return arrow::Buffer::FromString(key);
}

} // namespace gandiva
19 changes: 19 additions & 0 deletions cpp/src/gandiva/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "gandiva/arrow.h"
#include "gandiva/condition.h"
#include "gandiva/configuration.h"
#include "gandiva/secondary_cache.h"
#include "gandiva/selection_vector.h"
#include "gandiva/visibility.h"

Expand Down Expand Up @@ -68,6 +69,19 @@ class GANDIVA_EXPORT Filter {
std::shared_ptr<Configuration> config,
std::shared_ptr<Filter>* filter);

/// \brief Build a filter for the given schema and condition.
/// Customize the filter with runtime configuration.
///
/// \param[in] schema schema for the record batches, and the condition.
/// \param[in] condition filter conditions.
/// \param[in] config run time configuration.
/// \param[in] sec_cache object to enable jni upcalls.
/// \param[out] filter the returned filter object
static Status Make(SchemaPtr schema, ConditionPtr condition,
std::shared_ptr<Configuration> config,
std::shared_ptr<SecondaryCacheInterface> sec_cache,
std::shared_ptr<Filter>* filter);

/// Evaluate the specified record batch, and populate output selection vector.
///
/// \param[in] batch the record batch. schema should be the same as the one in 'Make'
Expand All @@ -82,7 +96,12 @@ class GANDIVA_EXPORT Filter {

bool GetBuiltFromCache();

void Clear();

private:
// Create an arrow buffer with the key for the secondary cache.
static std::shared_ptr<arrow::Buffer> GetSecondaryCacheKey(std::string primaryKey);

std::unique_ptr<LLVMGenerator> llvm_generator_;
SchemaPtr schema_;
std::shared_ptr<Configuration> configuration_;
Expand Down
53 changes: 53 additions & 0 deletions cpp/src/gandiva/projector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
SelectionVector::Mode selection_vector_mode,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<Projector>* projector) {
return Projector::Make(schema, exprs, selection_vector_mode, configuration, nullptr,
projector);
}

Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<SecondaryCacheInterface> sec_cache,
std::shared_ptr<Projector>* projector) {
return Projector::Make(schema, exprs, SelectionVector::Mode::MODE_NONE, configuration,
sec_cache, projector);
}

Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
SelectionVector::Mode selection_vector_mode,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<SecondaryCacheInterface> sec_cache,
std::shared_ptr<Projector>* projector) {
ARROW_RETURN_IF(schema == nullptr, Status::Invalid("Schema cannot be null"));
ARROW_RETURN_IF(exprs.empty(), Status::Invalid("Expressions cannot be empty"));
ARROW_RETURN_IF(configuration == nullptr,
Expand Down Expand Up @@ -83,6 +100,20 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
std::unique_ptr<LLVMGenerator> llvm_gen;
ARROW_RETURN_NOT_OK(LLVMGenerator::Make(configuration, is_cached, &llvm_gen));

if (!is_cached && sec_cache != nullptr) {
std::shared_ptr<arrow::Buffer> arrow_buffer =
sec_cache->Get(GetSecondaryCacheKey(cache_key.ToString()));
if (arrow_buffer != nullptr) {
is_cached = true;
llvm::StringRef string_buffer(reinterpret_cast<char*>(arrow_buffer->address()),
arrow_buffer->size());
std::unique_ptr<llvm::MemoryBuffer> obj_buffer =
llvm::MemoryBuffer::getMemBufferCopy(string_buffer);
std::shared_ptr<llvm::MemoryBuffer> sec_cached_obj = std::move(obj_buffer);
cache->PutObjectCode(cache_key, sec_cached_obj);
}
}

// Run the validation on the expressions.
// Return if any of the expression is invalid since
// we will not be able to process further.
Expand Down Expand Up @@ -110,6 +141,14 @@ Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs,
new Projector(std::move(llvm_gen), schema, output_fields, configuration));
projector->get()->SetBuiltFromCache(is_cached);

if (sec_cache != nullptr && is_cached == false) {
std::shared_ptr<llvm::MemoryBuffer> sec_cached_obj = cache->GetObjectCode(cache_key);
llvm::StringRef string_buffer = sec_cached_obj->getBuffer();
std::shared_ptr<arrow::Buffer> arrow_buffer =
arrow::Buffer::FromString(string_buffer.str());
sec_cache->Set(GetSecondaryCacheKey(cache_key.ToString()), arrow_buffer);
}

return Status::OK();
}

Expand Down Expand Up @@ -286,4 +325,18 @@ void Projector::SetBuiltFromCache(bool flag) { built_from_cache_ = flag; }

bool Projector::GetBuiltFromCache() { return built_from_cache_; }

// added method for testing the secondary cache
void Projector::Clear() {
std::shared_ptr<Cache<ExpressionCacheKey, std::shared_ptr<llvm::MemoryBuffer>>> cache =
LLVMGenerator::GetCache();
cache->Clear();
}

std::shared_ptr<arrow::Buffer> Projector::GetSecondaryCacheKey(std::string primaryKey) {
// compute key from primary key and cpu attributes
// cpu attributes are required as the compiled code depends on the cpu type and features
std::string key = std::string(Engine::GetCpuIdentifier()) + " | " + primaryKey;
return arrow::Buffer::FromString(key);
}

} // namespace gandiva
34 changes: 34 additions & 0 deletions cpp/src/gandiva/projector.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "gandiva/arrow.h"
#include "gandiva/configuration.h"
#include "gandiva/expression.h"
#include "gandiva/secondary_cache.h"
#include "gandiva/selection_vector.h"
#include "gandiva/visibility.h"

Expand Down Expand Up @@ -77,6 +78,34 @@ class GANDIVA_EXPORT Projector {
std::shared_ptr<Configuration> configuration,
std::shared_ptr<Projector>* projector);

/// Build a projector for the given schema to evaluate the vector of expressions.
/// Customize the projector with a secondary cache
///
/// \param[in] schema schema for the record batches, and the expressions.
/// \param[in] exprs vector of expressions.
/// \param[in] configuration run time configuration.
/// \param[in] sec_cache object to enable jni upcalls.
/// \param[out] projector the returned projector object
static Status Make(SchemaPtr schema, const ExpressionVector& exprs,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<SecondaryCacheInterface> sec_cache,
std::shared_ptr<Projector>* projector);

/// Build a projector for the given schema to evaluate the vector of expressions.
/// Customize the projector with a secondary cache
///
/// \param[in] schema schema for the record batches, and the expressions.
/// \param[in] exprs vector of expressions.
/// \param[in] selection_vector_mode mode of selection vector
/// \param[in] configuration run time configuration.
/// \param[in] sec_cache object to enable jni upcalls.
/// \param[out] projector the returned projector object
static Status Make(SchemaPtr schema, const ExpressionVector& exprs,
SelectionVector::Mode selection_vector_mode,
std::shared_ptr<Configuration> configuration,
std::shared_ptr<SecondaryCacheInterface> sec_cache,
std::shared_ptr<Projector>* projector);

/// Evaluate the specified record batch, and return the allocated and populated output
/// arrays. The output arrays will be allocated from the memory pool 'pool', and added
/// to the vector 'output'.
Expand Down Expand Up @@ -124,6 +153,8 @@ class GANDIVA_EXPORT Projector {

bool GetBuiltFromCache();

void Clear();

private:
Projector(std::unique_ptr<LLVMGenerator> llvm_generator, SchemaPtr schema,
const FieldVector& output_fields, std::shared_ptr<Configuration>);
Expand All @@ -139,6 +170,9 @@ class GANDIVA_EXPORT Projector {
/// Validate the common args for Evaluate() APIs.
Status ValidateEvaluateArgsCommon(const arrow::RecordBatch& batch) const;

// Create an arrow buffer with the key for the secondary cache.
static std::shared_ptr<arrow::Buffer> GetSecondaryCacheKey(std::string primaryKey);

std::unique_ptr<LLVMGenerator> llvm_generator_;
SchemaPtr schema_;
FieldVector output_fields_;
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/gandiva/secondary_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "arrow/buffer.h"

namespace gandiva {

// secondary cache c++ interface
class GANDIVA_EXPORT SecondaryCacheInterface {
public:
virtual std::shared_ptr<arrow::Buffer> Get(
std::shared_ptr<arrow::Buffer> serialized_expr) = 0;

virtual void Set(std::shared_ptr<arrow::Buffer> serialized_expr,
std::shared_ptr<arrow::Buffer> value) = 0;

virtual ~SecondaryCacheInterface() {}
};

} // namespace gandiva
1 change: 1 addition & 0 deletions cpp/src/gandiva/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ add_gandiva_test(null_validity_test)
add_gandiva_test(decimal_test)
add_gandiva_test(decimal_single_test)
add_gandiva_test(filter_project_test)
add_gandiva_test(secondary_cache_test)

if(ARROW_BUILD_STATIC)
add_gandiva_test(projector_test_static SOURCES projector_test.cc USE_STATIC_LINKING)
Expand Down
Loading

0 comments on commit 6fd5468

Please sign in to comment.