Skip to content

Commit

Permalink
[enhancement](cloud) support BE http action: list_cache and clear (ap…
Browse files Browse the repository at this point in the history
…ache#41037)

## Proposed changes

Add an HTTP action that is useful for debugging the BE file cache.

### API
```http
GET /api/file_cache
```

### request parameter

#### request parameter1

|param|type|desc|require|
|:---|:---|:---|:---|
|op|string|must be `list_cache`; for other values, see apache#40831 and apache#37484|yes|
|value|string|the segment file name |yes|

#### request parameter2

|param|type|desc|require|
|:---|:---|:---|:---|
|op|string|must be `clear`; for other values, see apache#40831 and apache#37484|yes|
|value|string|the segment file name; if omitted, all file caches are cleared|no|
|sync|bool|whether to clear the local cache synchronously; only used when `value` is omitted|no|

### response

#### response1

if success
|param|type|desc|
|:---|:---|:---|
||array|local paths of the cache files that belong to the segment file|

if fail
|param|type|desc|
|:---|:---|:---|
||array|empty array|

#### response2

if success
|param|type|desc|
|:---|:---|:---|
|status|string||
|msg|string||

### example

#### case 1

```bash
curl  '172.100.0.4:8040/api/file_cache?op=list_cache&value=0200000000000001bf42c14374fff491ffb7c89a1a65c5bb_0.dat'
```

return
```json
["/opt/doris/be/file_cache/c6a/c6a599f453f67f0949f80ad9990fa3dd/0"]
```

#### case 2

```bash
curl '127.0.0.1:8040/api/file_cache?op=clear&sync=true&value=0200000000000001284b68fea3dcfe8a83e65cd88426b081_0.dat'
```

return
```json
{
    "status": "OK",
    "msg": "OK"
}
```
  • Loading branch information
yagagagaga authored Nov 7, 2024
1 parent 8c88e35 commit 99d0748
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 1 deletion.
33 changes: 32 additions & 1 deletion be/src/http/action/file_cache_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

#include "file_cache_action.h"

#include <glog/logging.h>

#include <algorithm>
#include <memory>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>

#include "common/status.h"
#include "http/http_channel.h"
Expand All @@ -30,6 +35,7 @@
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "util/easy_json.h"
Expand All @@ -43,6 +49,7 @@ constexpr static std::string_view PATH = "path";
constexpr static std::string_view CLEAR = "clear";
constexpr static std::string_view RESET = "reset";
constexpr static std::string_view HASH = "hash";
constexpr static std::string_view LIST_CACHE = "list_cache";
constexpr static std::string_view CAPACITY = "capacity";
constexpr static std::string_view RELEASE = "release";
constexpr static std::string_view BASE_PATH = "base_path";
Expand All @@ -66,7 +73,14 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
*json_metrics = json.ToString();
} else if (operation == CLEAR) {
const std::string& sync = req->param(SYNC.data());
auto ret = io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
const std::string& segment_path = req->param(VALUE.data());
if (segment_path.empty()) {
io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
} else {
io::UInt128Wrapper hash = io::BlockFileCache::hash(segment_path);
io::BlockFileCache* cache = io::FileCacheFactory::instance()->get_by_path(hash);
cache->remove_if_cached(hash);
}
} else if (operation == RESET) {
std::string capacity = req->param(CAPACITY.data());
int64_t new_capacity = 0;
Expand Down Expand Up @@ -96,6 +110,23 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
json[HASH.data()] = ret.to_string();
*json_metrics = json.ToString();
}
} else if (operation == LIST_CACHE) {
const std::string& segment_path = req->param(VALUE.data());
if (segment_path.empty()) {
st = Status::InvalidArgument("missing parameter: {} is required", VALUE.data());
} else {
io::UInt128Wrapper cache_hash = io::BlockFileCache::hash(segment_path);
std::vector<std::string> cache_files =
io::FileCacheFactory::instance()->get_cache_file_by_path(cache_hash);
if (cache_files.empty()) {
*json_metrics = "[]";
} else {
EasyJson json;
std::for_each(cache_files.begin(), cache_files.end(),
[&json](auto& x) { json.PushBack(x); });
*json_metrics = json.ToString();
}
}
} else {
st = Status::InternalError("invalid operation: {}", operation);
}
Expand Down
17 changes: 17 additions & 0 deletions be/src/io/cache/block_file_cache_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "io/cache/block_file_cache_factory.h"

#include <glog/logging.h>

#include <string>
#include <vector>
#if defined(__APPLE__)
#include <sys/mount.h>
#else
Expand Down Expand Up @@ -118,6 +121,20 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
return Status::OK();
}

