Skip to content

Commit

Permalink
add testcases and fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
young-scott committed Jul 27, 2023
1 parent d1edd98 commit 21ee29a
Show file tree
Hide file tree
Showing 27 changed files with 263 additions and 2,359 deletions.
613 changes: 8 additions & 605 deletions src/Disks/CubeFS/DiskCubeFS.cpp

Large diffs are not rendered by default.

143 changes: 6 additions & 137 deletions src/Disks/CubeFS/DiskCubeFS.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <Common/logger_useful.h>
#include <Disks/IDisk.h>
#include <base/logger_useful.h>
#include <Disks/DiskLocal.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
Expand All @@ -15,7 +15,7 @@ namespace ErrorCodes
namespace DB
{
class DiskCubeFSCheckThread;
class DiskCubeFS : public IDisk
class DiskCubeFS : public DiskLocal
{
public:
friend class DiskCubeFSCheckThread;
Expand All @@ -30,144 +30,13 @@ class DiskCubeFS : public IDisk
UInt64 local_disk_check_period_ms);
virtual ~DiskCubeFS() override;

const String & getPath() const override { return disk_path; }

ReservationPtr reserve(UInt64 bytes) override;

UInt64 getTotalSpace() const override;

UInt64 getAvailableSpace() const override;

UInt64 getUnreservedSpace() const override;

UInt64 getKeepingFreeSpace() const override { return keep_free_space_bytes; }

bool exists(const String & path) const override;

bool isFile(const String & path) const override;

bool isDirectory(const String & path) const override;

size_t getFileSize(const String & path) const override;

void createDirectory(const String & path) override;

void createDirectories(const String & path) override;

void clearDirectory(const String & path) override;

void moveDirectory(const String & from_path, const String & to_path) override;

DirectoryIteratorPtr iterateDirectory(const String & path) const override;

void createFile(const String & path) override;

void moveFile(const String & from_path, const String & to_path) override;

void replaceFile(const String & from_path, const String & to_path) override;

void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;

void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;

void listFiles(const String & path, std::vector<String> & file_names) const override;

std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const override;

std::unique_ptr<WriteBufferFromFileBase> writeFile(
const String & path,
size_t buf_size,
WriteMode mode,
const WriteSettings & settings) override;

void removeFile(const String & path) override;
void removeFileIfExists(const String & path) override;
void removeDirectory(const String & path) override;
void removeRecursive(const String & path) override;

void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;

Poco::Timestamp getLastModified(const String & path) const override;

time_t getLastChanged(const String & path) const override;

void setReadOnly(const String & path) override;

void createHardLink(const String & src_path, const String & dst_path) override;

void truncateFile(const String & path, size_t size) override;

DataSourceDescription getDataSourceDescription() const override;

bool isRemote() const override { return false; }
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;

void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;
bool supportDataSharing() const override { return true; }

bool isRemote() const override { return false; }
bool supportZeroCopyReplication() const override { return false; }
public:

SyncGuardPtr getDirectorySyncGuard(const String & path) const override;

void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;

bool isBroken() const override { return broken; }
bool isReadOnly() const override { return readonly; }

void startupImpl(ContextPtr context) override;

void shutdown() override;

/// Check if the disk is OK to proceed read/write operations. Currently the check is
/// rudimentary. The more advanced choice would be using
/// https://github.com/smartmontools/smartmontools. However, it's good enough for now.
bool canRead() const noexcept;
bool canWrite() const noexcept;

DiskObjectStoragePtr createDiskObjectStorage() override;

bool supportsStat() const override { return true; }
struct stat stat(const String & path) const override;

bool supportsChmod() const override { return true; }
void chmod(const String & path, mode_t mode) override;

MetadataStoragePtr getMetadataStorage() override;

protected:
void checkAccessImpl(const String & path) override;

private:
std::optional<UInt64> tryReserve(UInt64 bytes);

/// Setup disk for healthy check.
/// Throw exception if it's not possible to setup necessary files and directories.
void setup();

/// Read magic number from disk checker file. Return std::nullopt if exception happens.
std::optional<UInt32> readDiskCheckerMagicNumber() const noexcept;

const String disk_path;
const String disk_checker_path = ".disk_checker_file";
std::atomic<UInt64> keep_free_space_bytes;
Poco::Logger * logger;
DataSourceDescription data_source_description;

UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;

static std::mutex reservation_mutex;

std::atomic<bool> broken{false};
std::atomic<bool> readonly{false};
std::unique_ptr<DiskCubeFSCheckThread> disk_checker;
/// A magic number to vaguely check if reading operation generates correct result.
/// -1 means there is no available disk_checker_file yet.
Int64 disk_checker_magic_number = -1;
bool disk_checker_can_check_read = true;
};

