Skip to content

Commit

Permalink
修复复制快照Bug:
Browse files Browse the repository at this point in the history
1. 修复复制快照有可能丢失文件的问题;
2. 将Snapshot从JournalKeeperState类中分离出来,结构更清晰;
3. 增加了Snapshot完整性的验证,避免加载使用不完整快照导致的数据错误;
4. 修改复制Snapshot实现,现在空目录也会被复制。
  • Loading branch information
liyue2008 committed Apr 17, 2020
1 parent 1073104 commit 91f3a6c
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 132 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.journalkeeper.exceptions;

/**
 * Unchecked exception type declared in {@code io.journalkeeper.exceptions}.
 *
 * <p>It adds no state or behavior of its own: both constructors delegate
 * directly to the matching {@link RuntimeException} constructor.
 *
 * @author LiYue
 * Date: 2019-03-25
 */
public class StateQueryException extends RuntimeException {

    /**
     * Creates the exception from a cause only.
     *
     * @param cause the underlying failure, passed unchanged to
     *              {@link RuntimeException#RuntimeException(Throwable)}
     */
    public StateQueryException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates the exception with a detail message and no cause.
     *
     * @param message the detail message, later available via {@link #getMessage()}
     */
    public StateQueryException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.journalkeeper.core.entry.internal.UpdateVotersS1Entry;
import io.journalkeeper.core.entry.internal.UpdateVotersS2Entry;
import io.journalkeeper.core.state.EntryFutureImpl;
import io.journalkeeper.core.state.Snapshot;
import io.journalkeeper.exceptions.JournalException;
import io.journalkeeper.exceptions.RecoverException;
import io.journalkeeper.core.journal.Journal;
Expand Down Expand Up @@ -156,7 +157,7 @@ public abstract class AbstractServer
/**
* 存放节点上所有状态快照的稀疏数组,数组的索引(key)就是快照对应的日志位置的索引
*/
protected final NavigableMap<Long, JournalKeeperState> snapshots = new ConcurrentSkipListMap<>();
protected final NavigableMap<Long, Snapshot> snapshots = new ConcurrentSkipListMap<>();
protected final PartialSnapshot partialSnapshot;
protected final BufferPool bufferPool;
protected final Map<URI, ServerRpc> remoteServers = new HashMap<>();
Expand Down Expand Up @@ -393,8 +394,13 @@ protected Path snapshotsPath() {
}

private void createFistSnapshot(List<URI> voters, Set<Integer> partitions, URI preferredLeader) throws IOException {
JournalKeeperState snapshot = new JournalKeeperState(stateFactory, metadataPersistence);
snapshot.init(snapshotsPath().resolve(String.valueOf(0L)), voters, partitions, preferredLeader);
Snapshot snapshot = new Snapshot(stateFactory, metadataPersistence);
Path snapshotPath = snapshotsPath().resolve(String.valueOf(0L));
snapshot.init(snapshotPath, voters, partitions, preferredLeader);
Snapshot.markComplete(snapshotPath);
snapshot.recover(snapshotPath, properties);
snapshot.createFirstSnapshot(partitions);
snapshot.close();
}

protected Path statePath() {
Expand Down Expand Up @@ -537,16 +543,16 @@ public CompletableFuture<QueryStateResponse> querySnapshot(QueryStateRequest req
}
}

JournalKeeperState snapshot;
Map.Entry<Long, JournalKeeperState> nearestSnapshot = snapshots.floorEntry(request.getIndex());
Snapshot snapshot;
Map.Entry<Long, Snapshot> nearestSnapshot = snapshots.floorEntry(request.getIndex());
if (null == nearestSnapshot) {
throw new IndexUnderflowException();
}

if (request.getIndex() == nearestSnapshot.getKey()) {
snapshot = nearestSnapshot.getValue();
} else {
snapshot = new JournalKeeperState(stateFactory, metadataPersistence);
snapshot = new Snapshot(stateFactory, metadataPersistence);
Path tempSnapshotPath = snapshotsPath().resolve(String.valueOf(request.getIndex()));
if (Files.exists(tempSnapshotPath)) {
throw new ConcurrentModificationException(String.format("A snapshot of position %d is creating, please retry later.", request.getIndex()));
Expand Down Expand Up @@ -578,7 +584,7 @@ private void createSnapShot(InternalEntryType type, byte[] internalEntry) {

private void recoverSnapShot(InternalEntryType type, byte[] internalEntry) {
RecoverSnapshotEntry recoverSnapshotEntry = InternalEntriesSerializeSupport.parse(internalEntry);
JournalKeeperState targetSnapshot = snapshots.get(recoverSnapshotEntry.getIndex());
Snapshot targetSnapshot = snapshots.get(recoverSnapshotEntry.getIndex());
if (targetSnapshot == null) {
logger.warn("recover snapshot failed, snapshot not exist, index: {}", recoverSnapshotEntry.getIndex());
return;
Expand All @@ -591,7 +597,7 @@ private void recoverSnapShot(InternalEntryType type, byte[] internalEntry) {
}
}

protected void doRecoverSnapshot(JournalKeeperState targetSnapshot) throws IOException {
protected void doRecoverSnapshot(Snapshot targetSnapshot) throws IOException {
logger.info("recover snapshot, target snapshot: {}", targetSnapshot.getPath());
state.closeUnsafe();
state.clearUserState();
Expand Down Expand Up @@ -704,9 +710,11 @@ private void createSnapshot() {
try {
FileUtils.deleteFolder(snapshotPath);
state.dump(snapshotPath);
JournalKeeperState snapshot = new JournalKeeperState(stateFactory, metadataPersistence);
Snapshot.markComplete(snapshotPath);
Snapshot snapshot = new Snapshot(stateFactory, metadataPersistence);
snapshot.recover(snapshotPath, properties);
snapshot.createSnapshot(journal);

snapshots.put(snapshot.lastApplied(), snapshot);
logger.info("Snapshot at index: {} created, {}.", lastApplied, snapshot);

Expand All @@ -724,7 +732,7 @@ public CompletableFuture<GetServerStateResponse> getServerState(GetServerStateRe
iteratorId = request.getIteratorId();
} else {
long snapshotIndex = request.getLastIncludedIndex() + 1;
JournalKeeperState snapshot = snapshots.get(snapshotIndex);
Snapshot snapshot = snapshots.get(snapshotIndex);
if (null != snapshot) {
ReplicableIterator iterator = snapshot.iterator();
iteratorId = nextSnapshotIteratorId.getAndIncrement();
Expand All @@ -738,7 +746,7 @@ public CompletableFuture<GetServerStateResponse> getServerState(GetServerStateRe
if (null != iterator) {
return new GetServerStateResponse(
iterator.lastIncludedIndex(), iterator.lastIncludedTerm(),
iterator.offset(), iterator.nextTrunk(), iterator.hasMoreTrunks(), iteratorId
iterator.offset(), iterator.nextTrunk(), !iterator.hasMoreTrunks(), iteratorId
);
} else {
throw new NoSuchSnapshotException();
Expand Down Expand Up @@ -986,15 +994,20 @@ private void recoverSnapshots() throws IOException {
entry -> entry.getFileName().toString().matches("\\d+")
).spliterator(), false)
.map(path -> {
JournalKeeperState snapshot = new JournalKeeperState(stateFactory, metadataPersistence);
snapshot.recover(path, properties, true);
if (Long.parseLong(path.getFileName().toString()) == snapshot.lastApplied()) {
return snapshot;
} else {
try {
Snapshot snapshot = new Snapshot(stateFactory, metadataPersistence);
snapshot.recover(path, properties);
if (Long.parseLong(path.getFileName().toString()) == snapshot.lastApplied()) {
return snapshot;
} else {
return null;
}
} catch (Throwable t) {
logger.warn("Recover snapshot {} exception: ", path.toString(), t);
return null;
}
}).filter(Objects::nonNull)
.peek(JournalKeeperState::close)
.peek(Snapshot::close)
.forEach(snapshot -> snapshots.put(snapshot.lastApplied(), snapshot));
}

Expand Down Expand Up @@ -1078,14 +1091,14 @@ private void compactJournalPeriodically() {
private void compactJournalToSnapshot(long index) {
logger.info("Compact journal to index: {}...", index);
try {
JournalKeeperState snapshot = snapshots.get(index);
Snapshot snapshot = snapshots.get(index);
if (null != snapshot) {
JournalSnapshot journalSnapshot = snapshot.getJournalSnapshot();
logger.info("Compact journal entries, journal snapshot: {}, journal: {}...", journalSnapshot, journal);
journal.compact(snapshot.getJournalSnapshot());
logger.info("Compact journal finished, journal: {}.", journal);

NavigableMap<Long, JournalKeeperState> headMap = snapshots.headMap(index, false);
NavigableMap<Long, Snapshot> headMap = snapshots.headMap(index, false);
while (!headMap.isEmpty()) {
snapshot = headMap.remove(headMap.firstKey());
logger.info("Discard snapshot: {}.", snapshot.getPath());
Expand Down Expand Up @@ -1165,15 +1178,15 @@ void installSnapshot(long offset, long lastIncludedIndex, int lastIncludedTerm,
ThreadSafeFormat.formatWithComma(journal.commitIndex())
);

JournalKeeperState snapshot;
Snapshot snapshot;
long lastApplied = lastIncludedIndex + 1;
Path snapshotPath = snapshotsPath().resolve(String.valueOf(lastApplied));
partialSnapshot.installTrunk(offset, data, snapshotPath);

if (isDone) {
logger.info("All snapshot files received, discard any existing snapshot with a same or smaller index...");
// discard any existing snapshot with a same or smaller index
NavigableMap<Long, JournalKeeperState> headMap = snapshots.headMap(lastApplied, true);
NavigableMap<Long, Snapshot> headMap = snapshots.headMap(lastApplied, true);
while (!headMap.isEmpty()) {
snapshot = headMap.remove(headMap.firstKey());
logger.info("Discard snapshot: {}.", snapshot.getPath());
Expand All @@ -1183,8 +1196,8 @@ void installSnapshot(long offset, long lastIncludedIndex, int lastIncludedTerm,
logger.info("add the installed snapshot to snapshots: {}...", snapshotPath);
partialSnapshot.finish();
// add the installed snapshot to snapshots.
snapshot = new JournalKeeperState(stateFactory, metadataPersistence);
snapshot.recover(snapshotPath, properties, true);
snapshot = new Snapshot(stateFactory, metadataPersistence);
snapshot.recover(snapshotPath, properties);
snapshots.put(lastApplied, snapshot);

logger.info("New installed snapshot: {}.", snapshot.getJournalSnapshot());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.journalkeeper.core.api.VoterState;
import io.journalkeeper.core.journal.Journal;
import io.journalkeeper.core.state.JournalKeeperState;
import io.journalkeeper.core.state.Snapshot;
import io.journalkeeper.exceptions.IndexUnderflowException;
import io.journalkeeper.rpc.server.AsyncAppendEntriesRequest;
import io.journalkeeper.rpc.server.AsyncAppendEntriesResponse;
Expand Down Expand Up @@ -53,7 +54,7 @@ class Follower extends ServerStateMachine implements StateServer {
private final int currentTerm;
private final VoterConfigManager voterConfigManager;
private final Threads threads;
private final NavigableMap<Long, JournalKeeperState> snapshots;
private final NavigableMap<Long, Snapshot> snapshots;

/**
* Leader 日志当前的最大位置
Expand All @@ -63,7 +64,7 @@ class Follower extends ServerStateMachine implements StateServer {

private boolean readyForStartPreferredLeaderElection = false;

Follower(Journal journal, JournalKeeperState state, URI serverUri, int currentTerm, VoterConfigManager voterConfigManager, Threads threads, NavigableMap<Long, JournalKeeperState> snapshots, int cachedRequests) {
Follower(Journal journal, JournalKeeperState state, URI serverUri, int currentTerm, VoterConfigManager voterConfigManager, Threads threads, NavigableMap<Long, Snapshot> snapshots, int cachedRequests) {
super(true);
this.state = state;
this.voterConfigManager = voterConfigManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.journalkeeper.core.state.ApplyReservedEntryInterceptor;
import io.journalkeeper.core.state.ConfigState;
import io.journalkeeper.core.state.JournalKeeperState;
import io.journalkeeper.core.state.Snapshot;
import io.journalkeeper.core.transaction.JournalTransactionManager;
import io.journalkeeper.exceptions.IndexUnderflowException;
import io.journalkeeper.exceptions.NotLeaderException;
Expand Down Expand Up @@ -120,7 +121,7 @@ class Leader extends ServerStateMachine implements StateServer {
/**
* 存放节点上所有状态快照的稀疏数组,数组的索引(key)就是快照对应的日志位置的索引
*/
private final Map<Long, JournalKeeperState> immutableSnapshots;
private final Map<Long, Snapshot> immutableSnapshots;

private final URI serverUri;
private final int currentTerm;
Expand All @@ -134,7 +135,7 @@ class Leader extends ServerStateMachine implements StateServer {
private final JournalTransactionManager journalTransactionManager;
private final ApplyReservedEntryInterceptor journalTransactionInterceptor;
private final ApplyInternalEntryInterceptor leaderAnnouncementInterceptor;
private final NavigableMap<Long, JournalKeeperState> snapshots;
private final NavigableMap<Long, Snapshot> snapshots;
private final int snapshotIntervalSec;
private final AtomicBoolean isLeaderAnnouncementApplied = new AtomicBoolean(false);
private final AtomicLong callbackBarrier = new AtomicLong(0L);
Expand All @@ -147,7 +148,7 @@ class Leader extends ServerStateMachine implements StateServer {
private ScheduledFuture takeSnapshotFuture;
private AtomicBoolean isAnyFollowerNextIndexUpdated = new AtomicBoolean(false);

Leader(Journal journal, JournalKeeperState state, Map<Long, JournalKeeperState> immutableSnapshots,
Leader(Journal journal, JournalKeeperState state, Map<Long, Snapshot> immutableSnapshots,
int currentTerm,
URI serverUri,
int cacheRequests, long heartbeatIntervalMs, long rpcTimeoutMs, int replicationBatchSize,
Expand All @@ -159,7 +160,7 @@ class Leader extends ServerStateMachine implements StateServer {
VoterConfigManager voterConfigManager,
MetricProvider metricProvider,
JournalEntryParser journalEntryParser,
long transactionTimeoutMs, NavigableMap<Long, JournalKeeperState> snapshots) {
long transactionTimeoutMs, NavigableMap<Long, Snapshot> snapshots) {

super(true);
this.pendingUpdateStateRequests = new LinkedBlockingQueue<>(cacheRequests);
Expand Down Expand Up @@ -425,7 +426,7 @@ private int getTerm(long index) {
}
}

private void installSnapshot(ReplicationDestination follower, JournalKeeperState snapshot) {
private void installSnapshot(ReplicationDestination follower, Snapshot snapshot) {

try {
logger.info("Install snapshot to {} ...", follower.getUri());
Expand Down Expand Up @@ -820,7 +821,7 @@ private void replication() {
long start = metric == null ? 0L : System.nanoTime();

// 如果有必要,先安装第一个快照
Map.Entry<Long, JournalKeeperState> fistSnapShotEntry = snapshots.firstEntry();
Map.Entry<Long, Snapshot> fistSnapShotEntry = snapshots.firstEntry();
maybeInstallSnapshotFirst(fistSnapShotEntry);

// 读取需要复制的Entry
Expand Down Expand Up @@ -878,7 +879,7 @@ private void replication() {
}
}

private void maybeInstallSnapshotFirst(Map.Entry<Long, JournalKeeperState> fistSnapShotEntry) {
private void maybeInstallSnapshotFirst(Map.Entry<Long, Snapshot> fistSnapShotEntry) {
if (nextIndex <= fistSnapShotEntry.getKey()) {
installSnapshot(this, fistSnapShotEntry.getValue());
nextIndex = fistSnapShotEntry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
*/
package io.journalkeeper.core.server;

import io.journalkeeper.core.state.FolderTrunkIterator;
import io.journalkeeper.exceptions.InstallSnapshotException;
import io.journalkeeper.utils.files.FileUtils;
import org.slf4j.Logger;
Expand All @@ -49,6 +50,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/**
*
Expand Down Expand Up @@ -122,22 +124,38 @@ void installTrunk(long offset, byte[] data, Path snapshotPath) throws IOExceptio
Files.createDirectories(filePath.getParent());
}

logger.info("Installing snapshot file: {}...", filePath);

if (offsetOfFile == 0 || Files.size(filePath) == offsetOfFile) {
try (FileOutputStream output = new FileOutputStream(filePath.toFile(), true)) {
output.write(data, buffer.position(), buffer.remaining());
}
this.offset += data.length;
if (offsetOfFile == 0 && isDirectory(buffer)) {
logger.info("Creating snapshot directory: {}...", filePath);
Files.createDirectories(filePath);
} else {
throw new InstallSnapshotException(
String.format(
"Current file size %d should equal trunk offset %d! File: %s",
Files.size(filePath), offsetOfFile, filePath.toString()
)
);
logger.info("Installing snapshot file: {}...", filePath);
if(offsetOfFile == 0 || Files.size(filePath) == offsetOfFile) {
try (FileOutputStream output = new FileOutputStream(filePath.toFile(), true)) {
output.write(data, buffer.position(), buffer.remaining());
}
} else {
throw new InstallSnapshotException(
String.format(
"Current file size %d should equal trunk offset %d! File: %s",
Files.size(filePath), offsetOfFile, filePath.toString()
)
);
}
}
this.offset += data.length;
}

/**
 * Tests whether the unread part of {@code buffer} consists of exactly
 * {@code FolderTrunkIterator.DIRECTORY_MAGIC_CODE}.
 *
 * <p>The bytes are read with absolute {@code get(int)} calls relative to the
 * buffer's current position, so the buffer's position and limit are left
 * untouched.
 *
 * @param buffer the buffer whose remaining bytes are inspected
 * @return {@code true} if the number of remaining bytes equals the magic code
 *         length and every byte matches it; {@code false} otherwise
 */
private boolean isDirectory(ByteBuffer buffer) {
    final int magicLength = FolderTrunkIterator.DIRECTORY_MAGIC_CODE.length;
    // A length mismatch rules the payload out before any byte is compared.
    if (buffer.remaining() != magicLength) {
        return false;
    }

    final int start = buffer.position();
    int matched = 0;
    // Advance while bytes agree; stops at the first mismatch or after the last byte.
    while (matched < magicLength
            && buffer.get(start + matched) == FolderTrunkIterator.DIRECTORY_MAGIC_CODE[matched]) {
        matched++;
    }
    return matched == magicLength;
}

void finish() throws IOException {
Expand Down
Loading

0 comments on commit 91f3a6c

Please sign in to comment.