Skip to content

Commit

Permalink
FLASH-275/276/277/278/281/282: Complete DDL implementation (#110)
Browse files Browse the repository at this point in the history
* Widen schema syncer with sync all semantic, add background schema sync service and do init sync on startup

* Finish widen some data types to avoid data rewriting during apply ddl

* Add update_timestamp into table info

* Refine read and flush to be more robust for schema change

* Fix nullable for overflow check when decode region cache

* Add schema state enum

* Support column type change in schema syncer

* Fix comment

* Add mock functions for TiDB DDL

* Fix rvalue ref

* Fix some bugs in schema syncer and mock tidb

* Adjust test

* Add alter test

* Not do data conversion for alter txn table, as we already stored widen data

* Fix typo

* Fix mock tidb reverse type mapping

* Enhance overflow checking when flushing

* Enhance alter test

* Refine some code

* Remove table region mapping when drop table

* Adjust drop order

* Print file path in mutable test script when error

* Fix insert widen data through sql

* Fix some txn tests

* Fix widen data write of import statement

* DDL read and update by tikv's meta (#103)

* DDL read and update by tikv's meta

* add dbg back

* refine test

* address comments

* Add schema version check before reading

* Fix build error

* Format

* Reorg schema syncer code

* Add lock control in schema syncer interface

* Refinement

* Add sync schema on read

* fix bug during creating column: use a wrong db name (#107)

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* fix ddl sync for table drop and truncate (#109)

* when we drop a table when flash server is closed, we should sync this
drop when restart flash server.

* Add schema sync on read and simplify schema syncer interface and adjust mock stuff (#108)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Fix build error

* Fix build error

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Remove useless codes (#112)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Fix build fail

* Fix alter tmt and pass schema tests (#114)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* sync default value from tidb to flash (#113)

- still need to support current_timestamp
- still need to support date/datetime/enum types.

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* [DDL] Pass all txn tests (#116)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Fix empty default value bug (#117)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Remove sync schema test

* Remove a lot useless code

* Refine schema sync on read, and add drop on read test

* Remove a lot useless code (#118)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Remove sync schema test

* Remove a lot useless code

* Refine schema sync on read logic and add test (#120)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Remove sync schema test

* Remove a lot useless code

* Refine schema sync on read, and add drop on read test

* add schema_version for TableInfo (#122)

* add schema_version for TableInfo

In order to make read procedure more efficiently.

* address comment

* pass all tests

* fix bug for drop table; (#123)

* Support rename mock tidb table

* Add rename tests

* add logs for debug (#126)

* Add mock truncate table and tests

* Test rename table (#124)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Remove sync schema test

* Remove a lot useless code

* Refine schema sync on read, and add drop on read test

* Support rename mock tidb table

* Add rename tests

* Add mock truncate table and tests

* tiny fix (#127)

* Add rename/truncate on read/write test and alter on read test

* Add rename/truncate on read/write test and alter on read test (#128)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Remove sync schema test

* Remove a lot useless code

* Refine schema sync on read, and add drop on read test

* Support rename mock tidb table

* Add rename tests

* Add mock truncate table and tests

* Add rename/truncate on read/write test and alter on read test

* support for alter column. (#129)

* support for alter column.

* add log

* format

* address comment

* Fix mis-widen internal columns

* Support rename table. (#130)

* Fix fragile txn mock test

* Fix mis-widen internal columns (#133)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit

* Remove curl library

* Remove curl from builder image

* Remove useless codes, init schema syncer based on pd config

* Minor fix to schema debug

* Fix alter tmt and pass tests

* Fix build fail

* Add lock for mock schema syncer

* Fix schema sync service init context

* Adjust schema tests

* Not sync if no schema change detected

* Adjust txn mock tests

* Fix default value bug

* Rename some tests

* Remove sync schema test

* Remove a lot useless code

* Refine schema sync on read, and add drop on read test

* Support rename mock tidb table

* Add rename tests

* Add mock truncate table and tests

* Add rename/truncate on read/write test and alter on read test

* Fix mis-widen internal columns

* Fix fragile txn mock test again

* Remove useless code

* Resolve a dead lock problem during bootstraping. (#134)

* Resolve a dead lock problem during bootstraping.

* Merger will handle the structure lock of a storage. If merging is
earlier than syncing schema when bootstraping, merger is waiting for tmt
context's init, and schema syncer is waiting for the structure lock,
which cause dead lock :(

* add err code

* make log smarter

* format

* Fix dead lock in learner read (#135)

* Adjust order of region check and lock by region scanner creation (#136)

* Fix dead lock in learner read

* Adjust order of region check and lock by region scanner creation

* Remove useless invalid region version check

* Address comments

* address comment

* [FLASH-358] Support Region DeleteRange (#139)

* Address comments

* Fix test config based on new ddl

* fix drop db bug

* refine log

* Fix a ignore db config bug and add verbose pd addr

* Enlarge pd/kv/db startup waiting time

* Remove useless commands

* format client-c

* Address comments

* address comments

* hot fix

* Fix failed test

* Fix mutable tests by using no schema syncer

* fix. optimize background thread pool. (#153)
  • Loading branch information
zanmato1984 authored Aug 5, 2019
1 parent eb87a62 commit 71e1b04
Show file tree
Hide file tree
Showing 153 changed files with 4,876 additions and 1,957 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ include (cmake/find_capnp.cmake)
include (cmake/find_llvm.cmake)
include (cmake/find_grpc.cmake)
include (cmake/find_kvproto.cmake)
include (cmake/find_curl.cmake)


include (cmake/find_contrib_lib.cmake)
Expand Down
7 changes: 0 additions & 7 deletions cmake/find_curl.cmake

This file was deleted.

12 changes: 7 additions & 5 deletions contrib/client-c/include/common/CltException.h
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
#pragma once

#include <Poco/Exception.h>
#include <exception>
#include <string>
#include <Poco/Exception.h>

namespace pingcap {
namespace pingcap
{

const int MismatchClusterIDCode = 1;
const int GRPCErrorCode = 2;
const int InitClusterIDFailed = 3;
const int UpdatePDLeaderFailed = 4;
const int TimeoutError = 5;
const int RegionUnavailable = 6;
const int LogicalError = 7;
const int LockError = 8;

class Exception : public Poco::Exception
{
public:
Exception() {} /// For deferred initialization.
Exception() {} /// For deferred initialization.
Exception(const std::string & msg, int code = 0) : Poco::Exception(msg, code) {}
Exception(const std::string & msg, const std::string & arg, int code = 0) : Poco::Exception(msg, arg, code) {}
Exception(const std::string & msg, const Exception & exc, int code = 0) : Poco::Exception(msg, exc, code) {}
explicit Exception(const Poco::Exception & exc) : Poco::Exception(exc.displayText()) {}

Exception * clone() const override { return new Exception(*this); }
void rethrow() const override { throw *this; }

};

}
} // namespace pingcap
3 changes: 2 additions & 1 deletion contrib/client-c/include/common/Log.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Poco/Logger.h>

namespace pingcap {
namespace pingcap
{
using Poco::Logger;
}
38 changes: 19 additions & 19 deletions contrib/client-c/include/pd/Client.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
#pragma once

#include <common/Log.h>
#include <kvproto/pdpb.grpc.pb.h>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
#include <kvproto/pdpb.grpc.pb.h>
#include <common/Log.h>
#include <thread>
#include "IClient.h"

namespace pingcap{
namespace pd {
namespace pingcap
{
namespace pd
{

struct SecurityOption {
struct SecurityOption
{
std::string CAPath;
std::string CertPath;
std::string KeyPath;
};

class Client : public IClient {
class Client : public IClient
{
const int max_init_cluster_retries;

const std::chrono::seconds pd_timeout;
Expand All @@ -28,17 +32,14 @@ class Client : public IClient {
const std::chrono::seconds update_leader_interval;

public:

Client(const std::vector<std::string> & addrs);

~Client() override;

//uint64_t getClusterID() override;

// only implement a weak get ts.
uint64_t getTS() override {
throw "not implemented";
}
uint64_t getTS() override;

std::tuple<metapb::Region, metapb::Peer, std::vector<metapb::Peer>> getRegion(std::string key) override;

Expand All @@ -59,17 +60,17 @@ class Client : public IClient {

void updateLeader();

void updateURLs(const ::google::protobuf::RepeatedPtrField<::pdpb::Member>& members);
void updateURLs(const ::google::protobuf::RepeatedPtrField<::pdpb::Member> & members);

void leaderLoop();

void switchLeader(const ::google::protobuf::RepeatedPtrField<std::string>&);
void switchLeader(const ::google::protobuf::RepeatedPtrField<std::string> &);

std::unique_ptr<pdpb::PD::Stub> leaderStub();

pdpb::GetMembersResponse getMembers(std::string);

pdpb::RequestHeader* requestHeader();
pdpb::RequestHeader * requestHeader();

std::shared_ptr<grpc::Channel> getOrCreateGRPCConn(const std::string &);

Expand All @@ -83,7 +84,7 @@ class Client : public IClient {

std::mutex gc_safepoint_mutex;

std::unordered_map<std::string, std::shared_ptr<grpc::Channel> > channel_map;
std::unordered_map<std::string, std::shared_ptr<grpc::Channel>> channel_map;

std::vector<std::string> urls;

Expand All @@ -100,9 +101,8 @@ class Client : public IClient {
std::atomic<bool> check_leader;

Logger * log;

};


}
}
} // namespace pd
} // namespace pingcap
24 changes: 13 additions & 11 deletions contrib/client-c/include/pd/IClient.h
Original file line number Diff line number Diff line change
@@ -1,41 +1,43 @@
#pragma once

#include<string>
#include<vector>
#include <string>
#include <vector>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/enginepb.pb.h>
#pragma GCC diagnostic pop

namespace pingcap {
namespace pd {
namespace pingcap
{
namespace pd
{

class IClient {
class IClient
{
public:
// virtual uint64_t getClusterID() = 0;
// virtual uint64_t getClusterID() = 0;

virtual ~IClient() {}

virtual uint64_t getTS() = 0;

virtual std::tuple<metapb::Region, metapb::Peer, std::vector<metapb::Peer>> getRegion(std::string key) = 0;

// virtual std::pair<metapb::Region, metapb::Peer> getPrevRegion(std::string key) = 0;
// virtual std::pair<metapb::Region, metapb::Peer> getPrevRegion(std::string key) = 0;

virtual std::tuple<metapb::Region, metapb::Peer, std::vector<metapb::Peer>> getRegionByID(uint64_t region_id) = 0;

virtual metapb::Store getStore(uint64_t store_id) = 0;

// virtual std::vector<metapb::Store> getAllStores() = 0;
// virtual std::vector<metapb::Store> getAllStores() = 0;

virtual uint64_t getGCSafePoint() = 0;

virtual bool isMock() = 0;

};

using ClientPtr = std::shared_ptr<IClient>;

}
}
} // namespace pd
} // namespace pingcap
42 changes: 15 additions & 27 deletions contrib/client-c/include/pd/MockPDClient.h
Original file line number Diff line number Diff line change
@@ -1,46 +1,34 @@
#pragma once

#include <limits>
#include <pd/IClient.h>
#include <limits>

namespace pingcap {
namespace pd {
namespace pingcap
{
namespace pd
{

using Clock = std::chrono::system_clock;
using Seconds = std::chrono::seconds;

class MockPDClient : public IClient {
class MockPDClient : public IClient
{
public:
MockPDClient() = default;

~MockPDClient() override {}

uint64_t getGCSafePoint() override
{
return (Clock::now() - Seconds(2)).time_since_epoch().count();
}
uint64_t getGCSafePoint() override { return 1000000000000000; }

uint64_t getTS() override
{
return Clock::now().time_since_epoch().count();
}
uint64_t getTS() override { return Clock::now().time_since_epoch().count(); }

std::tuple<metapb::Region, metapb::Peer, std::vector<metapb::Peer>> getRegion(std::string) override {
throw "not implemented";
}
std::tuple<metapb::Region, metapb::Peer, std::vector<metapb::Peer>> getRegion(std::string) override { throw "not implemented"; }

std::tuple<metapb::Region, metapb::Peer, std::vector<metapb::Peer>> getRegionByID(uint64_t) override {
throw "not implemented";
}
std::tuple<metapb::Region, metapb::Peer, std::vector<metapb::Peer>> getRegionByID(uint64_t) override { throw "not implemented"; }

metapb::Store getStore(uint64_t) override {
throw "not implemented";
}
metapb::Store getStore(uint64_t) override { throw "not implemented"; }

bool isMock() override {
return true;
}
bool isMock() override { return true; }
};

}
}
} // namespace pd
} // namespace pingcap
52 changes: 31 additions & 21 deletions contrib/client-c/include/tikv/Backoff.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
#pragma once

#include <iostream>
#include <cmath>
#include <unistd.h>
#include <cmath>
#include <iostream>
#include <map>
#include <memory>

#include <common/CltException.h>

namespace pingcap {
namespace kv {
namespace pingcap
{
namespace kv
{

enum Jitter {
enum Jitter
{
NoJitter = 1,
FullJitter,
EqualJitter,
DecorrJitter
};

enum BackoffType {
enum BackoffType
{
boTiKVRPC = 0,
boTxnLock,
boTxnLockFast,
Expand All @@ -28,28 +32,31 @@ enum BackoffType {
boServerBusy
};

inline int expo(int base, int cap, int n) {
return std::min(double(cap), double(base) * std::pow(2.0, double(n)));
}
inline int expo(int base, int cap, int n) { return std::min(double(cap), double(base) * std::pow(2.0, double(n))); }

struct Backoff {
struct Backoff
{
int base;
int cap;
int jitter;
int last_sleep;
int attempts;

Backoff(int base_, int cap_, Jitter jitter_) : base(base_), cap(cap_), jitter(jitter_), attempts(0) {
if (base < 2) {
Backoff(int base_, int cap_, Jitter jitter_) : base(base_), cap(cap_), jitter(jitter_), attempts(0)
{
if (base < 2)
{
base = 2;
}
last_sleep = base;
}

int sleep () {
int sleep()
{
int sleep_time = 0;
int v = 0;
switch(jitter) {
switch (jitter)
{
case NoJitter:
sleep_time = expo(base, cap, attempts);
break;
Expand All @@ -59,31 +66,34 @@ struct Backoff {
break;
case EqualJitter:
v = expo(base, cap, attempts);
sleep_time = v/2 + rand() % (v/2);
sleep_time = v / 2 + rand() % (v / 2);
break;
case DecorrJitter:
sleep_time = int(std::min(double(cap), double(base + rand() % (last_sleep*3-base))));
sleep_time = int(std::min(double(cap), double(base + rand() % (last_sleep * 3 - base))));
}
::usleep(sleep_time);
attempts ++;
attempts++;
last_sleep = sleep_time;
return last_sleep;
}
};

constexpr int readIndexMaxBackoff = 20000;
constexpr int GetMaxBackoff = 20000;
constexpr int scanMaxBackoff = 20000;

using BackoffPtr = std::shared_ptr<Backoff>;

struct Backoffer {
struct Backoffer
{
std::map<BackoffType, BackoffPtr> backoff_map;
size_t total_sleep; // ms
size_t max_sleep; // ms
size_t max_sleep; // ms

Backoffer(size_t max_sleep_) : total_sleep(0), max_sleep(max_sleep_) {}

void backoff(BackoffType tp, const Exception & exc);
};

}
}
} // namespace kv
} // namespace pingcap
Loading

0 comments on commit 71e1b04

Please sign in to comment.