From f0382fbca8bffdab98349c13d5756cd7be328b31 Mon Sep 17 00:00:00 2001 From: Andy Seaborne Date: Mon, 15 Aug 2022 17:36:50 +0100 Subject: [PATCH] GH-171: Only allow on syncrhorization per DeltaConnection at a time. --- .../delta/client/DeltaConnection.java | 136 ++++++++++++------ .../org/seaborne/delta/client/SyncPolicy.java | 8 +- 2 files changed, 97 insertions(+), 47 deletions(-) diff --git a/rdf-delta-client/src/main/java/org/seaborne/delta/client/DeltaConnection.java b/rdf-delta-client/src/main/java/org/seaborne/delta/client/DeltaConnection.java index cd682139e..9ca3b6753 100644 --- a/rdf-delta-client/src/main/java/org/seaborne/delta/client/DeltaConnection.java +++ b/rdf-delta-client/src/main/java/org/seaborne/delta/client/DeltaConnection.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference ; import java.util.function.Consumer ; @@ -87,6 +88,11 @@ public class DeltaConnection implements AutoCloseable { private final SyncPolicy syncPolicy; private final LogLockMgr logLockMgr; + // Synchronization within this DeltaConnection. + private final Object localLock; + + // Indicator of whether a sync is in-progress. + private final AtomicBoolean syncInProgress = new AtomicBoolean(false); // Synchronize patches asynchronously to the caller. // (ExecutorService in case we change the policy details in the future.) @@ -134,6 +140,7 @@ private DeltaConnection(DataState dataState, DatasetGraph basedsg, DeltaLink lin logLockMgr.add(logLock); this.valid = true; + this.localLock = new Object(); this.syncPolicy = syncTxnBegin; if ( basedsg == null ) { this.target = null; @@ -306,25 +313,45 @@ public void txnAbort() { if ( syncPolicy != SyncPolicy.NONE ) { // Run (almost) immediately and then every 5 minutes - scheduledExecutionService.scheduleAtFixedRate(this::asyncOneSync, 0, 5*60, TimeUnit.SECONDS); + scheduledExecutionService.scheduleAtFixedRate(this::oneSyncAttempt, 0, 5*60, TimeUnit.SECONDS); } } /*package*/ void finish() { - /*reset();*/ + if ( isValid() ) { + this.logLockMgr.stop(); + this.shutdownSyncExecutorService(); + this.valid = false; + } + } + + + /** Execute a DeltaConnection sync once. */ + private void oneSyncAttempt() { + trySyncIfAuto(); + } + + /** + * No-op end-to-end operation. This operation succeeds or throws an exception. + * This operation makes one attempt only to perform the ping. + */ + public void ping() { + dLink.ping(); } /** Send a patch to log server. */ - public synchronized void append(RDFPatch patch) { - checkDeltaConnection(); - Version ver = dLink.append(datasourceId, patch); - if ( ! Version.isValid(ver) ) - // Didn't happen. - return ; - Version ver0 = state.version(); - if ( ver0.value() >= ver.value() ) - FmtLog.warn(LOG, "[%s] Version did not advance: %d -> %d", datasourceId.toString(), ver0 , ver); - state.updateState(ver, Id.fromNode(patch.getId())); + public void append(RDFPatch patch) { + synchronized(localLock) { + checkDeltaConnection(); + Version ver = dLink.append(datasourceId, patch); + if ( ! Version.isValid(ver) ) + // Didn't happen. + return ; + Version ver0 = state.version(); + if ( ver0.value() >= ver.value() ) + FmtLog.warn(LOG, "[%s] Version did not advance: %d -> %d", datasourceId.toString(), ver0 , ver); + state.updateState(ver, Id.fromNode(patch.getId())); + } } public RDFPatch fetch(Version version) { @@ -341,6 +368,7 @@ public boolean trySync(PatchLogInfo logInfo) { return attempt(()->sync(logInfo)); } + /** Sync to a specific log state. */ public void sync(PatchLogInfo logInfo) { checkDeltaConnection(); syncToVersion(logInfo.getMaxVersion()); @@ -357,14 +385,6 @@ public boolean trySyncIfAuto() { return trySync(); } - /** - * No-op end-to-end operation. This operation succeeds or throws an exception. - * This operation makes one attempt only to perform the ping. - */ - public void ping() { - dLink.ping(); - } - public void sync() { try { checkDeltaConnection(); @@ -377,43 +397,72 @@ public void sync() { } } - /** Execute a DeltaConnection sync once. */ - private void asyncOneSync() { - trySyncIfAuto(); - } - // Attempt an operation and return true/false as to whether it succeeded or not. private boolean attempt(Runnable action) { try { action.run(); return true ; } catch (RuntimeException ex ) { return false ; } } - /** Sync until some version */ + /** + * Indicator of whether a sync is in-progress on another thread. + * This is not gauranteed to be accurate. + * Synchronization may stil block or it may still skip a sync. + */ + public boolean syncInProgress() { + return syncInProgress.get(); + } + + /** + * Sync until some version. + * sync does not happen if another sync is in-progress. + * This operation takes the connection lock. + * Calls may wish to skip sync() + */ private void syncToVersion(Version version) { - //long remoteVer = getRemoteVersionLatestOrDefault(VERSION_UNSET); if ( ! Version.isValid(version) ) { FmtLog.debug(LOG, "Sync: Asked for no patches to sync"); return; } - - Version localVer = getLocalVersion(); - if ( localVer.isUnset() ) { - FmtLog.warn(LOG, "[%s] Local version is UNSET : sync to %s not done", datasourceId, version); + if ( syncInProgress() ) return; - } + syncToVersion(version, false); + } - if ( localVer.value() > version.value() ) - FmtLog.info(LOG, "[%s] Local version ahead of remote : [local=%d, remote=%d]", datasourceId, getLocalVersion(), getRemoteVersionCached()); - if ( localVer.value() >= version.value() ) - return; - // localVer is not UNSET so next version to fetch is +1 (INIT is version 0) - FmtLog.info(LOG, "[%s:%s] Sync: Versions [%s, %s]", datasourceId, datasourceName, localVer, version); - playPatches(localVer, localVer.value()+1, version.value()) ; - //FmtLog.info(LOG, "Now: Versions [%d, %d]", getLocalVersion(), remoteVer); + /** + * Sync until some version. + * This is the work of sychronization. + * This operation takes the connection lock. + */ + private void syncToVersion(Version version, boolean allowOverlap) { + synchronized(localLock) { + // Inside lock - only one thread. + if ( !allowOverlap && syncInProgress() ) + return ; + try { + syncInProgress.set(true); + Version localVer = getLocalVersion(); + if ( localVer.isUnset() ) { + FmtLog.warn(LOG, "[%s] Local version is UNSET : sync to %s not done", datasourceId, version); + return; + } + if ( localVer.value() > version.value() ) + FmtLog.info(LOG, "[%s] Local version ahead of remote : [local=%d, remote=%d]", datasourceId, getLocalVersion(), getRemoteVersionCached()); + if ( localVer.value() >= version.value() ) + return; + // localVer is not UNSET so next version to fetch is +1 (INIT is version 0) + FmtLog.info(LOG, "[%s:%s] Sync: Versions [%s, %s]", datasourceId, datasourceName, localVer, version); + // This updates the local state. + playPatches(localVer, localVer.value()+1, version.value()) ; + //FmtLog.info(LOG, "Now: Versions [%d, %d]", getLocalVersion(), remoteVer); + } finally { + syncInProgress.set(false); + } + } } - /** Play the patches (range is inclusive at both ends) */ + /** Play the patches (range is inclusive at both ends); set the new local state on exit. */ private void playPatches(Version currentVersion, long firstPatchVer, long lastPatchVer) { + // Inside synchronized of syncToVersion Pair p = play(datasourceId, base, target, dLink, currentVersion, firstPatchVer, lastPatchVer); if ( p == null ) // Didn't make progress for some reason. @@ -474,8 +523,9 @@ private static Pair play(Id datasourceId, DatasetGraph base, RDFC @Override public void close() { - this.logLockMgr.stop(); - this.shutdownSyncExecutorService(); + // Send of try-with-resources block. + // The connection is still usable so don't throw away the + // Call finish() when a connection is not going to be used again. } private void shutdownSyncExecutorService() { diff --git a/rdf-delta-client/src/main/java/org/seaborne/delta/client/SyncPolicy.java b/rdf-delta-client/src/main/java/org/seaborne/delta/client/SyncPolicy.java index 61f6897ca..42e1355d1 100644 --- a/rdf-delta-client/src/main/java/org/seaborne/delta/client/SyncPolicy.java +++ b/rdf-delta-client/src/main/java/org/seaborne/delta/client/SyncPolicy.java @@ -19,12 +19,12 @@ /** * When to synchronize with a patch log. - * {@link DeltaConnection} provide the option of syncing automagtically on transaction begin. - * The application call also call {@link DeltaConnection#sync} itself. + * {@link DeltaConnection} provides the option of syncing automatically on transaction begin. + * The application call also call {@link DeltaConnection#sync} itself. *
    *
  • {@code NONE} No automatic sync, all done by the application. - *
  • {@code TXN_RW} When a transaction starts (sync attempt for a READ transaction suppresses network errors). - *
  • {@code TXN_W} When a write-transaction starts. + *
  • {@code TXN_RW} When a transaction starts (sync attempt for a READ transaction suppresses network errors). + *
  • {@code TXN_W} When a write-transaction starts. *
*/ public enum SyncPolicy { NONE, TXN_RW, TXN_W }