-
Notifications
You must be signed in to change notification settings - Fork 6.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replication of user-defined SQL functions using ZooKeeper #46085
Replication of user-defined SQL functions using ZooKeeper #46085
Conversation
61bcfd7
to
722fb6f
Compare
80e6813
to
302784e
Compare
302784e
to
a48a7ac
Compare
String path2 = path + "/" + escaped_loader_zk_path; | ||
|
||
if (zk->exists(path2 + "/functions")) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it would be easier to use designated if with temporary variables instead of path2
and path3
?
String path2 = path + "/" + escaped_loader_zk_path; | |
if (zk->exists(path2 + "/functions")) | |
{ | |
if (String func_path = fmt::format("{}/{}/functions", path, escaped_loader_path); zk->exists(func_path)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored a bit
void addDirectory(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id, const String & dir_path); | ||
|
||
/// Returns all added paths to directories if `host_id` is a host chosen to store user-defined SQL objects. | ||
Strings getDirectories(const String & loader_zk_path, UserDefinedSQLObjectType object_type, const String & host_id) const; | ||
|
||
private: | ||
using ZkPathAndObjectType = std::pair<String, UserDefinedSQLObjectType>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I'd recommend changing
ZkPathAndObjectType
to a structureZkPathAndObject
. Usingstd::pair
andstd::tuple
is usually error-prone. You can still use uniform initialization{zk_path, object_type}
or (unlikestd::pair
) you can use designated initializers which account for better readability. The wordType
in my opinion does not add any information (asusing
keyword denotes a type) and should be removed. - I'd recommend changing
addDirectory
andgetDirectories
signatures to takeZkPathAndObject
(the same generally applies to src/Backups/IBackupCoordination.h).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for 1:
- First, I want to point that it is consistent with code for access_storage.
- The
...Type
here is a part of...ObjectType
and not the whole alias. It denotes second element of pair. JustObject
would be incorrect. std::pair
is not always error-prone. For example, like here it is used as a key formap
which is quite a common practice. Because it has predefined comparisons operators (<=> actually). And it is not accessed via itfirst, second
which is really ugly.
As for 2:
- Frankly, i don't see need to introduce another indirection level for parameters. At least for now.
bool acquireReplicatedSQLObjects(const String & loader_zk_path, UserDefinedSQLObjectType object_type) override; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another small question about naming: are there any other replicated SQL objects that are processed, or it's only about UDF?. In the latter case would you consider renaming functions to smth. like acquireReplicatedUDF
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are no others at the moment. Only UDF. I think it is better to leave it as it is. The function can start working with new type (with a small modification, if there will be any.
namespace zkutil | ||
{ | ||
|
||
class ZooKeeperCachingGetter : boost::noncopyable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's up to ClickHouse maintainers to decide, but I tend to believe this feature (which in turn could be done with CacheBase) should be separated to another PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CacheBase
functionality is definitely not needed here. Perhaps the word caching confused you. What it means here is that it returns previously saved session if it is not expired or recreated the new one otherwise.
In general, this class could not been extracted but i found this useful.
const char * toString(UserDefinedSQLObjectType type); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to write such functions as ClickHouse has enum reflection
and you can simply write fmt::format({}, my_enum)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did it for consistency with the rest of the code, for example.
Is it accepted in the project not to write any more toString
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to adopt no-more-boilerplate practices, and I'm currently working actively to remove all usages of toString(), but it's up to you as for now I guess
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it. And used enum reflection capabilities.
UserDefinedSQLObjectsLoaderFromZooKeeper::UserDefinedSQLObjectsLoaderFromZooKeeper( | ||
const ContextPtr & global_context_, const String & zookeeper_path_) | ||
: global_context{global_context_} | ||
, zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Umm... lambda captures parameter by copy in this case and stores its own instance of the shared_ptr
. But if wrote [&global_context]
then yes lambda would store the reference.
|
||
ThreadFromGlobalPool watching_thread; | ||
std::atomic<bool> watching_flag = false; | ||
std::shared_ptr<ConcurrentBoundedQueue<std::pair<UserDefinedSQLObjectType, String>>> watch_queue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider reusing ZkPathAndObject
from https://github.com/ClickHouse/ClickHouse/pull/46085/files#diff-16b6bfd00f65d958566847206d85dbb1b9ae301b7bec9df2616878ba83a9dbf0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String
in watch_queue
is not a zk path, it's the name of an object. Apparently we need a comment here about that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added self-documenting alias for pair.
|
||
UserDefinedSQLObjectsLoaderFromZooKeeper::~UserDefinedSQLObjectsLoaderFromZooKeeper() | ||
{ | ||
SCOPE_EXIT_SAFE(stopWatchingThread()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SCOPE_EXIT
macro is used to simplify control flow in case of early exits. This destructor does not have any complex control flow statements, maybe you should replace to
SCOPE_EXIT_SAFE(stopWatchingThread()); | |
stopWatchingThread(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is used here is primarily that it does try/catch with logging and gets rid of a little boilerplate code.
The question here is rather whether an abnormal fail is allowed here, in case of an exception when the thread is stopped. So far, I've decided it's better to log it
size_t num_attempts = 10; | ||
while (true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
size_t num_attempts = 10; | |
while (true) | |
for(size_t i = 0; i < num_attempts; ++i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure your version is much better, note the exception generation below after the last failed attempt.
auto object_list_watcher = [watch_queue = watch_queue, object_type](const Coordination::WatchResponse &) | ||
{ [[maybe_unused]] bool push_result = watch_queue->emplace(object_type, ""); }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just ignore the emplace result or use the [[maybe_unused]]
attribute without creating a temporary variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
emplace
is marked by [[nodiscard]] attribute. As for [[maybe_unused]], it can be applicable only to declarations (as variable in this case), but not function calling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
push_result
is not a very clear name. Let's call it inserted
, and let's check it with chassert
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. Did it
UserDefinedSQLObjectsLoaderFromZooKeeper::UserDefinedSQLObjectsLoaderFromZooKeeper( | ||
const ContextPtr & global_context_, const String & zookeeper_path_) | ||
: global_context{global_context_} | ||
, zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Umm... lambda captures parameter by copy in this case and stores its own instance of the shared_ptr
. But if wrote [&global_context]
then yes lambda would store the reference.
src/Common/ConcurrentBoundedQueue.h
Outdated
@@ -109,7 +109,7 @@ class ConcurrentBoundedQueue | |||
template <typename... Args> | |||
[[nodiscard]] bool emplace(Args &&... args) | |||
{ | |||
return emplaceImpl(std::nullopt /* timeout in milliseconds */, std::forward<Args...>(args...)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the difference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The initial version was incorrect, actually. It forwarded all pack at once but forwarding each argument is needed, otherwise value categories wouldn't be preserved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the former variant std::forward<Args...>(args...)
is specialized to smth like this std::forward<Args_1, Args_2, ...>(args_1, args_2, ...));
that is incorrect because std::forward
accepts only one argument.
And thanks to @myrrc for the link to the documentation
|
||
/// Returns the ZooKeeper session and the flag whether it was taken from the cache(false) or opened new(true), | ||
/// because the session has expired or the cache was empty | ||
std::pair<zkutil::ZooKeeperPtr, bool> getZooKeeper(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
enum
instead of bool
here could look better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reworked to use nested enum
@@ -9,4 +9,6 @@ enum class UserDefinedSQLObjectType | |||
Function | |||
}; | |||
|
|||
const char * toString(UserDefinedSQLObjectType type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this function called anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed it
{ | ||
const auto object_watcher = [watch_queue = watch_queue, object_type, object_name](const Coordination::WatchResponse & response) | ||
{ | ||
if (response.type == Coordination::Event::CHANGED) [[maybe_unused]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe_unused
is specified at the wrong line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected this
a48a7ac
to
a874b9d
Compare
a874b9d
to
886b530
Compare
Enabled by specifying a key
user_defined_zookeeper_path
in theconfig.xml
.BACKUP/RESTORE TABLE system.functions ON CLUSTER
for consistent behaviour.This PR is largely based on this PR #42145.
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Add replication of user-defined SQL functions using ZooKeeper