Skip to content

Commit

Permalink
Avoid Hbase delete() masking put() with same timestamp
Browse files Browse the repository at this point in the history
fixed: #230

Change-Id: I468b005c3c974d0ec4c46f1f989fbad7f914f2a3
  • Loading branch information
zhoney committed Nov 27, 2018
1 parent 3cf3bec commit bf997ad
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,14 @@ public String toString() {

@Override
public void run() {
assert this.status.code() < TaskStatus.RUNNING.code();
if (this.checkDependenciesSuccess()) {
this.status(TaskStatus.RUNNING);
super.run();
try {
assert this.status.code() < TaskStatus.RUNNING.code();
if (this.checkDependenciesSuccess()) {
this.status(TaskStatus.RUNNING);
super.run();
}
} catch (Throwable e) {
this.setException(e);
}
}

Expand Down Expand Up @@ -338,7 +342,7 @@ protected Object[] asArray() {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

List<Object> list = new ArrayList<>(24);
List<Object> list = new ArrayList<>(28);

list.add(T.label);
list.add(P.TASK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,9 @@ public <V> void save(HugeTask<V> task) {
this.call(() -> {
// Construct vertex from task
HugeVertex vertex = this.tx().constructVertex(task);
// Delete the old record if exist
Iterator<Vertex> old = this.tx().queryVertices(vertex.id());
if (old.hasNext()) {
HugeVertex oldV = (HugeVertex) old.next();
assert !old.hasNext();
if (this.tx().indexValueChanged(oldV, vertex)) {
// Only delete vertex if index value changed else override
this.tx().removeVertex(oldV);
}
}
// Do update
// TODO: delete index of old vertex
// this.tx().deleteIndex(vertex);
// Add or update task info in backend store, stale index might exist
return this.tx().addVertex(vertex);
});
}
Expand Down Expand Up @@ -424,6 +416,20 @@ public boolean indexValueChanged(Vertex oldV, HugeVertex newV) {
return false;
}

private void deleteIndex(HugeVertex vertex) {
// Delete the old record if exist
Iterator<Vertex> old = this.queryVertices(vertex.id());
if (old.hasNext()) {
HugeVertex oldV = (HugeVertex) old.next();
assert !old.hasNext();
if (this.indexValueChanged(oldV, vertex)) {
// Only delete vertex if index value changed else override
// TODO: just delete index instead of removing old vertex
this.removeVertex(oldV);
}
}
}

protected void initSchema() {
HugeGraph graph = this.graph();
VertexLabel label = graph.schemaTransaction().getVertexLabel(TASK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,12 @@ public static synchronized HbaseOptions instance() {
rangeInt(1024, 10000),
2181
);

public static final ConfigOption<Integer> HBASE_THREADS_MAX =
new ConfigOption<>(
"hbase.threads_max",
"The max threads num of hbase connections.",
rangeInt(1, 1000),
64
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public synchronized void open(HugeConfig conf) throws IOException {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", hosts);
config.set("hbase.zookeeper.property.clientPort", String.valueOf(port));
// Set hbase.hconnection.threads.max 64 to avoid OOM(default value: 256)
config.setInt("hbase.hconnection.threads.max",
conf.get(HbaseOptions.HBASE_THREADS_MAX));

this.hbase = ConnectionFactory.createConnection(config);
}
Expand Down

0 comments on commit bf997ad

Please sign in to comment.