Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support PostgreSQL and CockroachDB backends #484

Merged
merged 10 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ env:
- BACKEND=mysql
- BACKEND=hbase
- BACKEND=rocksdb
- BACKEND=postgresql
global:
- RELEASE_BRANCH=^release-.*$
- RELEASE_TAG=^v[0-9]\..*$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,37 @@
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.InsertionOrderUtil;

import jersey.repackaged.com.google.common.collect.ImmutableMap;
zhoney marked this conversation as resolved.
Show resolved Hide resolved

public class TableDefine {

private final Map<HugeKeys, String> columns;
private final List<HugeKeys> keys;
private final Map<String, String> typesMapping;

public TableDefine() {
this.columns = InsertionOrderUtil.newMap();
this.keys = InsertionOrderUtil.newList();
this.typesMapping = ImmutableMap.of();
}

public TableDefine(Map<String, String> typesMapping) {
this.columns = InsertionOrderUtil.newMap();
this.keys = InsertionOrderUtil.newList();
this.typesMapping = typesMapping;
}

public TableDefine column(HugeKeys key, String... desc) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < desc.length; i++) {
sb.append(desc[i]);
String type = desc[i];
// The first element of 'desc' is column data type, which may be
// mapped to actual data type supported by backend store
if (i == 0 && this.typesMapping.containsKey(type)) {
zhoney marked this conversation as resolved.
Show resolved Hide resolved
type = this.typesMapping.get(type);
}
assert type != null;
sb.append(type);
if (i != desc.length - 1) {
sb.append(" ");
}
Expand Down
5 changes: 5 additions & 0 deletions hugegraph-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
<artifactId>hugegraph-hbase</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-postgresql</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion hugegraph-dist/src/assembly/static/conf/hugegraph.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ cassandra.password=


# mysql backend config
#jdbc.driver=com.mysql.jdbc.Driver
#jdbc.url=jdbc:mysql://127.0.0.1:3306
#jdbc.username=root
#jdbc.password=
#jdbc.reconnect_max_times=3
#jdbc.reconnect_interval=3

#jdbc.sslmode=disable

# palo backend config
#palo.host=127.0.0.1
Expand Down
2 changes: 2 additions & 0 deletions hugegraph-dist/src/assembly/travis/install-backend.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ elif [ "$BACKEND" == "hbase" ]; then
$TRAVIS_DIR/install-hbase.sh
elif [ "$BACKEND" == "mysql" ]; then
$TRAVIS_DIR/install-mysql.sh
elif [ "$BACKEND" == "postgresql" ]; then
$TRAVIS_DIR/install-postgresql.sh
fi
18 changes: 18 additions & 0 deletions hugegraph-dist/src/assembly/travis/install-postgresql.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

set -ev

TRAVIS_DIR=`dirname $0`
CONF=hugegraph-test/src/main/resources/hugegraph.properties

POSTGRESQL_DRIVER=org.postgresql.Driver
POSTGRESQL_URL=jdbc:postgresql://localhost:5432/
POSTGRESQL_USERNAME=postgres

# Set PostgreSQL configurations
sed -i "s/jdbc.driver=.*/jdbc.driver=$POSTGRESQL_DRIVER/" $CONF
sed -i "s?jdbc.url=.*?jdbc.url=$POSTGRESQL_URL?" $CONF
sed -i "s/jdbc.username=.*/jdbc.username=$POSTGRESQL_USERNAME/" $CONF
zhoney marked this conversation as resolved.
Show resolved Hide resolved

sudo service postgresql stop 9.2
sudo service postgresql start 9.5
16 changes: 15 additions & 1 deletion hugegraph-dist/src/assembly/travis/start-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,29 @@ BASE_DIR=hugegraph-$VERSION
BIN=$BASE_DIR/bin
CONF=$BASE_DIR/conf/hugegraph.properties

# PostgreSQL configurations
POSTGRESQL_DRIVER=org.postgresql.Driver
POSTGRESQL_URL=jdbc:postgresql://localhost:5432/
POSTGRESQL_USERNAME=postgres

declare -A backend_serializer_map=(["memory"]="text" ["cassandra"]="cassandra" \
["scylladb"]="scylladb" ["mysql"]="mysql" \
["hbase"]="hbase" ["rocksdb"]="binary")
["hbase"]="hbase" ["rocksdb"]="binary" \
["postgresql"]="postgresql")

