Skip to content

Commit

Permalink
GH-171: Only allow on syncrhorization per DeltaConnection at a time.
Browse files Browse the repository at this point in the history
  • Loading branch information
afs committed Aug 16, 2022
1 parent f6540b5 commit d113b4c
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.function.Consumer ;

Expand Down Expand Up @@ -87,6 +88,11 @@ public class DeltaConnection implements AutoCloseable {
private final SyncPolicy syncPolicy;

private final LogLockMgr logLockMgr;
// Synchronization within this DeltaConnection.
private final Object localLock;

// Indicator of whether a sync is in-progress.
private final AtomicBoolean syncInProgress = new AtomicBoolean(false);

// Synchronize patches asynchronously to the caller.
// (ExecutorService in case we change the policy details in the future.)
Expand Down Expand Up @@ -134,6 +140,7 @@ private DeltaConnection(DataState dataState, DatasetGraph basedsg, DeltaLink lin
logLockMgr.add(logLock);

this.valid = true;
this.localLock = new Object();
this.syncPolicy = syncTxnBegin;
if ( basedsg == null ) {
this.target = null;
Expand Down Expand Up @@ -306,25 +313,38 @@ public void txnAbort() {

if ( syncPolicy != SyncPolicy.NONE ) {
// Run (almost) immediately and then every 5 minutes
scheduledExecutionService.scheduleAtFixedRate(this::asyncOneSync, 0, 5*60, TimeUnit.SECONDS);
scheduledExecutionService.scheduleAtFixedRate(this::oneSyncAttempt, 0, 5*60, TimeUnit.SECONDS);
}
}

/*package*/ void finish() {
/*reset();*/
/*package*/ void finish() { close(); }

/** Execute a DeltaConnection sync once. */
private void oneSyncAttempt() {
trySyncIfAuto();
}

/**
* No-op end-to-end operation. This operation succeeds or throws an exception.
* This operation makes one attempt only to perform the ping.
*/
public void ping() {
dLink.ping();
}

/** Send a patch to log server. */
public synchronized void append(RDFPatch patch) {
checkDeltaConnection();
Version ver = dLink.append(datasourceId, patch);
if ( ! Version.isValid(ver) )
// Didn't happen.
return ;
Version ver0 = state.version();
if ( ver0.value() >= ver.value() )
FmtLog.warn(LOG, "[%s] Version did not advance: %d -> %d", datasourceId.toString(), ver0 , ver);
state.updateState(ver, Id.fromNode(patch.getId()));
public void append(RDFPatch patch) {
synchronized(localLock) {
checkDeltaConnection();
Version ver = dLink.append(datasourceId, patch);
if ( ! Version.isValid(ver) )
// Didn't happen.
return ;
Version ver0 = state.version();
if ( ver0.value() >= ver.value() )
FmtLog.warn(LOG, "[%s] Version did not advance: %d -> %d", datasourceId.toString(), ver0 , ver);
state.updateState(ver, Id.fromNode(patch.getId()));
}
}

public RDFPatch fetch(Version version) {
Expand All @@ -341,6 +361,7 @@ public boolean trySync(PatchLogInfo logInfo) {
return attempt(()->sync(logInfo));
}

/** Sync to a specific log state. */
public void sync(PatchLogInfo logInfo) {
checkDeltaConnection();
syncToVersion(logInfo.getMaxVersion());
Expand All @@ -357,14 +378,6 @@ public boolean trySyncIfAuto() {
return trySync();
}

/**
* No-op end-to-end operation. This operation succeeds or throws an exception.
* This operation makes one attempt only to perform the ping.
*/
public void ping() {
dLink.ping();
}

public void sync() {
try {
checkDeltaConnection();
Expand All @@ -377,43 +390,72 @@ public void sync() {
}
}

/** Execute a DeltaConnection sync once. */
private void asyncOneSync() {
trySyncIfAuto();
}

// Attempt an operation and return true/false as to whether it succeeded or not.
private boolean attempt(Runnable action) {
try { action.run(); return true ; }
catch (RuntimeException ex ) { return false ; }
}

/** Sync until some version */
/**
* Indicator of whether a sync is in-progress on another thread.
* This is not gauranteed to be accurate.
* Synchronization may stil block or it may still skip a sync.
*/
public boolean syncInProgress() {
return syncInProgress.get();
}

/**
* Sync until some version.
* sync does not happen if another sync is in-progress.
* This operation takes the connection lock.
* Calls may wish to skip sync()
*/
private void syncToVersion(Version version) {
//long remoteVer = getRemoteVersionLatestOrDefault(VERSION_UNSET);
if ( ! Version.isValid(version) ) {
FmtLog.debug(LOG, "Sync: Asked for no patches to sync");
return;
}

Version localVer = getLocalVersion();
if ( localVer.isUnset() ) {
FmtLog.warn(LOG, "[%s] Local version is UNSET : sync to %s not done", datasourceId, version);
if ( syncInProgress() )
return;
}
syncToVersion(version, false);
}

if ( localVer.value() > version.value() )
FmtLog.info(LOG, "[%s] Local version ahead of remote : [local=%d, remote=%d]", datasourceId, getLocalVersion(), getRemoteVersionCached());
if ( localVer.value() >= version.value() )
return;
// localVer is not UNSET so next version to fetch is +1 (INIT is version 0)
FmtLog.info(LOG, "[%s:%s] Sync: Versions [%s, %s]", datasourceId, datasourceName, localVer, version);
playPatches(localVer, localVer.value()+1, version.value()) ;
//FmtLog.info(LOG, "Now: Versions [%d, %d]", getLocalVersion(), remoteVer);
/**
* Sync until some version.
* This is the work of sychronization.
* This operation takes the connection lock.
*/
private void syncToVersion(Version version, boolean allowOverlap) {
synchronized(localLock) {
// Inside lock - only one thread.
if ( !allowOverlap && syncInProgress() )
return ;
try {
syncInProgress.set(true);
Version localVer = getLocalVersion();
if ( localVer.isUnset() ) {
FmtLog.warn(LOG, "[%s] Local version is UNSET : sync to %s not done", datasourceId, version);
return;
}
if ( localVer.value() > version.value() )
FmtLog.info(LOG, "[%s] Local version ahead of remote : [local=%d, remote=%d]", datasourceId, getLocalVersion(), getRemoteVersionCached());
if ( localVer.value() >= version.value() )
return;
// localVer is not UNSET so next version to fetch is +1 (INIT is version 0)
FmtLog.info(LOG, "[%s:%s] Sync: Versions [%s, %s]", datasourceId, datasourceName, localVer, version);
// This updates the local state.
playPatches(localVer, localVer.value()+1, version.value()) ;
//FmtLog.info(LOG, "Now: Versions [%d, %d]", getLocalVersion(), remoteVer);
} finally {
syncInProgress.set(false);
}
}
}

/** Play the patches (range is inclusive at both ends) */
/** Play the patches (range is inclusive at both ends); set the new local state on exit. */
private void playPatches(Version currentVersion, long firstPatchVer, long lastPatchVer) {
// Inside synchronized of syncToVersion
Pair<Version, Node> p = play(datasourceId, base, target, dLink, currentVersion, firstPatchVer, lastPatchVer);
if ( p == null )
// Didn't make progress for some reason.
Expand Down Expand Up @@ -474,8 +516,11 @@ private static Pair<Version, Node> play(Id datasourceId, DatasetGraph base, RDFC

@Override
public void close() {
this.logLockMgr.stop();
this.shutdownSyncExecutorService();
if ( isValid() ) {
this.logLockMgr.stop();
this.shutdownSyncExecutorService();
this.valid = false;
}
}

private void shutdownSyncExecutorService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

/**
* When to synchronize with a patch log.
* {@link DeltaConnection} provide the option of syncing automagtically on transaction begin.
* The application call also call {@link DeltaConnection#sync} itself.
* {@link DeltaConnection} provides the option of syncing automatically on transaction begin.
* The application call also call {@link DeltaConnection#sync} itself.
* <ul>
* <li>{@code NONE} No automatic sync, all done by the application.
* <li>{@code TXN_RW} When a transaction starts (sync attempt for a READ transaction suppresses network errors).
* <li>{@code TXN_W} When a write-transaction starts.
* <li>{@code TXN_RW} When a transaction starts (sync attempt for a READ transaction suppresses network errors).
* <li>{@code TXN_W} When a write-transaction starts.
* </ul>
*/
public enum SyncPolicy { NONE, TXN_RW, TXN_W }

0 comments on commit d113b4c

Please sign in to comment.