Skip to content

Commit

Permalink
Merge pull request JanusGraph#717 from dpitera/sync-graph-instances-716
Browse files Browse the repository at this point in the history
Sync EvictionTrigger's openInstances on schedule
  • Loading branch information
dpitera authored Feb 22, 2018
2 parents 1863521 + fbce02c commit 894488c
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public interface MessageReader {
*/
public void read(Message message);

/**
* Updates the state of the MessageReader.
*/
public void updateState();

/**
* Need to override this method because the {@link Log} uses this comparison
* when unregistering readers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.janusgraph.diskstorage.*;
import org.janusgraph.diskstorage.util.time.*;

import org.janusgraph.graphdb.database.management.ManagementLogger;
import org.janusgraph.diskstorage.configuration.ConfigOption;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.*;
Expand Down Expand Up @@ -636,6 +637,11 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
pos++;
}
}
readExecutor.scheduleWithFixedDelay(
new MessageReaderStateUpdater(),
INITIAL_READER_DELAY.toNanos(),
readPollingInterval.toNanos(),
TimeUnit.NANOSECONDS);
}
}

Expand All @@ -645,6 +651,15 @@ public synchronized boolean unregisterReader(MessageReader reader) {
return this.readers.remove(reader);
}

private class MessageReaderStateUpdater implements Runnable {
@Override
public void run() {
for (MessageReader reader : readers) {
reader.updateState();
}
}
}

/**
* Thread which runs to read all messages from a particular partition id and bucket up to the next timeslice
* or current timestamp minus the configured read lag time {@link #LOG_READ_LAG_TIME}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,20 @@ public class GraphDatabaseConfiguration {

public static final ConfigOption<Short> UNIQUE_INSTANCE_ID_SUFFIX = new ConfigOption<Short>(GRAPH_NS,"unique-instance-id-suffix",
"When this is set and " + UNIQUE_INSTANCE_ID.getName() + " is not, this JanusGraph " +
"instance's unique identifier is generated by concatenating the hostname to the " +
"provided number. This is a legacy option which is currently only useful if the JVM's " +
"ManagementFactory.getRuntimeMXBean().getName() is not unique between processes.",
"instance's unique identifier is generated by concatenating the hex encoded hostname to the " +
"provided number.",
ConfigOption.Type.LOCAL, Short.class);

public static final ConfigOption<Boolean> UNIQUE_INSTANCE_ID_HOSTNAME = new ConfigOption<Boolean>(GRAPH_NS,"use-hostname-for-unique-instance-id",
"When this is set, this JanusGraph's unique instance identifier is set to the hostname. If " + UNIQUE_INSTANCE_ID_SUFFIX.getName() +
" is also set, then the identifier is set to <hostname><suffix>.",
ConfigOption.Type.LOCAL, Boolean.class, false);

public static final ConfigOption<Boolean> REPLACE_INSTANCE_IF_EXISTS = new ConfigOption<Boolean>(GRAPH_NS,"replace-instance-if-exists",
"If a JanusGraph instance with the same instance identifier already exists, the usage of this " +
"configuration option results in the opening of this graph anwyay.",
ConfigOption.Type.LOCAL, Boolean.class, false);

public static final ConfigOption<String> INITIAL_JANUSGRAPH_VERSION = new ConfigOption<String>(GRAPH_NS,"janusgraph-version",
"The version of JanusGraph with which this database was created. Automatically set on first start. Don't manually set this property.",
ConfigOption.Type.FIXED, String.class).hide();
Expand Down Expand Up @@ -1580,27 +1589,50 @@ public boolean apply(@Nullable Map.Entry<ConfigElement.PathIdentifier, Object> e

private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(0);

private static String computeUniqueInstanceId(Configuration config) {
private static String getSuffix(Configuration config) {
final String suffix;

if (config.has(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_SUFFIX)) {
suffix = LongEncoding.encode(config.get(
GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_SUFFIX));
} else {
} else if (!config.has(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME)) {
suffix = ManagementFactory.getRuntimeMXBean().getName() + LongEncoding.encode(INSTANCE_COUNTER.incrementAndGet());
} else {
suffix = "";
}
return suffix;
}

byte[] addrBytes;
try {
addrBytes = Inet4Address.getLocalHost().getAddress();
} catch (UnknownHostException e) {
throw new JanusGraphConfigurationException("Cannot determine local host", e);
private static String getUid(Configuration config) {
final String localHostErrMsg = "Cannot determine local host";
final String uid;
if (config.has(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME)
&& config.get(GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME)) {
try {
uid = Inet4Address.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new JanusGraphConfigurationException(localHostErrMsg, e);
}
} else {
final byte[] addrBytes;
try {
addrBytes = Inet4Address.getLocalHost().getAddress();
} catch (UnknownHostException e) {
throw new JanusGraphConfigurationException(localHostErrMsg, e);
}
uid = new String(Hex.encodeHex(addrBytes));
}
String uid = new String(Hex.encodeHex(addrBytes)) + suffix;
return uid;

}

private static String computeUniqueInstanceId(Configuration config) {
final String suffix = getSuffix(config);
final String uid = getUid(config);
String instanceId = uid + suffix;
for (char c : ConfigElement.ILLEGAL_CHARS) {
uid = StringUtils.replaceChars(uid,c,'-');
instanceId = StringUtils.replaceChars(instanceId,c,'-');
}
return uid;
return instanceId;
}

public static String getOrGenerateUniqueInstanceId(Configuration config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
import org.janusgraph.graphdb.util.ExceptionFactory;
import org.janusgraph.util.system.IOUtils;
import org.janusgraph.util.system.TXUtils;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REPLACE_INSTANCE_IF_EXISTS;

import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Graph;
Expand Down Expand Up @@ -157,8 +160,12 @@ public StandardJanusGraph(GraphDatabaseConfiguration configuration) {
//Register instance and ensure uniqueness
String uniqueInstanceId = configuration.getUniqueGraphId();
ModifiableConfiguration globalConfig = GraphDatabaseConfiguration.getGlobalSystemConfig(backend);
if (globalConfig.has(REGISTRATION_TIME, uniqueInstanceId)) {
final boolean instanceExists = globalConfig.has(REGISTRATION_TIME, uniqueInstanceId);
final boolean replaceExistingInstance = configuration.getConfiguration().get(REPLACE_INSTANCE_IF_EXISTS);
if (instanceExists && !replaceExistingInstance) {
throw new JanusGraphException(String.format("A JanusGraph graph with the same instance id [%s] is already open. Might required forced shutdown.", uniqueInstanceId));
} else if (instanceExists && replaceExistingInstance) {
log.debug(String.format("Instance [%s] already exists. Opening the graph per " + REPLACE_INSTANCE_IF_EXISTS.getName() + " configuration.", uniqueInstanceId));
}
globalConfig.set(REGISTRATION_TIME, times.getTime(), uniqueInstanceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.ResourceUnavailableException;

import org.janusgraph.diskstorage.util.time.Timer;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void sendCacheEviction(Set<JanusGraphSchemaVertex> updatedTypes,
Set<String> openInstances) {
Preconditions.checkArgument(!openInstances.isEmpty());
long evictionId = evictionTriggerCounter.incrementAndGet();
evictionTriggerMap.put(evictionId,new EvictionTrigger(evictionId,updatedTypeTriggers,openInstances));
evictionTriggerMap.put(evictionId,new EvictionTrigger(evictionId,updatedTypeTriggers,graph));
DataOutput out = graph.getDataSerializer().getDataOutput(128);
out.writeObjectNotNull(MgmtLogType.CACHED_TYPE_EVICTION);
VariableLong.writePositive(out,evictionId);
Expand All @@ -122,38 +123,64 @@ public void sendCacheEviction(Set<JanusGraphSchemaVertex> updatedTypes,
sysLog.add(out.getStaticBuffer());
}

@Override
public void updateState() {
evictionTriggerMap.forEach((k, v) -> {
final int ackCounter = v.removeDroppedInstances();
if (ackCounter == 0) {
v.runTriggers();
}
});
}

private class EvictionTrigger {

final long evictionId;
final List<Callable<Boolean>> updatedTypeTriggers;
final ImmutableSet<String> openInstances;
final AtomicInteger ackCounter;
final StandardJanusGraph graph;
final Set<String> instancesToBeAcknowledged;

private EvictionTrigger(long evictionId, List<Callable<Boolean>> updatedTypeTriggers, Set<String> openInstances) {
private EvictionTrigger(long evictionId, List<Callable<Boolean>> updatedTypeTriggers, StandardJanusGraph graph) {
this.graph = graph;
this.evictionId = evictionId;
this.updatedTypeTriggers = updatedTypeTriggers;
this.openInstances = ImmutableSet.copyOf(openInstances);
this.ackCounter = new AtomicInteger(openInstances.size());
final JanusGraphManagement mgmt = graph.openManagement();
this.instancesToBeAcknowledged = ConcurrentHashMap.newKeySet();
((ManagementSystem) mgmt).getOpenInstancesInternal().forEach(instancesToBeAcknowledged::add);
mgmt.rollback();
}

void receivedAcknowledgement(String senderId) {
if (openInstances.contains(senderId)) {
int countdown = ackCounter.decrementAndGet();
if (instancesToBeAcknowledged.remove(senderId)) {
final int ackCounter = instancesToBeAcknowledged.size();
log.debug("Received acknowledgement for eviction [{}] from senderID={} ({} more acks still outstanding)",
evictionId, senderId, countdown);
if (countdown==0) { //Trigger actions
for (Callable<Boolean> trigger : updatedTypeTriggers) {
try {
boolean success = trigger.call();
assert success;
} catch (Throwable e) {
log.error("Could not execute trigger ["+trigger.toString()+"] for eviction ["+evictionId+"]",e);
}
}
log.info("Received all acknowledgements for eviction [{}]",evictionId);
evictionTriggerMap.remove(evictionId,this);
evictionId, senderId, ackCounter);
if (ackCounter == 0) {
runTriggers();
}
}
}

void runTriggers() {
for (Callable<Boolean> trigger : updatedTypeTriggers) {
try {
final boolean success = trigger.call();
assert success;
} catch (Throwable e) {
log.error("Could not execute trigger ["+trigger.toString()+"] for eviction ["+evictionId+"]",e);
}
}
log.info("Received all acknowledgements for eviction [{}]",evictionId);
evictionTriggerMap.remove(evictionId,this);
}

int removeDroppedInstances() {
final JanusGraphManagement mgmt = graph.openManagement();
final Set<String> updatedInstances = ((ManagementSystem) mgmt).getOpenInstancesInternal();
final String instanceRemovedMsg = "Instance [{}] was removed list of open instances and therefore dropped from list of instances to be acknowledged.";
instancesToBeAcknowledged.stream().filter(it -> !updatedInstances.contains(it)).filter(instancesToBeAcknowledged::remove).forEach(it -> log.debug(instanceRemovedMsg, it));
mgmt.rollback();
return instancesToBeAcknowledged.size();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ public void read(Message message) {
}
}
}

@Override
public void updateState() {}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ public void read(Message message) {
entry.update(txentry);
}


@Override
public void updateState() {}
}

private class TxEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ public final void read(Message message) {
latch.countDown();
}

@Override
public void updateState() {}

/**
* Subclasses can override this method to perform additional processing on the message.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.graphdb;

import org.janusgraph.core.JanusGraphException;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.diskstorage.configuration.backend.CommonsConfiguration;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_BACKEND;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_HOSTNAME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.UNIQUE_INSTANCE_ID_SUFFIX;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.REPLACE_INSTANCE_IF_EXISTS;

import org.apache.commons.configuration.MapConfiguration;

import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.HashMap;
import org.junit.Test;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.equalTo;

public class GraphDatabaseConfigurationInstanceIdTest {

@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void graphShouldOpenWithSameInstanceId() {
final Map<String, Object> map = new HashMap<String, Object>();
map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory");
map.put(UNIQUE_INSTANCE_ID.toStringWithoutRoot(), "not-unique");
map.put(REPLACE_INSTANCE_IF_EXISTS.toStringWithoutRoot(), true);
final MapConfiguration config = new MapConfiguration(map);
final StandardJanusGraph graph1 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config)));

assertEquals(graph1.openManagement().getOpenInstances().size(), 1);
assertEquals(graph1.openManagement().getOpenInstances().toArray()[0], "not-unique");

final StandardJanusGraph graph2 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config)));

assertEquals(graph1.openManagement().getOpenInstances().size(), 1);
assertEquals(graph1.openManagement().getOpenInstances().toArray()[0], "not-unique");
assertEquals(graph2.openManagement().getOpenInstances().size(), 1);
assertEquals(graph2.openManagement().getOpenInstances().toArray()[0], "not-unique");
graph1.close();
graph2.close();
}

@Test
public void graphShouldNotOpenWithSameInstanceId() {
final Map<String, Object> map = new HashMap<String, Object>();
map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory");
map.put(UNIQUE_INSTANCE_ID.toStringWithoutRoot(), "not-unique");
final MapConfiguration config = new MapConfiguration(map);
final StandardJanusGraph graph1 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config)));

assertEquals(graph1.openManagement().getOpenInstances().size(), 1);
assertEquals(graph1.openManagement().getOpenInstances().toArray()[0], "not-unique");

thrown.expect(JanusGraphException.class);
final String err = "A JanusGraph graph with the same instance id [not-unique] is already open. Might required forced shutdown.";
thrown.expectMessage(equalTo(err));
final StandardJanusGraph graph2 = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config)));

graph1.close();
}

@Test
public void instanceIdShouldEqualHostname() throws UnknownHostException {
final Map<String, Object> map = new HashMap<String, Object>();
map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory");
map.put(UNIQUE_INSTANCE_ID_HOSTNAME.toStringWithoutRoot(), true);
final MapConfiguration config = new MapConfiguration(map);
final StandardJanusGraph graph = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config)));
assertEquals(graph.openManagement().getOpenInstances().size(), 1);
assertEquals(graph.openManagement().getOpenInstances().toArray()[0], Inet4Address.getLocalHost().getHostName());
graph.close();
}

@Test
public void instanceIdShouldEqualHostnamePlusSuffix() throws UnknownHostException {
final Map<String, Object> map = new HashMap<String, Object>();
map.put(STORAGE_BACKEND.toStringWithoutRoot(), "inmemory");
map.put(UNIQUE_INSTANCE_ID_HOSTNAME.toStringWithoutRoot(), true);
map.put(UNIQUE_INSTANCE_ID_SUFFIX.toStringWithoutRoot(), 1);
final MapConfiguration config = new MapConfiguration(map);
final StandardJanusGraph graph = new StandardJanusGraph(new GraphDatabaseConfiguration(new CommonsConfiguration(config)));
assertEquals(graph.openManagement().getOpenInstances().size(), 1);
assertEquals(graph.openManagement().getOpenInstances().toArray()[0], Inet4Address.getLocalHost().getHostName() + "1");
graph.close();
}
}
Loading

0 comments on commit 894488c

Please sign in to comment.