diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/MessageReader.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/MessageReader.java index 5659e58cb8..4feac2ed37 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/MessageReader.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/MessageReader.java @@ -27,6 +27,11 @@ public interface MessageReader { */ public void read(Message message); + /** + * Updates the state of the MessageReader. + */ + public void updateState(); + /** * Need to override this method because the {@link Log} uses this comparison * when unregistering readers diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/kcvs/KCVSLog.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/kcvs/KCVSLog.java index 995bc6e7c4..5e31723093 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/kcvs/KCVSLog.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/kcvs/KCVSLog.java @@ -20,6 +20,7 @@ import org.janusgraph.diskstorage.*; import org.janusgraph.diskstorage.util.time.*; +import org.janusgraph.graphdb.database.management.ManagementLogger; import org.janusgraph.diskstorage.configuration.ConfigOption; import org.janusgraph.diskstorage.configuration.Configuration; import org.janusgraph.diskstorage.keycolumnvalue.*; @@ -636,6 +637,11 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { pos++; } } + readExecutor.scheduleWithFixedDelay( + new MessageReaderStateUpdater(), + INITIAL_READER_DELAY.toNanos(), + readPollingInterval.toNanos(), + TimeUnit.NANOSECONDS); } } @@ -645,6 +651,15 @@ public synchronized boolean unregisterReader(MessageReader reader) { return this.readers.remove(reader); } + private class MessageReaderStateUpdater implements Runnable { + @Override + public void run() { + for (MessageReader reader : readers) { + reader.updateState(); + } + } + } + /** * Thread which runs to read all messages from a particular partition id and bucket up to the next timeslice * or current timestamp minus the configured read lag time {@link #LOG_READ_LAG_TIME}. diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java index 7f2d9b4c0c..33d156ffcc 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java @@ -154,11 +154,20 @@ public class GraphDatabaseConfiguration { public static final ConfigOption UNIQUE_INSTANCE_ID_SUFFIX = new ConfigOption(GRAPH_NS,"unique-instance-id-suffix", "When this is set and " + UNIQUE_INSTANCE_ID.getName() + " is not, this JanusGraph " + - "instance's unique identifier is generated by concatenating the hostname to the " + - "provided number. This is a legacy option which is currently only useful if the JVM's " + - "ManagementFactory.getRuntimeMXBean().getName() is not unique between processes.", + "instance's unique identifier is generated by concatenating the hex encoded hostname to the " + + "provided number.", ConfigOption.Type.LOCAL, Short.class); + public static final ConfigOption UNIQUE_INSTANCE_ID_HOSTNAME = new ConfigOption(GRAPH_NS,"use-hostname-for-unique-instance-id", + "When this is set, this JanusGraph's unique instance identifier is set to the hostname. If " + UNIQUE_INSTANCE_ID_SUFFIX.getName() + + " is also set, then the identifier is set to .", + ConfigOption.Type.LOCAL, Boolean.class, false); + + public static final ConfigOption REPLACE_INSTANCE_IF_EXISTS = new ConfigOption(GRAPH_NS,"replace-instance-if-exists", + "If a JanusGraph instance with the same instance identifier already exists, the usage of this " + + "configuration option results in the opening of this graph anwyay.", + ConfigOption.Type.LOCAL, Boolean.class, false); + public static final ConfigOption INITIAL_JANUSGRAPH_VERSION = new ConfigOption(GRAPH_NS,"janusgraph-version", "The version of JanusGraph with which this database was created. Automatically set on first start. Don't manually set this property.", ConfigOption.Type.FIXED, String.class).hide(); @@ -1580,27 +1589,50 @@ public boolean apply(@Nullable Map.Entry e private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(0); - private static String computeUniqueInstanceId(Configuration config) { + private static String getSuffix(Configuration config) { final String suffix; - if (config.has(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_SUFFIX)) { suffix = LongEncoding.encode(config.get( GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_SUFFIX)); - } else { + } else if (!config.has(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME)) { suffix = ManagementFactory.getRuntimeMXBean().getName() + LongEncoding.encode(INSTANCE_COUNTER.incrementAndGet()); + } else { + suffix = ""; } + return suffix; + } - byte[] addrBytes; - try { - addrBytes = Inet4Address.getLocalHost().getAddress(); - } catch (UnknownHostException e) { - throw new JanusGraphConfigurationException("Cannot determine local host", e); + private static String getUid(Configuration config) { + final String localHostErrMsg = "Cannot determine local host"; + final String uid; + if (config.has(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME) + && config.get(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME)) { + try { + uid = Inet4Address.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new JanusGraphConfigurationException(localHostErrMsg, e); + } + } else { + final byte[] addrBytes; + try { + addrBytes = Inet4Address.getLocalHost().getAddress(); + } catch (UnknownHostException e) { + throw new JanusGraphConfigurationException(localHostErrMsg, e); + } + uid = new String(Hex.encodeHex(addrBytes)); } - String uid = new String(Hex.encodeHex(addrBytes)) + suffix; + return uid; + + } + + private static String computeUniqueInstanceId(Configuration config) { + final String suffix = getSuffix(config); + final String uid = getUid(config); + String instanceId = uid + suffix; for (char c : ConfigElement.ILLEGAL_CHARS) { - uid = StringUtils.replaceChars(uid,c,'-'); + instanceId = StringUtils.replaceChars(instanceId,c,'-'); } - return uid; + return instanceId; } public static String getOrGenerateUniqueInstanceId(Configuration config) { diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java index 8b9f628dca..3fa9e27ca8 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/StandardJanusGraph.java @@ -69,6 +69,9 @@ import org.janusgraph.graphdb.util.ExceptionFactory; import org.janusgraph.util.system.IOUtils; import org.janusgraph.util.system.TXUtils; + +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REPLACE_INSTANCE_IF_EXISTS; + import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -157,8 +160,12 @@ public StandardJanusGraph(GraphDatabaseConfiguration configuration) { //Register instance and ensure uniqueness String uniqueInstanceId = configuration.getUniqueGraphId(); ModifiableConfiguration globalConfig = GraphDatabaseConfiguration.getGlobalSystemConfig(backend); - if (globalConfig.has(REGISTRATION_TIME, uniqueInstanceId)) { + final boolean instanceExists = globalConfig.has(REGISTRATION_TIME, uniqueInstanceId); + final boolean replaceExistingInstance = configuration.getConfiguration().get(REPLACE_INSTANCE_IF_EXISTS); + if (instanceExists && !replaceExistingInstance) { throw new JanusGraphException(String.format("A JanusGraph graph with the same instance id [%s] is already open. Might required forced shutdown.", uniqueInstanceId)); + } else if (instanceExists && replaceExistingInstance) { + log.debug(String.format("Instance [%s] already exists. Opening the graph per " + REPLACE_INSTANCE_IF_EXISTS.getName() + " configuration.", uniqueInstanceId)); } globalConfig.set(REGISTRATION_TIME, times.getTime(), uniqueInstanceId); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementLogger.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementLogger.java index d0fcb3eafd..bf270b5d60 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementLogger.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementLogger.java @@ -17,6 +17,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.janusgraph.core.JanusGraphTransaction; +import org.janusgraph.core.schema.JanusGraphManagement; import org.janusgraph.diskstorage.ResourceUnavailableException; import org.janusgraph.diskstorage.util.time.Timer; @@ -110,7 +111,7 @@ public void sendCacheEviction(Set updatedTypes, Set openInstances) { Preconditions.checkArgument(!openInstances.isEmpty()); long evictionId = evictionTriggerCounter.incrementAndGet(); - evictionTriggerMap.put(evictionId,new EvictionTrigger(evictionId,updatedTypeTriggers,openInstances)); + evictionTriggerMap.put(evictionId,new EvictionTrigger(evictionId,updatedTypeTriggers,graph)); DataOutput out = graph.getDataSerializer().getDataOutput(128); out.writeObjectNotNull(MgmtLogType.CACHED_TYPE_EVICTION); VariableLong.writePositive(out,evictionId); @@ -122,38 +123,64 @@ public void sendCacheEviction(Set updatedTypes, sysLog.add(out.getStaticBuffer()); } + @Override + public void updateState() { + evictionTriggerMap.forEach((k, v) -> { + final int ackCounter = v.removeDroppedInstances(); + if (ackCounter == 0) { + v.runTriggers(); + } + }); + } + private class EvictionTrigger { final long evictionId; final List> updatedTypeTriggers; - final ImmutableSet openInstances; - final AtomicInteger ackCounter; + final StandardJanusGraph graph; + final Set instancesToBeAcknowledged; - private EvictionTrigger(long evictionId, List> updatedTypeTriggers, Set openInstances) { + private EvictionTrigger(long evictionId, List> updatedTypeTriggers, StandardJanusGraph graph) { + this.graph = graph; this.evictionId = evictionId; this.updatedTypeTriggers = updatedTypeTriggers; - this.openInstances = ImmutableSet.copyOf(openInstances); - this.ackCounter = new AtomicInteger(openInstances.size()); + final JanusGraphManagement mgmt = graph.openManagement(); + this.instancesToBeAcknowledged = ConcurrentHashMap.newKeySet(); + ((ManagementSystem) mgmt).getOpenInstancesInternal().forEach(instancesToBeAcknowledged::add); + mgmt.rollback(); } void receivedAcknowledgement(String senderId) { - if (openInstances.contains(senderId)) { - int countdown = ackCounter.decrementAndGet(); + if (instancesToBeAcknowledged.remove(senderId)) { + final int ackCounter = instancesToBeAcknowledged.size(); log.debug("Received acknowledgement for eviction [{}] from senderID={} ({} more acks still outstanding)", - evictionId, senderId, countdown); - if (countdown==0) { //Trigger actions - for (Callable trigger : updatedTypeTriggers) { - try { - boolean success = trigger.call(); - assert success; - } catch (Throwable e) { - log.error("Could not execute trigger ["+trigger.toString()+"] for eviction ["+evictionId+"]",e); - } - } - log.info("Received all acknowledgements for eviction [{}]",evictionId); - evictionTriggerMap.remove(evictionId,this); + evictionId, senderId, ackCounter); + if (ackCounter == 0) { + runTriggers(); + } + } + } + + void runTriggers() { + for (Callable trigger : updatedTypeTriggers) { + try { + final boolean success = trigger.call(); + assert success; + } catch (Throwable e) { + log.error("Could not execute trigger ["+trigger.toString()+"] for eviction ["+evictionId+"]",e); } } + log.info("Received all acknowledgements for eviction [{}]",evictionId); + evictionTriggerMap.remove(evictionId,this); + } + + int removeDroppedInstances() { + final JanusGraphManagement mgmt = graph.openManagement(); + final Set updatedInstances = ((ManagementSystem) mgmt).getOpenInstancesInternal(); + final String instanceRemovedMsg = "Instance [{}] was removed list of open instances and therefore dropped from list of instances to be acknowledged."; + instancesToBeAcknowledged.stream().filter(it -> !updatedInstances.contains(it)).filter(instancesToBeAcknowledged::remove).forEach(it -> log.debug(instanceRemovedMsg, it)); + mgmt.rollback(); + return instancesToBeAcknowledged.size(); } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardLogProcessorFramework.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardLogProcessorFramework.java index a71075bb99..31e490b2bd 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardLogProcessorFramework.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardLogProcessorFramework.java @@ -265,6 +265,9 @@ public void read(Message message) { } } } + + @Override + public void updateState() {} } } diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardTransactionLogProcessor.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardTransactionLogProcessor.java index 9c4078e89c..6ebd0eb60e 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardTransactionLogProcessor.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardTransactionLogProcessor.java @@ -350,7 +350,8 @@ public void read(Message message) { entry.update(txentry); } - + @Override + public void updateState() {} } private class TxEntry { diff --git a/janusgraph-test/src/main/java/org/janusgraph/diskstorage/log/LogTest.java b/janusgraph-test/src/main/java/org/janusgraph/diskstorage/log/LogTest.java index 69875b0aeb..2bf38a5b2f 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/diskstorage/log/LogTest.java +++ b/janusgraph-test/src/main/java/org/janusgraph/diskstorage/log/LogTest.java @@ -303,6 +303,9 @@ public final void read(Message message) { latch.countDown(); } + @Override + public void updateState() {} + /** * Subclasses can override this method to perform additional processing on the message. */ diff --git a/janusgraph-test/src/main/java/org/janusgraph/graphdb/GraphDatabaseConfigurationInstanceIdTest.java b/janusgraph-test/src/main/java/org/janusgraph/graphdb/GraphDatabaseConfigurationInstanceIdTest.java new file mode 100644 index 0000000000..94f633f654 --- /dev/null +++ b/janusgraph-test/src/main/java/org/janusgraph/graphdb/GraphDatabaseConfigurationInstanceIdTest.java @@ -0,0 +1,109 @@ +// Copyright 2017 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.graphdb; + +import org.janusgraph.core.JanusGraphException; +import org.janusgraph.graphdb.database.StandardJanusGraph; +import org.janusgraph.diskstorage.configuration.backend.CommonsConfiguration; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_BACKEND; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_SUFFIX; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REPLACE_INSTANCE_IF_EXISTS; + +import org.apache.commons.configuration.MapConfiguration; + +import java.net.Inet4Address; +import java.net.UnknownHostException; +import java.util.Map; +import java.util.HashMap; +import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import static org.junit.Assert.*; +import static org.hamcrest.Matchers.equalTo; + +public class GraphDatabaseConfigurationInstanceIdTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void graphShouldOpenWithSameInstanceId() { + final Map map = new HashMap(); + map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory"); + map.put(UNIQUE_INSTANCE_ID.toStringWithoutRoot(), "not-unique"); + map.put(REPLACE_INSTANCE_IF_EXISTS.toStringWithoutRoot(), true); + final MapConfiguration config = new MapConfiguration(map); + final StandardJanusGraph graph1 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config))); + + assertEquals(graph1.openManagement().getOpenInstances().size(), 1); + assertEquals(graph1.openManagement().getOpenInstances().toArray()[0], "not-unique"); + + final StandardJanusGraph graph2 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config))); + + assertEquals(graph1.openManagement().getOpenInstances().size(), 1); + assertEquals(graph1.openManagement().getOpenInstances().toArray()[0], "not-unique"); + assertEquals(graph2.openManagement().getOpenInstances().size(), 1); + assertEquals(graph2.openManagement().getOpenInstances().toArray()[0], "not-unique"); + graph1.close(); + graph2.close(); + } + + @Test + public void graphShouldNotOpenWithSameInstanceId() { + final Map map = new HashMap(); + map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory"); + map.put(UNIQUE_INSTANCE_ID.toStringWithoutRoot(), "not-unique"); + final MapConfiguration config = new MapConfiguration(map); + final StandardJanusGraph graph1 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config))); + + assertEquals(graph1.openManagement().getOpenInstances().size(), 1); + assertEquals(graph1.openManagement().getOpenInstances().toArray()[0], "not-unique"); + + thrown.expect(JanusGraphException.class); + final String err = "A JanusGraph graph with the same instance id [not-unique] is already open. Might required forced shutdown."; + thrown.expectMessage(equalTo(err)); + final StandardJanusGraph graph2 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config))); + + graph1.close(); + } + + @Test + public void instanceIdShouldEqualHostname() throws UnknownHostException { + final Map map = new HashMap(); + map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory"); + map.put(UNIQUE_INSTANCE_ID_HOSTNAME.toStringWithoutRoot(), true); + final MapConfiguration config = new MapConfiguration(map); + final StandardJanusGraph graph = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config))); + assertEquals(graph.openManagement().getOpenInstances().size(), 1); + assertEquals(graph.openManagement().getOpenInstances().toArray()[0], Inet4Address.getLocalHost().getHostName()); + graph.close(); + } + + @Test + public void instanceIdShouldEqualHostnamePlusSuffix() throws UnknownHostException { + final Map map = new HashMap(); + map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory"); + map.put(UNIQUE_INSTANCE_ID_HOSTNAME.toStringWithoutRoot(), true); + map.put(UNIQUE_INSTANCE_ID_SUFFIX.toStringWithoutRoot(), 1); + final MapConfiguration config = new MapConfiguration(map); + final StandardJanusGraph graph = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config))); + assertEquals(graph.openManagement().getOpenInstances().size(), 1); + assertEquals(graph.openManagement().getOpenInstances().toArray()[0], Inet4Address.getLocalHost().getHostName() + "1"); + graph.close(); + } +} diff --git a/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java b/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java index aa1a67b7e6..0f85432687 100644 --- a/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java +++ b/janusgraph-test/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java @@ -1764,6 +1764,66 @@ public void testIndexUpdateSyncWithMultipleInstances() throws InterruptedExcepti } + @Category({BrittleTests.class}) + @Test + public void testIndexShouldRegisterWhenWeRemoveAnInstance() throws InterruptedException { + clopen(option(LOG_SEND_DELAY, MANAGEMENT_LOG), Duration.ofMillis(0), + option(KCVSLog.LOG_READ_LAG_TIME, MANAGEMENT_LOG), Duration.ofMillis(50), + option(LOG_READ_INTERVAL, MANAGEMENT_LOG), Duration.ofMillis(250) + ); + + StandardJanusGraph graph2 = (StandardJanusGraph) JanusGraphFactory.open(config); + JanusGraphTransaction tx2; + + mgmt.makePropertyKey("name").dataType(String.class).make(); + finishSchema(); + + tx.addVertex("name", "v1"); + newTx(); + evaluateQuery(tx.query().has("name", "v1"), ElementCategory.VERTEX, 1, new boolean[]{false, true}); + tx2 = graph2.newTransaction(); + evaluateQuery(tx2.query().has("name", "v1"), ElementCategory.VERTEX, 1, new boolean[]{false, true}); + //Leave tx2 open to delay acknowledgement + + mgmt.buildIndex("theIndex", Vertex.class).addKey(mgmt.getPropertyKey("name")).buildCompositeIndex(); + mgmt.commit(); + + JanusGraphTransaction tx3 = graph2.newTransaction(); + tx3.addVertex("name", "v2"); + tx3.commit(); + newTx(); + tx.addVertex("name", "v3"); + tx.commit(); + + finishSchema(); + try { + mgmt.updateIndex(mgmt.getGraphIndex("theIndex"), SchemaAction.ENABLE_INDEX); + fail(); //Open tx2 should not make this possible + } catch (IllegalArgumentException e) { + } + finishSchema(); + + //close second graph instance, so index can move to REGISTERED + Set openInstances = mgmt.getOpenInstances(); + assertEquals(2, openInstances.size()); + assertTrue(openInstances.contains(graph.getConfiguration().getUniqueGraphId() + "(current)")); + assertTrue(openInstances.contains(graph2.getConfiguration().getUniqueGraphId())); + try { + mgmt.forceCloseInstance(graph.getConfiguration().getUniqueGraphId()); + fail(); //Cannot close current instance + } catch (IllegalArgumentException e) { + } + mgmt.forceCloseInstance(graph2.getConfiguration().getUniqueGraphId()); + + mgmt.commit(); + assertTrue(ManagementSystem.awaitGraphIndexStatus(graph, "theIndex").status(SchemaStatus.REGISTERED) + .timeout(TestGraphConfigs.getSchemaConvergenceTime(ChronoUnit.SECONDS), ChronoUnit.SECONDS) + .call().getSucceeded()); + finishSchema(); + mgmt.updateIndex(mgmt.getGraphIndex("theIndex"), SchemaAction.ENABLE_INDEX); + finishSchema(); + } + /* ================================================================================== ADVANCED ==================================================================================*/ @@ -3745,6 +3805,8 @@ public void simpleLogTest(final boolean withLogFailure) throws InterruptedExcept for (LogTxStatus status : LogTxStatus.values()) txMsgCounter.put(status, new AtomicInteger(0)); final AtomicInteger userlogMeta = new AtomicInteger(0); txlog.registerReader(startMarker, new MessageReader() { + @Override public void updateState() {} + @Override public void read(Message message) { Instant msgTime = message.getTimestamp(); @@ -3784,6 +3846,9 @@ public void read(Message message) { for (Change change : Change.values()) userChangeCounter.put(change, new AtomicInteger(0)); final AtomicInteger userLogMsgCounter = new AtomicInteger(0); userLog.registerReader(startMarker, new MessageReader() { + @Override + public void updateState() {} + @Override public void read(Message message) { Instant msgTime = message.getTimestamp();