SERIALIZER=${backend_serializer_map[$BACKEND]}

# Set backend and serializer
sed -i "s/backend=.*/backend=$BACKEND/" $CONF
sed -i "s/serializer=.*/serializer=$SERIALIZER/" $CONF

# Set PostgreSQL configurations if needed
if [ "$BACKEND" == "postgresql" ]; then
sed -i "s/#jdbc.driver=.*/jdbc.driver=$POSTGRESQL_DRIVER/" $CONF
sed -i "s?#jdbc.url=.*?jdbc.url=$POSTGRESQL_URL?" $CONF
sed -i "s/#jdbc.username=.*/jdbc.username=$POSTGRESQL_USERNAME/" $CONF
fi

# Append schema.sync_deletion=true to config file
echo "schema.sync_deletion=true" >> $CONF

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public static void registerBackends() {
String confFile = "/backend.properties";
InputStream input = RegisterUtil.class.getClass()
.getResourceAsStream(confFile);
E.checkState(input != null, "Can't read file '%s' as stream", confFile);
E.checkState(input != null,
"Can't read file '%s' as stream", confFile);

PropertiesConfiguration props = new PropertiesConfiguration();
props.setDelimiterParsingDisabled(true);
Expand Down Expand Up @@ -89,8 +90,12 @@ private static void registerBackend(String backend) {
case "palo":
registerPalo();
break;
case "postgresql":
registerPostgresql();
break;
default:
throw new HugeException("Unsupported backend type '%s'", backend);
throw new HugeException("Unsupported backend type '%s'",
backend);
}
}

Expand Down Expand Up @@ -165,6 +170,18 @@ public static void registerPalo() {
"com.baidu.hugegraph.backend.store.palo.PaloStoreProvider");
}

public static void registerPostgresql() {
// Register config
OptionSpace.register("postgresql",
"com.baidu.hugegraph.backend.store.postgresql.PostgresqlOptions");
// Register serializer
SerializerFactory.register("postgresql",
"com.baidu.hugegraph.backend.store.postgresql.PostgresqlSerializer");
// Register backend
BackendProviderFactory.register("postgresql",
"com.baidu.hugegraph.backend.store.postgresql.PostgresqlStoreProvider");
}

