Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pd_client: Add getAllStore API #126

Merged
merged 1 commit into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions include/pingcap/pd/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,18 @@ class Client : public IClient

void update(const std::vector<std::string> & addrs, const ClusterConfig & config) override;

//uint64_t getClusterID() override;

// only implement a weak get ts.
uint64_t getTS() override;

std::pair<metapb::Region, metapb::Peer> getRegionByKey(const std::string & key) override;

//std::pair<metapb::Region, metapb::Peer> getPrevRegion(std::string key) override;

std::pair<metapb::Region, metapb::Peer> getRegionByID(uint64_t region_id) override;

metapb::Store getStore(uint64_t store_id) override;

bool isClusterBootstrapped() override;
std::vector<metapb::Store> getAllStores(bool exclude_tombstone) override;

//std::vector<metapb::Store> getAllStores() override;
bool isClusterBootstrapped() override;

uint64_t getGCSafePoint() override;

Expand Down Expand Up @@ -101,6 +97,7 @@ class Client : public IClient

std::shared_ptr<PDConnClient> getOrCreateGRPCConn(const std::string &);

private:
std::shared_mutex leader_mutex;

std::mutex channel_map_mutex;
Expand Down
6 changes: 1 addition & 5 deletions include/pingcap/pd/IClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,21 @@ namespace pd
class IClient
{
public:
// virtual uint64_t getClusterID() = 0;

virtual ~IClient() = default;

virtual uint64_t getTS() = 0;

// return region meta and leader peer.
virtual std::pair<metapb::Region, metapb::Peer> getRegionByKey(const std::string & key) = 0;

// virtual std::pair<metapb::Region, metapb::Peer> getPrevRegion(std::string key) = 0;

// return region meta and leader peer.
virtual std::pair<metapb::Region, metapb::Peer> getRegionByID(uint64_t region_id) = 0;

virtual metapb::Store getStore(uint64_t store_id) = 0;

virtual bool isClusterBootstrapped() = 0;

// virtual std::vector<metapb::Store> getAllStores() = 0;
virtual std::vector<metapb::Store> getAllStores(bool exclude_tombstone) = 0;

virtual uint64_t getGCSafePoint() = 0;

Expand Down
1 change: 1 addition & 0 deletions include/pingcap/pd/MockPDClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class MockPDClient : public IClient
std::pair<metapb::Region, metapb::Peer> getRegionByID(uint64_t) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

metapb::Store getStore(uint64_t) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }
std::vector<metapb::Store> getAllStores(bool) override { throw Exception("not implemented", pingcap::ErrorCodes::UnknownError); }

bool isClusterBootstrapped() override { return true; }

Expand Down
33 changes: 31 additions & 2 deletions src/pd/Client.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include <Poco/URI.h>
#include <grpcpp/client_context.h>
#include <kvproto/pdpb.pb.h>
#include <pingcap/SetThreadName.h>
#include <pingcap/pd/Client.h>

#include <chrono>
#include <mutex>

namespace pingcap
Expand Down Expand Up @@ -38,6 +41,7 @@ Client::Client(const std::vector<std::string> & addrs, const ClusterConfig & con
, loop_interval(100)
, update_leader_interval(60)
, cluster_id(0)
, work_threads_stop(false)
, check_leader(false)
, log(&Logger::get("pingcap.pd"))
{
Expand All @@ -48,8 +52,6 @@ Client::Client(const std::vector<std::string> & addrs, const ClusterConfig & con

initLeader();

work_threads_stop = false;

work_thread = std::thread([&]() { leaderLoop(); });

check_leader.store(false);
Expand Down Expand Up @@ -371,6 +373,33 @@ std::pair<metapb::Region, metapb::Peer> Client::getRegionByID(uint64_t region_id
return std::make_pair(response.region(), response.leader());
}

std::vector<metapb::Store> Client::getAllStores(bool exclude_tombstone)
{
pdpb::GetAllStoresRequest req;
pdpb::GetAllStoresResponse resp;

req.set_allocated_header(requestHeader());
req.set_exclude_tombstone_stores(exclude_tombstone);

grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + pd_timeout);

auto status = leaderClient()->stub->GetAllStores(&context, req, &resp);
if (!status.ok())
{
std::string err_msg = ("get all stores failed: " + std::to_string(status.error_code()) + ": " + status.error_message());
log->error(err_msg);
check_leader.store(true);
throw Exception(err_msg, GRPCErrorCode);
}

std::vector<metapb::Store> all_stores;
all_stores.reserve(resp.stores_size());
for (const auto & s : resp.stores())
all_stores.emplace_back(s);
return all_stores;
}

metapb::Store Client::getStore(uint64_t store_id)
{
pdpb::GetStoreRequest request{};
Expand Down