extern void loadDiskCubeFSConfig(const String & name,
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/CubeFS/DiskCubeFSCheckThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <Disks/CubeFS/DiskCubeFS.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
#include <base/logger_useful.h>

namespace DB
{
Expand Down
27 changes: 22 additions & 5 deletions src/Disks/CubeFS/DistributeLockGuard.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include <filesystem>
#include <Disks/CubeFS/DistributeLockGuard.h>
#include <Common/logger_useful.h>
#include <base/logger_useful.h>
#include <Common/ZooKeeper/KeeperException.h>


namespace DB
{

namespace fs = std::filesystem;
const String DistributeLockGuard::owner_tag = "__[host]__";

DistributeLockGuard::DistributeLockGuard(const zkutil::ZooKeeperPtr & zk, const String & zk_path)
: zookeeper(zk), lock_path(zk_path), locked(false), log(&Poco::Logger::get("CubeFS:DistributeLockGuard"))
Expand All @@ -19,11 +21,11 @@ DistributeLockGuard::~DistributeLockGuard()
unlock();
}

bool DistributeLockGuard::tryLock(const String & value)
bool DistributeLockGuard::tryLock(const String & value, const String & owner)
{
if (locked)
return true;
locked = tryCreateDistributeLock(value);
locked = tryCreateDistributeLock(value, owner);
return locked;
}

Expand All @@ -33,7 +35,15 @@ void DistributeLockGuard::unlock()
locked = !releaseDistributeLock();
}

bool DistributeLockGuard::tryCreateDistributeLock(const String & value) const
String DistributeLockGuard::getLockOwner(String node_value) const
{
auto n = node_value.rfind(owner_tag);
if (n == std::string::npos)
return "";
return node_value.substr(n + owner_tag.length());
}

bool DistributeLockGuard::tryCreateDistributeLock(const String & value, const String & owner) const
{
/// In rare case other replica can remove path between createAncestors and createIfNotExists
/// So we make up to 5 attempts
Expand All @@ -44,18 +54,25 @@ bool DistributeLockGuard::tryCreateDistributeLock(const String & value) const
}

int attempts = 2;
String node_value = value + owner_tag + owner;
while (attempts > 0)
{
try
{
auto code = zookeeper->tryCreate(lock_path, value, zkutil::CreateMode::Ephemeral);
auto code = zookeeper->tryCreate(lock_path, node_value, zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZOK)
{
LOG_TRACE(log, "Create lock for znode {}", lock_path);
return true;
}
else if (code == Coordination::Error::ZNODEEXISTS)
{
String cur_node_value = zookeeper->get(lock_path);
String cur_owner = getLockOwner(cur_node_value);
if (0 == cur_owner.compare(owner))
{
return true;
}
LOG_INFO(log, "Cannot create lock for znode {} because lock was locked by others", lock_path);
return false;
}
Expand Down
12 changes: 7 additions & 5 deletions src/Disks/CubeFS/DistributeLockGuard.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once
#include <string>
#include <memory>

#include <base/types.h>
#include <Common/ZooKeeper/ZooKeeper.h>


Expand All @@ -13,21 +13,23 @@ class DistributeLockGuard
public:
DistributeLockGuard(const zkutil::ZooKeeperPtr & zk, const String &zk_path);
~DistributeLockGuard();
bool tryLock(const String & value = "");
bool tryLock(const String & value = "", const String & owner ="");
void unlock();

private:
/// add distribute lock for cubefs
bool tryCreateDistributeLock(const String & value) const;
bool tryCreateDistributeLock(const String & value, const String & owner) const;
bool releaseDistributeLock() const;
String getLockOwner(String node_value) const;

private:
zkutil::ZooKeeperPtr zookeeper;

std::string lock_path;
std::string lock_message;
String lock_path;
String lock_message;
bool locked;
Poco::Logger * log;
static const String owner_tag;

};

Expand Down
9 changes: 4 additions & 5 deletions src/Disks/CubeFS/registerDiskCubeFS.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#include <Disks/DiskFactory.h>
#include <Disks/CubeFS/DiskCubeFS.h>
#include <Common/Macros.h>
#include <Common/logger_useful.h>
#include <base/logger_useful.h>

namespace DB
{


void registerDiskCubeFS(DiskFactory & factory, bool global_skip_access_check)
void registerDiskCubeFS(DiskFactory & factory)
{
auto creator = [global_skip_access_check](
auto creator = [](
const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
Expand All @@ -24,10 +24,9 @@ void registerDiskCubeFS(DiskFactory & factory, bool global_skip_access_check)
if (path == disk_ptr->getPath())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} and disk {} cannot have the same path ({})", name, disk_name, path);

bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk
= std::make_shared<DiskCubeFS>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
disk->startup(context, skip_access_check);
disk->startup();
return disk;
};

Expand Down
2 changes: 1 addition & 1 deletion src/Disks/DiskLocal.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DiskLocal : public IDisk
bool canRead() const noexcept;
bool canWrite() const noexcept;

private:
protected:
bool tryReserve(UInt64 bytes);

/// Setup disk for healthy check. Returns true if it's read-write, false if read-only.
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/DiskType.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ inline String toString(DiskType disk_type)
return "web";
case DiskType::AzureBlobStorage:
return "azure_blob_storage";
case DataSourceType::CubeFS:
case DiskType::CubeFS:
return "cubefs";
}
__builtin_unreachable();
Expand Down
2 changes: 0 additions & 2 deletions src/Disks/registerDisks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ void registerDisks()

registerDiskCubeFS(factory);
registerDiskWebServer(factory);

registerDiskCache(factory);
}

}
9 changes: 4 additions & 5 deletions src/Interpreters/DatabaseCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Databases/IDatabase.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOnDisk.h>
#include <Disks/IDisk.h>
#include <Common/quoteString.h>
#include <Storages/StorageMemory.h>
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
Expand Down Expand Up @@ -918,17 +919,15 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)

/// Even if table is not loaded, try remove its data from disk.
/// TODO remove data from all volumes
fs::path data_path = fs::path(getContext()->getPath()) / "store" / getPathForUUID(table.table_id.uuid);
if (fs::exists(data_path))
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
String data_path = "store/" + getPathForUUID(table.table_id.uuid);
if (disk->supportDataSharing() && !need_remove_shared_data_parts) continue;

if (disk->isReadOnly() || !disk->exists(data_path))
continue;

LOG_INFO(log, "Removing data directory {} of dropped table {}", data_path.string(), table.table_id.getNameForLogs());
fs::remove_all(data_path);
LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name);
disk->removeRecursive(data_path);
}

LOG_INFO(log, "Removing metadata {} of dropped table {}", table.metadata_path, table.table_id.getNameForLogs());
Expand Down
Loading

0 comments on commit 21ee29a

Please sign in to comment.