Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent file directories index #15

Draft
wants to merge 36 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2907e70
split FileDirectoriesIndex single db to 100 to improve concurrency
Jun 26, 2020
64c8850
fix compiling error
Jun 26, 2020
c17cdd3
remove logging
Jun 26, 2020
502cc2d
add queue for each sqlite db as buffer zone
Jun 30, 2020
5417f09
add logging for debug
Jun 30, 2020
ede212e
adjust cas-test cas size and HashFunction and add logging
Jul 1, 2020
1cc0f23
drain all queues each time any queue is full
Jul 1, 2020
91890be
avoid npe
Jul 1, 2020
2648403
synchronize queues draining
Jul 1, 2020
57859ef
increase max total queue size
Jul 1, 2020
48c1d54
adjust synchronization point and number of database
Jul 2, 2020
2070ff1
adjust synchronization point
Jul 2, 2020
d452210
adjust synchronization
Jul 2, 2020
8a574af
maintain all in memory and then flush to disk
Jul 2, 2020
88a4a07
avoid logging per entry
Jul 2, 2020
0c7023b
drain each queue independently
Jul 2, 2020
6580d55
add logging for debugging
Jul 2, 2020
385af36
adjust queue size
Jul 2, 2020
bff954d
adjust queue size and queue number
Jul 2, 2020
cec07a6
drain queue concurrently
Jul 5, 2020
16e19f6
increase total queue size
Jul 5, 2020
75bdbe3
increase total number of databases to reduce lock retention in adding
Jul 5, 2020
61cf41b
drain queue independently
Jul 5, 2020
bccffb2
increase separate queue size
Jul 5, 2020
2b97d0e
using string array to replace MapEntry
Jul 6, 2020
589fcba
adjust logging
Jul 6, 2020
783ea45
use 40 times number of queue
Jul 8, 2020
2e3fe95
logging out of synchronization block
Jul 8, 2020
5b03122
drain queue concurrently
Jul 8, 2020
68e9d80
adjust number of thread
Jul 8, 2020
ab29fa7
add synchronization
Jul 8, 2020
4848ce1
double producer count
Jul 9, 2020
8432eba
use ConcurrentHashMap
Jul 9, 2020
8cc33ce
unlimited queue
Jul 9, 2020
042c9e0
add logging
Jul 9, 2020
b258c24
drain queue independently
Jul 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/build/buildfarm/CASTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public static void main(String[] args) throws Exception {
CASFileCache fileCache =
new LocalCASFileCache(
root,
/* maxSizeInBytes=*/ GBtoBytes(500),
new DigestUtil(HashFunction.SHA1),
/* maxSizeInBytes=*/ GBtoBytes(2 * 1024),
new DigestUtil(HashFunction.SHA256),
/* expireService=*/ newDirectExecutorService(),
/* accessRecorder=*/ directExecutor());

Expand Down
55 changes: 26 additions & 29 deletions src/main/java/build/buildfarm/cas/CASFileCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public abstract class CASFileCache implements ContentAddressableStorage {
private static final Logger logger = Logger.getLogger(CASFileCache.class.getName());

protected static final String DEFAULT_DIRECTORIES_INDEX_NAME = "directories.sqlite";
protected static final String DIRECTORIES_INDEX_NAME_MEMORY = ":memory:";

private final Path root;
private final FileStore fileStore;
Expand All @@ -129,7 +128,7 @@ public abstract class CASFileCache implements ContentAddressableStorage {
private final Executor accessRecorder;
private final ExecutorService expireService;

private final Map<Digest, DirectoryEntry> directoryStorage = Maps.newHashMap();
private final Map<Digest, DirectoryEntry> directoryStorage = Maps.newConcurrentMap();
private final DirectoriesIndex directoriesIndex;
private final String directoriesIndexDbName;
private final LockMap locks = new LockMap();
Expand Down Expand Up @@ -286,24 +285,7 @@ public CASFileCache(
this.onExpire = onExpire;
this.delegate = delegate;
this.directoriesIndexDbName = directoriesIndexDbName;

String directoriesIndexUrl = "jdbc:sqlite:";
if (directoriesIndexDbName.equals(DIRECTORIES_INDEX_NAME_MEMORY)) {
directoriesIndexUrl += directoriesIndexDbName;
} else {
// db is ephemeral for now, no reuse occurs to match it, computation
// occurs each time anyway, and expected use of put is noop on collision
Path path = getPath(directoriesIndexDbName);
try {
if (Files.exists(path)) {
Files.delete(path);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
directoriesIndexUrl += path.toString();
}
this.directoriesIndex = new FileDirectoriesIndex(directoriesIndexUrl, root);
this.directoriesIndex = new FileDirectoriesIndex(directoriesIndexDbName, root);

header.before = header.after = header;
}
Expand Down Expand Up @@ -1295,16 +1277,33 @@ public StartupCacheResults start(Consumer<Digest> onPut, ExecutorService removeD
CacheScanResults cacheScanResults = scanRoot();
LogCacheScanResults(cacheScanResults);
deleteInvalidFileContent(cacheScanResults.deleteFiles, removeDirectoryService);
Instant phase1Ending = Instant.now();
logger.log(
Level.INFO,
"Phase 1 Time: "
+ Duration.between(startTime, phase1Ending).getSeconds()
+ "s");

// Phase 2: Compute
// recursively construct all directory structures.
List<Path> invalidDirectories = computeDirectories(cacheScanResults);
LogComputeDirectoriesResults(invalidDirectories);
deleteInvalidFileContent(invalidDirectories, removeDirectoryService);

logger.log(Level.INFO, "Creating Index");
Instant beforeIndexing = Instant.now();
logger.log(
Level.INFO,
"Phase 2 Inserting Time: "
+ Duration.between(phase1Ending, beforeIndexing).getSeconds()
+ "s");

directoriesIndex.start();
logger.log(Level.INFO, "Index Created");
Instant afterIndexing = Instant.now();
logger.log(
Level.INFO,
"Phase 2 Building Index Time: "
+ Duration.between(beforeIndexing, afterIndexing).getSeconds()
+ "s");

// Calculate Startup time
Instant endTime = Instant.now();
Expand Down Expand Up @@ -1400,7 +1399,7 @@ private void processRootFile(

// ignore our directories index database
// indexes will be removed and rebuilt for compute
if (!basename.equals(directoriesIndexDbName)) {
if (!basename.startsWith(directoriesIndexDbName)) {
FileStatus stat = stat(file, false);

// mark directory for later key compute
Expand Down Expand Up @@ -1458,14 +1457,15 @@ private List<Path> computeDirectories(CacheScanResults cacheScanResults)
throws IOException, InterruptedException {

// create thread pool
int nThreads = Runtime.getRuntime().availableProcessors();
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
String threadNameFormat = "compute-cache-pool-%d";
ExecutorService pool =
Executors.newFixedThreadPool(
nThreads, new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build());

ImmutableList.Builder<Path> invalidDirectories = new ImmutableList.Builder<>();

directoriesIndex.setBatchMode(true);
for (Path path : cacheScanResults.computeDirs) {
pool.execute(
() -> {
Expand All @@ -1482,9 +1482,7 @@ private List<Path> computeDirectories(CacheScanResults cacheScanResults)
if (digest != null && getDirectoryPath(digest).equals(path)) {
DirectoryEntry e = new DirectoryEntry(directory, Deadline.after(10, SECONDS));
directoriesIndex.put(digest, inputsBuilder.build());
synchronized (this) {
directoryStorage.put(digest, e);
}
} else {
synchronized (invalidDirectories) {
invalidDirectories.add(path);
Expand All @@ -1497,6 +1495,7 @@ private List<Path> computeDirectories(CacheScanResults cacheScanResults)
}

joinThreads(pool, "Populating Directories...", 1, MINUTES);
directoriesIndex.setBatchMode(false);

return invalidDirectories.build();
}
Expand Down Expand Up @@ -2226,9 +2225,7 @@ private ListenableFuture<Path> putDirectorySynchronized(
? Directory.getDefaultInstance()
: directoriesByDigest.get(digest),
Deadline.after(10, SECONDS));
synchronized (this) {
directoryStorage.put(digest, e);
}
directoryStorage.put(digest, e);
return path;
},
service);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/build/buildfarm/cas/DirectoriesIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ interface DirectoriesIndex {
void remove(Digest directory);

void start();

void setBatchMode(boolean batchMode) throws InterruptedException;
}
Loading