// Returns the local cache file path of every block cached under `hash`.
//
// The owning cache instance is looked up through the process-wide factory. An empty
// vector is returned when no cache instance exists or when nothing is cached for the key.
std::vector<std::string> FileCacheFactory::get_cache_file_by_path(const UInt128Wrapper& hash) {
    auto* factory = io::FileCacheFactory::instance();
    // get_by_path() indexes the cache list modulo its size, so it must not be called
    // when no cache instance has been created (that would divide by zero).
    if (factory->get_cache_instance_size() == 0) {
        return {};
    }
    io::BlockFileCache* cache = factory->get_by_path(hash);
    auto blocks = cache->get_blocks_by_key(hash);
    std::vector<std::string> ret;
    ret.reserve(blocks.size());
    // The first element of each entry is not needed here; only the block is used.
    for (const auto& [_, fb] : blocks) {
        ret.emplace_back(fb->get_cache_file());
    }
    return ret;
}

BlockFileCache* FileCacheFactory::get_by_path(const UInt128Wrapper& key) {
// dont need lock mutex because _caches is immutable after create_file_cache
return _caches[KeyHash()(key) % _caches.size()].get();
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/block_file_cache_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class FileCacheFactory {

[[nodiscard]] size_t get_cache_instance_size() const { return _caches.size(); }

// Returns the local cache file path of each block cached under `hash`;
// the result is empty when nothing is cached for that key.
std::vector<std::string> get_cache_file_by_path(const UInt128Wrapper& hash);

BlockFileCache* get_by_path(const UInt128Wrapper& hash);
BlockFileCache* get_by_path(const std::string& cache_base_path);
std::vector<BlockFileCache::QueryFileCacheContextHolderPtr> get_query_context_holders(
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ std::string FileBlock::state_to_string(FileBlock::State state) {
}
}

std::string FileBlock::get_cache_file() const {
return _mgr->_storage->get_local_file(this->_key);
}

FileBlocksHolder::~FileBlocksHolder() {
for (auto file_block_it = file_blocks.begin(); file_block_it != file_blocks.end();) {
auto current_file_block_it = file_block_it;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class FileBlock {

uint64_t expiration_time() const { return _key.meta.expiration_time; }

// Returns what the cache's storage backend reports as the local file for this block's key.
std::string get_cache_file() const;

State state_unlock(std::lock_guard<std::mutex>&) const;

FileBlock& operator=(const FileBlock&) = delete;
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class FileCacheStorage {
// force clear all current data in the cache
virtual Status clear(std::string& msg) = 0;
virtual FileCacheStorageType get_type() = 0;
// Returns the local file that stores the block identified by `key`.
// What the string contains is defined by each storage implementation.
virtual std::string get_local_file(const FileCacheKey& key) = 0;
};

} // namespace doris::io
5 changes: 5 additions & 0 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,11 @@ Status FSFileCacheStorage::clear(std::string& msg) {
return Status::OK();
}

// Builds the on-disk path of the cache file for `key` in two steps: first the
// directory derived from the hash and expiration time, then the file inside it
// derived from the block offset and cache type.
std::string FSFileCacheStorage::get_local_file(const FileCacheKey& key) {
    const std::string key_dir = get_path_in_local_cache(key.hash, key.meta.expiration_time);
    // NOTE(review): the trailing `false` is presumably the "is temporary file" flag —
    // confirm against the declaration of get_path_in_local_cache.
    return get_path_in_local_cache(key_dir, key.offset, key.meta.type, false);
}

FSFileCacheStorage::~FSFileCacheStorage() {
if (_cache_background_load_thread.joinable()) {
_cache_background_load_thread.join();
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/fs_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class FSFileCacheStorage : public FileCacheStorage {
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) override;
Status clear(std::string& msg) override;
// Returns the path in the local cache directory of the file that stores `key`.
std::string get_local_file(const FileCacheKey& key) override;

[[nodiscard]] static std::string get_path_in_local_cache(const std::string& dir, size_t offset,
FileCacheType type,
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/cache/mem_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,8 @@ Status MemFileCacheStorage::clear(std::string& msg) {
return Status::OK();
}

// The memory storage reports no local file: the result is always an empty string,
// regardless of `key`.
std::string MemFileCacheStorage::get_local_file(const FileCacheKey& key) {
    return std::string {};
}

} // namespace doris::io
1 change: 1 addition & 0 deletions be/src/io/cache/mem_file_cache_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class MemFileCacheStorage : public FileCacheStorage {
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
std::lock_guard<std::mutex>& cache_lock) override;
Status clear(std::string& msg) override;
// Always returns an empty string for the memory storage.
std::string get_local_file(const FileCacheKey& key) override;

FileCacheStorageType get_type() override { return MEMORY; }

Expand Down
117 changes: 117 additions & 0 deletions regression-test/suites/cloud_p0/cache/http/test_list_cache_file.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

// Regression suite for the BE `/api/file_cache` HTTP action.
// Flow: load data into a table, pick the segment file of its last rowset, check that
// `op=list_cache` reports at least one cache file for it, clear that single segment
// with `op=clear&value=...`, then check that `op=list_cache` returns an empty array.
suite("test_list_cache_file") {
sql """ use @regression_cluster_name1 """
String[][] backends = sql """ show backends """
String backendId;
def backendIdToBackendIP = [:]
def backendIdToBackendHttpPort = [:]
// Populated below but not read anywhere else in this suite.
def backendIdToBackendBrpcPort = [:]
for (String[] backend in backends) {
// NOTE(review): column 9 is presumably the "Alive" flag and column 19 the cluster
// tag of `show backends` — confirm against the column layout of the target version.
if (backend[9].equals("true") && backend[19].contains("regression_cluster_name1")) {
backendIdToBackendIP.put(backend[0], backend[1])
backendIdToBackendHttpPort.put(backend[0], backend[4])
backendIdToBackendBrpcPort.put(backend[0], backend[5])
}
}
// The suite requires exactly one matching backend so that all HTTP calls hit the same BE.
assertEquals(backendIdToBackendIP.size(), 1)

backendId = backendIdToBackendIP.keySet()[0]
// "ip:http_port" of the single backend; used as the base of every request URI below.
def socket = backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendHttpPort.get(backendId)

sql "drop table IF EXISTS `user`"

sql """
CREATE TABLE IF NOT EXISTS `user` (
`id` int NULL,
`name` string NULL
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"file_cache_ttl_seconds" = "2884"
)
"""

// Load one million rows so that the table has a rowset to inspect.
sql "insert into user select number, cast(rand() as varchar(32)) from numbers(\"number\"=\"1000000\")"

// Returns the first column of `show tablets from <tbl_name>` for every row, as Integers.
// NOTE(review): Integer.valueOf throws if a tablet id does not fit in 32 bits — confirm
// that ids in the target environment stay within int range.
def get_tablets = { String tbl_name ->
def res = sql "show tablets from ${tbl_name}"
List<Integer> tablets = new ArrayList<>()
for (final def line in res) {
tablets.add(Integer.valueOf(line[0].toString()))
}
return tablets
}

// Queries `/api/compaction/show` for the tablet and returns, for each entry of the
// "rowsets" array, the fifth space-separated token.
// NOTE(review): that token is presumably the rowset id — verify against the response format.
def get_rowsets = { int tablet_id ->
var ret = []
httpTest {
endpoint ""
uri socket + "/api/compaction/show?tablet_id=" + tablet_id
op "get"
check {respCode, body ->
assertEquals(respCode, 200)
var map = parseJson(body)
for (final def line in map.get("rowsets")) {
var tokens = line.toString().split(" ")
ret.add(tokens[4])
}
}
}
return ret
}

var tablets = get_tablets("user")
var rowsets = get_rowsets(tablets.get(0))
// Segment file name of the last listed rowset: "<token>_0.dat".
var segment_file = rowsets[rowsets.size() - 1] + "_0.dat"

// Step 1: the segment must have at least one local cache file.
httpTest {
endpoint ""
uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
op "get"
check {respCode, body ->
assertEquals(respCode, 200)
var arr = parseJson(body)
assertTrue(arr.size() > 0, "There shouldn't be no cache file at all, maybe you need to check disk capacity and modify file_cache_enter_disk_resource_limit_mode_percent in be.conf")
}
}

// clear single segment file cache
httpTest {
endpoint ""
uri socket + "/api/file_cache?op=clear&value=" + segment_file
op "get"
check {respCode, body ->
assertEquals(respCode, 200, "clear local cache fail, maybe you can find something in respond: " + parseJson(body))
}
}

// Step 3: after the clear, listing the same segment must return an empty array.
httpTest {
endpoint ""
uri socket + "/api/file_cache?op=list_cache&value=" + segment_file
op "get"
check {respCode, body ->
assertEquals(respCode, 200)
var arr = parseJson(body)
assertTrue(arr.size() == 0, "local cache files should not greater than 0, because it has already clear")
}
}
}

0 comments on commit 99d0748

Please sign in to comment.