Skip to content

Commit

Permalink
Merge branch 'master' into format_fetchvertices
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 authored Sep 7, 2021
2 parents 736c530 + 4d04c15 commit 0786c4d
Show file tree
Hide file tree
Showing 45 changed files with 659 additions and 130 deletions.
10 changes: 8 additions & 2 deletions .github/actions/tagname-action/README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
# Extract tag name

Extract tag name from release branch
Extract tag information from release branch

## Outputs

### `tag`

tag name

### `tagnum`

tag number

## Example usage

```yaml
- uses: ./.github/actions/tagname-action
id: tag

- name: Other step
run: echo ${{ steps.tag.outputs.tag }}
run: |
echo ${{ steps.tag.outputs.tag }}
echo ${{ steps.tag.outputs.tagnum }}
```
9 changes: 7 additions & 2 deletions .github/actions/tagname-action/action.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
name: 'Extract tag name'
description: 'Extract tag name'
name: 'Extract tag information'
description: 'Extract tag information'
outputs:
tag:
description: 'tag name'
value: ${{ steps.tag.outputs.tag }}
tagnum:
description: 'tag number'
value: ${{ steps.tag.outputs.tagnum }}
runs:
using: "composite"
steps:
- id: tag
run: |
tag=$(echo ${{ github.ref }} | rev | cut -d/ -f1 | rev)
tagnum=$(echo $tag |sed 's/^v//')
echo "::set-output name=tag::$tag"
echo "::set-output name=tagnum::$tagnum"
shell: bash
21 changes: 7 additions & 14 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,27 @@ jobs:
- centos8
container:
image: vesoft/nebula-dev:${{ matrix.os }}
env:
BUILD_DIR: ./pkg-build
CPACK_DIR: ./pkg-build/cpack_output
steps:
- uses: actions/checkout@v1
- uses: ./.github/actions/tagname-action
id: tag
- name: package
run: ./package/package.sh -b ${{ steps.tag.outputs.tag }}
- name: output some vars
id: vars
env:
SHA_EXT: sha256sum.txt
run: |
tag=$(echo ${{ github.ref }} | rev | cut -d/ -f1 | rev)
filename=$(find pkg-build/cpack_output -type f \( -iname \*.deb -o -iname \*.rpm \))
sha256sum $filename > $filename.$SHA_EXT
subdir=$(echo $tag |sed 's/^v//')
echo "::set-output name=subdir::$subdir"
- uses: ./.github/actions/upload-assets-action
with:
asset-path: pkg-build/cpack_output
tag: ${{ steps.tag.outputs.tag }}
tar zcf ${{ env.CPACK_DIR }}/nebula-${{ steps.tag.outputs.tagnum }}.tar.gz --exclude=${{ env.BUILD_DIR }} ./*
find ${{ env.CPACK_DIR }} -type f \( -iname \*.deb -o -iname \*.rpm \) -exec bash -c "sha256sum {} > {}.sha256sum.txt" \;
- uses: ./.github/actions/upload-to-oss-action
with:
key-id: ${{ secrets.OSS_ID }}
key-secret: ${{ secrets.OSS_SECRET }}
endpoint: ${{ secrets.OSS_ENDPOINT }}
bucket: nebula-graph
asset-path: pkg-build/cpack_output
target-path: package/${{ steps.vars.outputs.subdir }}
asset-path: ${{ env.CPACK_DIR }}
target-path: package/${{ steps.tag.outputs.tagnum }}

docker_build:
name: docker-build
Expand Down
82 changes: 69 additions & 13 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no re
DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
DEFINE_string(cluster_id_path, "cluster.id", "file path saved clusterId");

DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1<<n times");
namespace nebula {
namespace meta {

MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs,
const MetaClientOptions& options)
: ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options) {
: ioThreadPool_(ioThreadPool),
addrs_(std::move(addrs)),
options_(options),
sessionMap_(new SessionMap{}),
killedPlans_(new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>{}) {
CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required";
CHECK(!addrs_.empty()) << "No meta server address is specified or can be "
"solved. Meta server is required";
CHECK(!addrs_.empty())
<< "No meta server address is specified or can be solved. Meta server is required";
clientsMan_ = std::make_shared<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>>();
updateActive();
updateLeader();
Expand All @@ -50,6 +54,8 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool

MetaClient::~MetaClient() {
stop();
delete sessionMap_.load();
delete killedPlans_.load();
VLOG(3) << "~MetaClient";
}

Expand Down Expand Up @@ -182,6 +188,11 @@ bool MetaClient::loadData() {
return false;
}

if (!loadSessions()) {
LOG(ERROR) << "Load sessions Failed";
return false;
}

auto ret = listSpaces().get();
if (!ret.ok()) {
LOG(ERROR) << "List space failed, status:" << ret.status();
Expand Down Expand Up @@ -997,8 +1008,7 @@ void MetaClient::loadRemoteListeners() {
}
}

/// ================================== public methods
/// =================================
/// ================================== public methods =================================

PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
// If the length of the id is 8, we will treat it as int64_t to be compatible
Expand Down Expand Up @@ -2859,17 +2869,15 @@ bool MetaClient::loadCfg() {
// only load current module's config is enough
auto ret = listConfigs(gflagsModule_).get();
if (ret.ok()) {
// if we load config from meta server successfully, update gflags and set
// configReady_
// if we load config from meta server successfully, update gflags and set configReady_
auto items = ret.value();
MetaConfigMap metaConfigMap;
for (auto& item : items) {
std::pair<cpp2::ConfigModule, std::string> key = {item.get_module(), item.get_name()};
metaConfigMap.emplace(std::move(key), std::move(item));
}
{
// For any configurations that is in meta, update in cache to replace
// previous value
// For any configurations that is in meta, update in cache to replace previous value
folly::RWSpinLock::WriteHolder holder(configCacheLock_);
for (const auto& entry : metaConfigMap) {
auto& key = entry.first;
Expand Down Expand Up @@ -2964,9 +2972,8 @@ void MetaClient::loadLeader(const std::vector<cpp2::HostItem>& hostItems,
<< item.get_leader_parts().size() << " space";
}
{
// todo(doodle): in worst case, storage and meta isolated, so graph may get
// a outdate leader info. The problem could be solved if leader term are
// cached as well.
// todo(doodle): in worst case, storage and meta isolated, so graph may get a outdate
// leader info. The problem could be solved if leader term are cached as well.
LOG(INFO) << "Load leader ok";
folly::RWSpinLock::WriteHolder wh(leadersLock_);
leadersInfo_ = std::move(leaderInfo);
Expand Down Expand Up @@ -3486,5 +3493,54 @@ folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
return folly::async(func);
}

bool MetaClient::loadSessions() {
auto session_list = listSessions().get();
if (!session_list.ok()) {
LOG(ERROR) << "List sessions failed, status:" << session_list.status();
return false;
}
SessionMap* oldSessionMap = sessionMap_.load();
SessionMap* newSessionMap = new SessionMap(*oldSessionMap);
auto oldKilledPlan = killedPlans_.load();
auto newKilledPlan = new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*oldKilledPlan);
for (auto& session : session_list.value().get_sessions()) {
(*newSessionMap)[session.get_session_id()] = session;
for (auto& query : session.get_queries()) {
if (query.second.get_status() == cpp2::QueryStatus::KILLING) {
newKilledPlan->insert({session.get_session_id(), query.first});
}
}
}
sessionMap_.store(newSessionMap);
killedPlans_.store(newKilledPlan);
folly::rcu_retire(oldKilledPlan);
folly::rcu_retire(oldSessionMap);
return true;
}

StatusOr<cpp2::Session> MetaClient::getSessionFromCache(const nebula::SessionID& session_id) {
if (!ready_) {
return Status::Error("Not ready!");
}
folly::rcu_reader guard;
auto session_map = sessionMap_.load();
auto it = session_map->find(session_id);
if (it != session_map->end()) {
return it->second;
}
return Status::SessionNotFound();
}

bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) {
static thread_local int check_counter = 0;
// Inaccurate in a multi-threaded environment, but it is not important
check_counter = (check_counter + 1) & ((1 << FLAGS_check_plan_killed_frequency) - 1);
if (check_counter != 0) {
return false;
}
folly::rcu_reader guard;
return killedPlans_.load()->count({sessionId, planId});
}

} // namespace meta
} // namespace nebula
19 changes: 17 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
#define CLIENTS_META_METACLIENT_H_

#include <folly/RWSpinLock.h>
#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/synchronization/Rcu.h>
#include <gtest/gtest_prod.h>

#include <atomic>

#include "common/base/Base.h"
#include "common/base/Status.h"
#include "common/base/StatusOr.h"
Expand All @@ -20,6 +25,7 @@
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftClientManager.h"
#include "interface/gen-cpp2/MetaServiceAsyncClient.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"

DECLARE_int32(meta_client_retry_times);
Expand Down Expand Up @@ -55,8 +61,7 @@ using NameIndexMap = std::unordered_map<std::pair<GraphSpaceID, std::string>, In
// Get Index Structure by indexID
using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;

// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove
// listener on local host
// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove listener on local host
using Listeners =
std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;

Expand Down Expand Up @@ -115,6 +120,7 @@ using FulltextClientsList = std::vector<cpp2::FTClient>;

using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;
class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;
Expand Down Expand Up @@ -175,6 +181,7 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryOnceTest);
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
friend class KillQueryMetaWrapper;

public:
MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
Expand Down Expand Up @@ -551,6 +558,10 @@ class MetaClient {

StatusOr<std::vector<HostAddr>> getStorageHosts() const;

StatusOr<cpp2::Session> getSessionFromCache(const nebula::SessionID& session_id);

bool checkIsPlanKilled(SessionID session_id, ExecutionPlanID plan_id);

StatusOr<HostAddr> getStorageLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);

void updateStorageLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader);
Expand Down Expand Up @@ -634,6 +645,8 @@ class MetaClient {

bool loadFulltextIndexes();

bool loadSessions();

void loadLeader(const std::vector<cpp2::HostItem>& hostItems,
const SpaceNameIdMap& spaceIndexByName);

Expand Down Expand Up @@ -746,6 +759,8 @@ class MetaClient {
MetaClientOptions options_;
std::vector<HostAddr> storageHosts_;
int64_t heartbeatTime_;
std::atomic<SessionMap*> sessionMap_;
std::atomic<folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>*> killedPlans_;
};

} // namespace meta
Expand Down
Loading

0 comments on commit 0786c4d

Please sign in to comment.