Skip to content

Commit

Permalink
Support PostgreSQL and CockroachDB backends (#484)
Browse files Browse the repository at this point in the history
implemented: #441
Change-Id: I979affd9f8b2916e6696672fc6e3fda75f9b76da
  • Loading branch information
zhoney authored and Linary committed May 22, 2019
1 parent 44dc443 commit 387cef5
Show file tree
Hide file tree
Showing 32 changed files with 1,055 additions and 113 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ env:
- BACKEND=mysql
- BACKEND=hbase
- BACKEND=rocksdb
- BACKEND=postgresql
global:
- RELEASE_BRANCH=^release-.*$
- RELEASE_TAG=^v[0-9]\..*$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,6 @@ protected Clause relation2Cql(Relation relation) {
String key = relation.serialKey().toString();
Object value = relation.serialValue();

// Serialize value (TODO: should move to Serializer)
value = serializeValue(value);

switch (relation.relation()) {
case EQ:
return QueryBuilder.eq(key, value);
Expand All @@ -281,12 +278,7 @@ protected Clause relation2Cql(Relation relation) {
case LTE:
return QueryBuilder.lte(key, value);
case IN:
List<?> values = (List<?>) value;
List<Object> serializedValues = new ArrayList<>(values.size());
for (Object v : values) {
serializedValues.add(serializeValue(v));
}
return QueryBuilder.in(key, serializedValues);
return QueryBuilder.in(key, value);
case CONTAINS:
return QueryBuilder.contains(key, value);
case CONTAINS_KEY:
Expand Down Expand Up @@ -331,14 +323,6 @@ protected static Select cloneSelect(Select select, String table) {
return CopyUtil.copy(select, QueryBuilder.select().from(table));
}

protected static Object serializeValue(Object value) {
// Serialize value (TODO: should move to Serializer)
if (value instanceof Id) {
value = ((Id) value).asObject();
}
return value;
}

protected Iterator<BackendEntry> results2Entries(Query q, ResultSet r) {
return new CassandraEntryIterator(r, q, (e1, row) -> {
CassandraBackendEntry e2 = row2Entry(q.resultType(), row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package com.baidu.hugegraph.backend.serializer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import com.baidu.hugegraph.HugeGraph;
Expand Down Expand Up @@ -339,7 +341,8 @@ protected Query writeQueryEdgeCondition(Query query) {
if (r.key() == HugeKeys.OWNER_VERTEX ||
r.key() == HugeKeys.OTHER_VERTEX) {
// Serialize vertex id
r.serialValue(IdUtil.writeString((Id) value));
String id = IdUtil.writeString((Id) value);
r.serialValue(this.escapeString(id));
} else {
// Serialize label id
r.serialValue(((Id) value).asObject());
Expand All @@ -353,16 +356,30 @@ protected Query writeQueryEdgeCondition(Query query) {

@Override
protected Query writeQueryCondition(Query query) {
if (query.resultType().isGraph()) {
ConditionQuery result = (ConditionQuery) query;
// No user-prop when serialize
assert result.allSysprop();
for (Condition.Relation r : result.relations()) {
if (r.relation() == Condition.RelationType.CONTAINS) {
r.serialValue(JsonUtil.toJson(r.value()));
ConditionQuery result = (ConditionQuery) query;
// No user-prop when serialize
assert result.allSysprop();
for (Condition.Relation r : result.relations()) {
if (!(r.value().equals(r.serialValue()))) {
continue;
}
if (r.relation() == Condition.RelationType.IN) {
List<?> values = (List<?>) r.value();
List<Object> serializedValues = new ArrayList<>(values.size());
for (Object v : values) {
serializedValues.add(this.serializeValue(v));
}
r.serialValue(serializedValues);
} else {
r.serialValue(this.serializeValue(r.value()));
}

if (query.resultType().isGraph() &&
r.relation() == Condition.RelationType.CONTAINS) {
r.serialValue(JsonUtil.toJson(r.serialValue()));
}
}

return query;
}

Expand Down Expand Up @@ -577,6 +594,20 @@ protected abstract void formatProperties(HugeElement element,
protected abstract void parseProperties(HugeElement element,
TableBackendEntry.Row row);

protected Object serializeValue(Object value) {
if (value instanceof Id) {
value = ((Id) value).asObject();
}
if (value instanceof String) {
value = this.escapeString((String) value);
}
return value;
}

protected String escapeString(String value) {
return value;
}

protected void writeEnableLabelIndex(SchemaLabel schema,
TableBackendEntry entry) {
entry.column(HugeKeys.ENABLE_LABEL_INDEX, schema.enableLabelIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,37 @@

import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.InsertionOrderUtil;
import com.google.common.collect.ImmutableMap;

public class TableDefine {

private final Map<HugeKeys, String> columns;
private final List<HugeKeys> keys;
private final Map<String, String> typesMapping;

public TableDefine() {
this.columns = InsertionOrderUtil.newMap();
this.keys = InsertionOrderUtil.newList();
this.typesMapping = ImmutableMap.of();
}

public TableDefine(Map<String, String> typesMapping) {
this.columns = InsertionOrderUtil.newMap();
this.keys = InsertionOrderUtil.newList();
this.typesMapping = typesMapping;
}

public TableDefine column(HugeKeys key, String... desc) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < desc.length; i++) {
sb.append(desc[i]);
String type = desc[i];
// The first element of 'desc' is column data type, which may be
// mapped to actual data type supported by backend store
if (i == 0 && this.typesMapping.containsKey(type)) {
type = this.typesMapping.get(type);
}
assert type != null;
sb.append(type);
if (i != desc.length - 1) {
sb.append(" ");
}
Expand Down
5 changes: 5 additions & 0 deletions hugegraph-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
<artifactId>hugegraph-hbase</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-postgresql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion hugegraph-dist/src/assembly/static/conf/hugegraph.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ cassandra.password=


# mysql backend config
#jdbc.driver=com.mysql.jdbc.Driver
#jdbc.url=jdbc:mysql://127.0.0.1:3306
#jdbc.username=root
#jdbc.password=
#jdbc.reconnect_max_times=3
#jdbc.reconnect_interval=3

#jdbc.sslmode=disable

# palo backend config
#palo.host=127.0.0.1
Expand Down
2 changes: 2 additions & 0 deletions hugegraph-dist/src/assembly/travis/install-backend.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ elif [ "$BACKEND" == "hbase" ]; then
$TRAVIS_DIR/install-hbase.sh
elif [ "$BACKEND" == "mysql" ]; then
$TRAVIS_DIR/install-mysql.sh
elif [ "$BACKEND" == "postgresql" ]; then
$TRAVIS_DIR/install-postgresql.sh
fi
18 changes: 18 additions & 0 deletions hugegraph-dist/src/assembly/travis/install-postgresql.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

set -ev

TRAVIS_DIR=`dirname $0`
CONF=hugegraph-test/src/main/resources/hugegraph.properties

POSTGRESQL_DRIVER=org.postgresql.Driver
POSTGRESQL_URL=jdbc:postgresql://localhost:5432/
POSTGRESQL_USERNAME=postgres

# Set PostgreSQL configurations
sed -i "s/jdbc.driver=.*/jdbc.driver=$POSTGRESQL_DRIVER/" $CONF
sed -i "s?jdbc.url=.*?jdbc.url=$POSTGRESQL_URL?" $CONF
sed -i "s/jdbc.username=.*/jdbc.username=$POSTGRESQL_USERNAME/" $CONF

sudo service postgresql stop 9.2
sudo service postgresql start 9.5
16 changes: 15 additions & 1 deletion hugegraph-dist/src/assembly/travis/start-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,29 @@ BASE_DIR=hugegraph-$VERSION
BIN=$BASE_DIR/bin
CONF=$BASE_DIR/conf/hugegraph.properties

# PostgreSQL configurations
POSTGRESQL_DRIVER=org.postgresql.Driver
POSTGRESQL_URL=jdbc:postgresql://localhost:5432/
POSTGRESQL_USERNAME=postgres

declare -A backend_serializer_map=(["memory"]="text" ["cassandra"]="cassandra" \
["scylladb"]="scylladb" ["mysql"]="mysql" \
["hbase"]="hbase" ["rocksdb"]="binary")
["hbase"]="hbase" ["rocksdb"]="binary" \
["postgresql"]="postgresql")

SERIALIZER=${backend_serializer_map[$BACKEND]}

# Set backend and serializer
sed -i "s/backend=.*/backend=$BACKEND/" $CONF
sed -i "s/serializer=.*/serializer=$SERIALIZER/" $CONF

# Set PostgreSQL configurations if needed
if [ "$BACKEND" == "postgresql" ]; then
sed -i "s/#jdbc.driver=.*/jdbc.driver=$POSTGRESQL_DRIVER/" $CONF
sed -i "s?#jdbc.url=.*?jdbc.url=$POSTGRESQL_URL?" $CONF
sed -i "s/#jdbc.username=.*/jdbc.username=$POSTGRESQL_USERNAME/" $CONF
fi

# Append schema.sync_deletion=true to config file
echo "schema.sync_deletion=true" >> $CONF

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public static void registerBackends() {
String confFile = "/backend.properties";
InputStream input = RegisterUtil.class.getClass()
.getResourceAsStream(confFile);
E.checkState(input != null, "Can't read file '%s' as stream", confFile);
E.checkState(input != null,
"Can't read file '%s' as stream", confFile);

PropertiesConfiguration props = new PropertiesConfiguration();
props.setDelimiterParsingDisabled(true);
Expand Down Expand Up @@ -89,8 +90,12 @@ private static void registerBackend(String backend) {
case "palo":
registerPalo();
break;
case "postgresql":
registerPostgresql();
break;
default:
throw new HugeException("Unsupported backend type '%s'", backend);
throw new HugeException("Unsupported backend type '%s'",
backend);
}
}

Expand Down Expand Up @@ -165,6 +170,18 @@ public static void registerPalo() {
"com.baidu.hugegraph.backend.store.palo.PaloStoreProvider");
}

public static void registerPostgresql() {
// Register config
OptionSpace.register("postgresql",
"com.baidu.hugegraph.backend.store.postgresql.PostgresqlOptions");
// Register serializer
SerializerFactory.register("postgresql",
"com.baidu.hugegraph.backend.store.postgresql.PostgresqlSerializer");
// Register backend
BackendProviderFactory.register("postgresql",
"com.baidu.hugegraph.backend.store.postgresql.PostgresqlStoreProvider");
}

public static void registerServer() {
OptionSpace.register("server", "com.baidu.hugegraph.config.ServerOptions");
}
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-dist/src/main/resources/backend.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
backends=[cassandra, scylladb, rocksdb, mysql, palo, hbase]
backends=[cassandra, scylladb, rocksdb, mysql, palo, hbase, postgresql]
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class MysqlOptions extends OptionHolder {

private MysqlOptions() {
protected MysqlOptions() {
super();
}

Expand Down Expand Up @@ -89,4 +89,12 @@ public static synchronized MysqlOptions instance() {
rangeInt(1, 10),
3
);

public static final ConfigOption<String> SSL_MODE =
new ConfigOption<>(
"jdbc.ssl_mode",
"The SSL mode of connections with database.",
disallowEmpty(),
"disable"
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,9 @@ protected void readUserdata(SchemaElement schema,
schema.userdata(e.getKey(), e.getValue());
}
}

@Override
protected String escapeString(String value) {
return MysqlUtil.escapeString(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,11 @@ public void createDatabase() {
try (Connection conn = this.openWithoutDB(0)) {
conn.createStatement().execute(sql);
} catch (SQLException e) {
throw new BackendException("Failed to create database '%s'",
this.database);
if (!e.getMessage().endsWith("already exists")) {
throw new BackendException("Failed to create database '%s'", e,
this.database);
}
// Ignore exception if database already exists
}
}

Expand Down Expand Up @@ -194,7 +197,7 @@ public boolean existsDatabase() {
/**
* Connect DB without specified database
*/
private Connection openWithoutDB(int timeout) {
protected Connection openWithoutDB(int timeout) {
String jdbcUrl = this.config.get(MysqlOptions.JDBC_URL);
String url = new URIBuilder().setPath(jdbcUrl)
.setParameter("socketTimeout",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public synchronized void open(HugeConfig config) {
try {
this.sessions.open(config);
} catch (Exception e) {
if (!e.getMessage().startsWith("Unknown database")) {
if (!e.getMessage().startsWith("Unknown database") &&
!e.getMessage().endsWith("does not exist")) {
throw new BackendException("Failed connect with mysql, " +
"please ensure it's ok", e);
}
Expand Down
Loading

0 comments on commit 387cef5

Please sign in to comment.