public static void registerServer() {
OptionSpace.register("server", "com.baidu.hugegraph.config.ServerOptions");
}
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-dist/src/main/resources/backend.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
backends=[cassandra, scylladb, rocksdb, mysql, palo, hbase]
backends=[cassandra, scylladb, rocksdb, mysql, palo, hbase, postgresql]
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class MysqlOptions extends OptionHolder {

private MysqlOptions() {
protected MysqlOptions() {
super();
}

Expand Down Expand Up @@ -89,4 +89,12 @@ public static synchronized MysqlOptions instance() {
rangeInt(1, 10),
3
);

public static final ConfigOption<String> SSL_MODE =
new ConfigOption<>(
"jdbc.ssl_mode",
"The url of database in JDBC format.",
zhoney marked this conversation as resolved.
Show resolved Hide resolved
disallowEmpty(),
"disable"
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,11 @@ public void createDatabase() {
try (Connection conn = this.openWithoutDB(0)) {
conn.createStatement().execute(sql);
} catch (SQLException e) {
throw new BackendException("Failed to create database '%s'",
this.database);
if (!e.getMessage().endsWith("already exists")) {
throw new BackendException("Failed to create database '%s'", e,
this.database);
}
// Ignore exception if database already exists
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class MysqlStore extends AbstractBackendStore<Session> {

private final Map<HugeType, MysqlTable> tables;

private MysqlSessions sessions;
protected MysqlSessions sessions;

public MysqlStore(final BackendStoreProvider provider,
final String database, final String store) {
Expand Down Expand Up @@ -114,7 +114,8 @@ public synchronized void open(HugeConfig config) {
try {
this.sessions.open(config);
} catch (Exception e) {
if (!e.getMessage().startsWith("Unknown database")) {
if (!e.getMessage().startsWith("Unknown database") &&
!e.getMessage().endsWith("does not exist")) {
throw new BackendException("Failed connect with mysql, " +
"please ensure it's ok", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;

import com.baidu.hugegraph.backend.BackendException;
Expand All @@ -50,6 +51,8 @@ public abstract class MysqlTable

private static final Logger LOG = Log.logger(MysqlStore.class);

protected String dropTableTemplate = "DROP TABLE IF EXISTS %s;";
protected String truncateTableTemplate = "TRUNCATE TABLE %s;";
// The template for insert and delete statements
private String insertTemplate;
private String deleteTemplate;
Expand Down Expand Up @@ -91,14 +94,16 @@ protected void createTable(Session session, TableDefine tableDefine) {
// Specified primary keys
sql.append(" PRIMARY KEY (");
int i = 0;
int size = tableDefine.keys().size();
for (HugeKeys key : tableDefine.keys()) {
sql.append(key);
if (++i != tableDefine.keys().size()) {
sql.append(formatKey(key));
zhoney marked this conversation as resolved.
Show resolved Hide resolved
if (++i != size) {
sql.append(", ");
}
}

sql.append(")) ENGINE=InnoDB;");
sql.append("))");
sql.append(this.engine());
sql.append(";");

LOG.debug("Create table: {}", sql);
try {
Expand All @@ -109,9 +114,13 @@ protected void createTable(Session session, TableDefine tableDefine) {
}
}

protected String engine() {
return " ENGINE=InnoDB";
}

protected void dropTable(Session session) {
LOG.debug("Drop table: {}", this.table());
String sql = String.format("DROP TABLE IF EXISTS %s;", this.table());
String sql = this.buildDropTemplate();
try {
session.execute(sql);
} catch (SQLException e) {
Expand All @@ -122,7 +131,7 @@ protected void dropTable(Session session) {

protected void truncateTable(Session session) {
LOG.debug("Truncate table: {}", this.table());
String sql = String.format("TRUNCATE TABLE %s;", this.table());
String sql = this.buildTruncateTemplate();
try {
session.execute(sql);
} catch (SQLException e) {
Expand Down Expand Up @@ -190,6 +199,14 @@ protected String buildDeleteTemplate(List<HugeKeys> idNames) {
return this.deleteTemplate;
}

protected String buildDropTemplate() {
return String.format(this.dropTableTemplate, this.table());
zhoney marked this conversation as resolved.
Show resolved Hide resolved
}

protected String buildTruncateTemplate() {
return String.format(this.truncateTableTemplate, this.table());
zhoney marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Insert an entire row
*/
Expand All @@ -202,7 +219,7 @@ public void insert(Session session, MysqlBackendEntry.Row entry) {
// Create or get insert prepare statement
insertStmt = session.prepareStatement(template);
int i = 1;
for (Object object : entry.columns().values()) {
for (Object object : this.buildInsertObjects(entry)) {
insertStmt.setObject(i++, object);
}
} catch (SQLException e) {
Expand Down Expand Up @@ -413,7 +430,7 @@ protected StringBuilder relation2Sql(Condition.Relation relation) {
Object value = relation.serialValue();

// Serialize value (TODO: should move to Serializer)
value = serializeValue(value);
value = this.serializeValue(value);

zhoney marked this conversation as resolved.
Show resolved Hide resolved
StringBuilder sql = new StringBuilder(32);
sql.append(key);
Expand Down Expand Up @@ -503,13 +520,19 @@ protected void wrapPage(StringBuilder select, Query query) {
select.append(where.build());
}

select.append(this.orderByKeys());

assert query.limit() != Query.NO_LIMIT;
// Fetch `limit + 1` records for judging whether reached the last page
select.append(" limit ");
select.append(query.limit() + 1);
select.append(";");
}

protected String orderByKeys() {
return Strings.EMPTY;
}

protected void wrapOffset(StringBuilder select, Query query) {
assert query.limit() >= 0;
assert query.offset() >= 0;
Expand All @@ -521,7 +544,7 @@ protected void wrapOffset(StringBuilder select, Query query) {
select.append(";");
}

private static Object serializeValue(Object value) {
protected Object serializeValue(Object value) {
if (value instanceof Id) {
value = ((Id) value).asObject();
}
Expand All @@ -545,6 +568,14 @@ protected void appendPartition(StringBuilder delete) {
// pass
}

protected List<Object> buildInsertObjects(MysqlBackendEntry.Row entry) {
List<Object> objects = new ArrayList<>();
for (Object key : entry.columns().keySet()) {
objects.add(entry.columns().get(key));
}
return objects;
}

public static String formatKey(HugeKeys key) {
return key.name();
}
Expand Down
Loading