Skip to content

Commit

Permalink
WIP WIP WIP
Browse files Browse the repository at this point in the history
Signed-off-by: David Pitera <[email protected]>
  • Loading branch information
dpitera committed Jan 19, 2018
1 parent 89ff095 commit 82cbadd
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.janusgraph.graphdb.management.ConfigurationManagementGraph;
import org.janusgraph.graphdb.management.JanusGraphManager;
import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.diskstorage.BackendException;
Expand Down Expand Up @@ -152,7 +153,7 @@ public static JanusGraph close(String graphName) throws Exception {
public static void drop(String graphName) throws BackendException, ConfigurationManagementGraphNotEnabledException, Exception {
final StandardJanusGraph graph = (StandardJanusGraph) ConfiguredGraphFactory.close(graphName);
JanusGraphFactory.drop(graph);
ConfigurationManagementGraph.getInstance().removeConfiguration(graphName);
removeConfiguration(graphName);
}

private static ConfigurationManagementGraph getConfigGraphManagementInstance() {
Expand Down Expand Up @@ -197,6 +198,8 @@ public static void createTemplateConfiguration(final Configuration config) {
*/
public static void updateConfiguration(final String graphName, final Configuration config) {
final ConfigurationManagementGraph configManagementGraph = getConfigGraphManagementInstance();
final JanusGraph graph = open(graphName);
removeGraphFromCache(graph);
configManagementGraph.updateConfiguration(graphName, config);
}

Expand All @@ -217,9 +220,20 @@ public static void updateTemplateConfiguration(final Configuration config) {
*/
public static void removeConfiguration(final String graphName) {
final ConfigurationManagementGraph configManagementGraph = getConfigGraphManagementInstance();
final JanusGraph graph = open(graphName);
removeGraphFromCache(graph);
configManagementGraph.removeConfiguration(graphName);
}

private static void removeGraphFromCache(final JanusGraph graph) {
final JanusGraphManager jgm = JanusGraphManagerUtility.getInstance();
Preconditions.checkState(jgm != null, JANUS_GRAPH_MANAGER_EXPECTED_STATE_MSG);
jgm.removeGraph(((StandardJanusGraph) graph).getGraphName());
final ManagementSystem mgmt = (ManagementSystem) graph.openManagement();
mgmt.evictGraphFromCache();
mgmt.commit();
}

/**
* Remove template configuration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,7 @@ public Backend getBackend() {
}

public String getGraphName() {
return getConfigurationAtOpen().getString(GRAPH_NAME.toString());
return getConfigurationAtOpen().getString(GRAPH_NAME.toStringWithoutRoot());
}

public StoreFeatures getStoreFeatures() {
Expand Down Expand Up @@ -1792,4 +1792,4 @@ public static String getPath(File dir) {
return dir.getAbsolutePath() + File.separator;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2017 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.graphdb.database.management;

public enum GraphCacheEvictionAction {

EVICT, DO_NOT_EVICT
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.serialize.DataOutput;
import org.janusgraph.graphdb.database.serialize.Serializer;
import org.janusgraph.graphdb.management.JanusGraphManager;
import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex;
import static org.janusgraph.graphdb.database.management.GraphCacheEvictionAction.EVICT;
import static org.janusgraph.graphdb.database.management.GraphCacheEvictionAction.DO_NOT_EVICT;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.List;
Expand Down Expand Up @@ -88,7 +92,9 @@ public void read(Message message) {
long typeId = VariableLong.readPositive(in);
schemaCache.expireSchemaElement(typeId);
}
Thread ack = new Thread(new SendAckOnTxClose(evictionId, senderId, graph.getOpenTransactions()));
GraphCacheEvictionAction action = serializer.readObjectNotNull(in, GraphCacheEvictionAction.class);
Preconditions.checkNotNull(action);
Thread ack = new Thread(new SendAckOnTxClose(evictionId, senderId, graph.getOpenTransactions(), action, graph.getGraphName()));
ack.setDaemon(true);
ack.start();
break;
Expand All @@ -114,6 +120,7 @@ public void read(Message message) {
}

public void sendCacheEviction(Set<JanusGraphSchemaVertex> updatedTypes,
final boolean evictGraphFromCache,
List<Callable<Boolean>> updatedTypeTriggers,
Set<String> openInstances) {
Preconditions.checkArgument(!openInstances.isEmpty());
Expand All @@ -127,6 +134,8 @@ public void sendCacheEviction(Set<JanusGraphSchemaVertex> updatedTypes,
assert type.hasId();
VariableLong.writePositive(out,type.longId());
}
if (evictGraphFromCache) out.writeObjectNotNull(EVICT);
else out.writeObjectNotNull(DO_NOT_EVICT);
sysLog.add(out.getStaticBuffer());
}

Expand Down Expand Up @@ -170,11 +179,19 @@ private class SendAckOnTxClose implements Runnable {
private final long evictionId;
private final Set<? extends JanusGraphTransaction> openTx;
private final String originId;
private final GraphCacheEvictionAction action;
private final String graphName;

private SendAckOnTxClose(long evictionId, String originId, Set<? extends JanusGraphTransaction> openTx) {
private SendAckOnTxClose(long evictionId,
String originId,
Set<? extends JanusGraphTransaction> openTx,
GraphCacheEvictionAction action,
String graphName) {
this.evictionId = evictionId;
this.openTx = openTx;
this.originId = originId;
this.action = action;
this.graphName = graphName;
}

@Override
Expand All @@ -191,7 +208,16 @@ public void run() {
txStillOpen = true;
}
}
if (!txStillOpen) {
final JanusGraphManager jgm = JanusGraphManager.getInstance();
final boolean janusGraphManagerIsInBadState = null == jgm && action.equals(EVICT);

if (janusGraphManagerIsInBadState) {
log.error("The JanusGraphManager is null and therefore we cannot remove graph {} from the cache.", graphName);
} else {
jgm.removeGraph(graphName);
log.debug("Graph {} has been removed from the JanusGraphManager graph cache.", graphName);
}
if (!txStillOpen && !janusGraphManagerIsInBadState) {
//Send ack and finish up
DataOutput out = graph.getDataSerializer().getDataOutput(64);
out.writeObjectNotNull(MgmtLogType.CACHED_TYPE_EVICTION_ACK);
Expand All @@ -206,6 +232,9 @@ public void run() {
log.warn("System log has already shut down. Did not sent {}: evictionID={} originID={}",MgmtLogType.CACHED_TYPE_EVICTION_ACK,evictionId,originId);
}
break;
} else if (!txStillOpen && janusGraphManagerIsInBadState) {
log.error("JanusGraphManager should be instantiated on this server, but it is not. Please restart with proper server settings.");
break;
}
if (MAX_WAIT_TIME.compareTo(t.elapsed()) < 0) {
//Break out if waited too long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public class ManagementSystem implements JanusGraphManagement {
private final StandardJanusGraphTx transaction;

private final Set<JanusGraphSchemaVertex> updatedTypes;
private boolean evictGraphFromCache;
private final List<Callable<Boolean>> updatedTypeTriggers;

private final Instant txStartTime;
Expand All @@ -161,6 +162,7 @@ public ManagementSystem(StandardJanusGraph graph, KCVSConfiguration config, Log
this.userConfig = new UserModifiableConfiguration(modifyConfig, configVerifier);

this.updatedTypes = new HashSet<>();
this.evictGraphFromCache = false;
this.updatedTypeTriggers = new ArrayList<>();
this.graphShutdownRequired = false;

Expand Down Expand Up @@ -236,8 +238,8 @@ public synchronized void commit() {
transaction.commit();

//Communicate schema changes
if (!updatedTypes.isEmpty()) {
managementLogger.sendCacheEviction(updatedTypes, updatedTypeTriggers, getOpenInstancesInternal());
if (!updatedTypes.isEmpty() || evictGraphFromCache) {
managementLogger.sendCacheEviction(updatedTypes, evictGraphFromCache, updatedTypeTriggers, getOpenInstancesInternal());
for (JanusGraphSchemaVertex schemaVertex : updatedTypes) {
schemaCache.expireSchemaElement(schemaVertex.longId());
}
Expand Down Expand Up @@ -744,6 +746,30 @@ public IndexJobFuture updateIndex(Index index, SchemaAction updateAction) {
return future;
}

/**
* Upon the open managementsystem's commit, this graph will be asynchronously evicted from the cache on all JanusGraph nodes in your
* cluster, once there are no open transactions on this graph on each respective JanusGraph node.
*/
public void evictGraphFromCache() {
this.evictGraphFromCache = true;
setUpdateTrigger(new GraphCacheEvictionCompleteTrigger(this.graph.getGraphName()));
}

private static class GraphCacheEvictionCompleteTrigger implements Callable<Boolean> {
private static final Logger log = LoggerFactory.getLogger(GraphCacheEvictionCompleteTrigger.class);
private final String graphName;

private GraphCacheEvictionCompleteTrigger(String graphName) {
this.graphName = graphName;
}

@Override
public Boolean call() {
log.info("Graph {} has been removed from the graph cache on every JanusGraph node in the cluster.", graphName);
return true;
}
}

private static class EmptyIndexJobFuture implements IndexJobFuture {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.janusgraph.diskstorage.util.time.TimestampProviders;
import org.janusgraph.graphdb.database.idhandling.VariableLong;
import org.janusgraph.graphdb.database.log.LogTxStatus;
import org.janusgraph.graphdb.database.management.GraphCacheEvictionAction;
import org.janusgraph.graphdb.database.management.MgmtLogType;
import org.janusgraph.graphdb.database.serialize.attribute.*;
import org.janusgraph.graphdb.internal.ElementCategory;
Expand Down Expand Up @@ -115,6 +116,7 @@ public StandardSerializer() {
registerClassInternal(51,SchemaStatus.class, new EnumSerializer<>(SchemaStatus.class));
registerClassInternal(52,LogTxStatus.class, new EnumSerializer<>(LogTxStatus.class));
registerClassInternal(53,MgmtLogType.class, new EnumSerializer<>(MgmtLogType.class));
registerClassInternal(69,GraphCacheEvictionAction.class, new EnumSerializer<>(GraphCacheEvictionAction.class));
registerClassInternal(54,TimestampProviders.class, new EnumSerializer<>(TimestampProviders.class));
registerClassInternal(55,TimeUnit.class, new EnumSerializer<>(TimeUnit.class));
registerClassInternal(56,Mapping.class, new EnumSerializer<>(Mapping.class));
Expand Down

0 comments on commit 82cbadd

Please sign in to comment.