diff --git a/.gitignore b/.gitignore index 635135c..8d6b0a0 100644 --- a/.gitignore +++ b/.gitignore @@ -111,3 +111,4 @@ local.properties # TeXlipse plugin .texlipse +*.config diff --git a/build.gradle b/build.gradle index cb94ac5..346bf08 100644 --- a/build.gradle +++ b/build.gradle @@ -34,16 +34,18 @@ sourceSets.main.compileClasspath += configurations.providedCompile sourceSets.test.compileClasspath += configurations.providedCompile dependencies { - def slf4jVersion = '1.7.7' + def slf4jVersion = '1.7.21' + + compile('org.hbase:asynchbase:1.7.2') { + exclude group: 'org.jboss.netty' + } providedCompile( - [group: 'org.projectlombok', name: 'lombok', version: '1.14.8']) + [group: 'org.projectlombok', name: 'lombok', version: '1.16.10']) compile( [group: 'org.slf4j', name: 'slf4j-api', version: slf4jVersion], - [group: 'com.google.inject', name: 'guice', version: '3.0'], - [group: 'com.google.guava', name: 'guava', version: '18.0'], - [group: 'org.erlang.otp', name: 'jinterface', version: '1.5.6'], - [group: 'org.hbase', name: 'asynchbase', version: '1.6.0']) + [group: 'com.google.inject', name: 'guice', version: '4.1.0'], + [group: 'org.erlang.otp', name: 'jinterface', version: '1.6.1']) runtime( [group: 'org.slf4j', name: 'slf4j-simple', version: slf4jVersion]) } diff --git a/java_src/main/java/me/cmoz/diver/AsyncScanner.java b/java_src/main/java/me/cmoz/diver/AsyncScanner.java new file mode 100644 index 0000000..5a6b79a --- /dev/null +++ b/java_src/main/java/me/cmoz/diver/AsyncScanner.java @@ -0,0 +1,201 @@ +package me.cmoz.diver; + +import com.ericsson.otp.erlang.*; +import com.stumbleupon.async.Callback; +import org.hbase.async.*; + +import java.util.ArrayList; +import java.util.Arrays; + +class AsyncScanner implements Callback>> { + private static final OtpErlangAtom ROW_ATOM = new OtpErlangAtom("row"); + private static final OtpErlangAtom DONE_ATOM = new OtpErlangAtom("done"); + + private final OtpErlangTuple from; + private final OtpMbox mbox; + private final OtpErlangRef ref; + private final Scanner scanner; + int numRows = Integer.MAX_VALUE; + + public AsyncScanner(OtpErlangTuple from, OtpMbox mbox, OtpErlangRef ref, Scanner scanner, OtpErlangList options) + throws OtpErlangDecodeException { + this.from = from; + this.mbox = mbox; + this.ref = ref; + this.scanner = scanner; + + // prevent returning partial row by default + scanner.setMaxNumKeyValues(-1); + + for (final OtpErlangObject option : options) { + final OtpErlangTuple tuple = (OtpErlangTuple) option; + final OtpErlangObject[] tupleElements = tuple.elements(); + final String optionName = ((OtpErlangAtom) tupleElements[0]).atomValue(); + final OtpErlangObject optionValue = tupleElements[1]; + + switch(optionName) { + case "num_rows": + numRows = (int)((OtpErlangLong) optionValue).longValue(); + scanner.setMaxNumRows(numRows); + break; + + // TODO: setFamilies + case "family": + scanner.setFamily(((OtpErlangBinary) optionValue).binaryValue()); + break; + // TODO: setFilter + case "key_regexp": + scanner.setKeyRegexp(new String(((OtpErlangBinary) optionValue).binaryValue())); + break; + // TODO: setKeyRegexp(regesp, charset) + case "max_num_bytes": + scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue()); + break; + case "max_num_keyvalues": + scanner.setMaxNumKeyValues((int)((OtpErlangLong) optionValue).longValue()); + break; + case "max_num_rows": + scanner.setMaxNumRows((int)((OtpErlangLong) optionValue).longValue()); + break; + case "max_timestamp": + scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue()); + break; + case "max_versions": + scanner.setMaxVersions((int)((OtpErlangLong) optionValue).longValue()); + break; + case "qualifier": + scanner.setQualifier(((OtpErlangBinary) optionValue).binaryValue()); + break; + // TODO: setQualifiers + case "server_block_cache": + scanner.setServerBlockCache(((OtpErlangLong) optionValue).longValue() != 0); + break; + case "start_key": + scanner.setStartKey(((OtpErlangBinary) optionValue).binaryValue()); + break; + case "stop_key": + scanner.setStopKey(((OtpErlangBinary) optionValue).binaryValue()); + break; + case "time_range": + final OtpErlangObject[] timeRangeElems = ((OtpErlangTuple)optionValue).elements(); + scanner.setTimeRange( + ((OtpErlangLong) timeRangeElems[0]).longValue(), + ((OtpErlangLong) timeRangeElems[1]).longValue()); + break; + case "filter": + scanner.setFilter(filterFromTuple((OtpErlangTuple)optionValue)); + break; + default: + final String message = String.format("Invalid scan option: \"%s\"", tuple); + throw new OtpErlangDecodeException(message); + } + } + } + + private ScanFilter filterFromTuple(OtpErlangTuple tuple) { + OtpErlangObject[] objs = tuple.elements(); + OtpErlangAtom name = (OtpErlangAtom)objs[0]; + + switch(name.atomValue()) { + //TODO: column pagination + case "column_prefix": + return new ColumnPrefixFilter( + ((OtpErlangBinary)objs[1]).binaryValue() + ); + case "column_range": + return new ColumnRangeFilter( + ((OtpErlangBinary)objs[1]).binaryValue(), + ((OtpErlangBinary)objs[2]).binaryValue() + ); + //TODO: compare + case "first_key_only": + return new FirstKeyOnlyFilter(); + case "fuzzy_row": + return fuzzyRowFilterFromList((OtpErlangList)objs[1]); + case "key_only": + return new KeyOnlyFilter(); + case "key_regexp": + return new KeyRegexpFilter(((OtpErlangBinary)objs[1]).binaryValue()); + //TODO: timestamps + default: + throw new IllegalArgumentException("unknown filter key: " + name.atomValue()); + } + } + + private FuzzyRowFilter fuzzyRowFilterFromList(OtpErlangList list) { + OtpErlangObject[] objects = list.elements(); + FuzzyRowFilter.FuzzyFilterPair[] pairs = new FuzzyRowFilter.FuzzyFilterPair[objects.length]; + for(int i = 0; i < objects.length; i++) { + OtpErlangTuple tup = (OtpErlangTuple)objects[i]; + if(tup.arity() != 2) { + throw new IllegalArgumentException("invalid option for fuzzy row filter: " + tup.toString()); + } + pairs[i] = new FuzzyRowFilter.FuzzyFilterPair( + ((OtpErlangBinary)tup.elementAt(0)).binaryValue(), + ((OtpErlangBinary)tup.elementAt(1)).binaryValue()); + } + return new FuzzyRowFilter(Arrays.asList(pairs)); + } + + public void start() { + scanner.nextRows() + .addCallback(this) + .addErrback(new ScannerErrback(from, mbox, ref)); + } + + @Override + public Object call(ArrayList> rows) throws Exception { + if (rows == null) { + sendDone(); + return null; + } + + for(final ArrayList row : rows) { + sendRow(row); + + numRows -= 1; + if(numRows == 0) { + sendDone(); + return null; + } + } + + scanner.nextRows() + .addCallback(this) + .addErrback(new ScannerErrback(from, mbox, ref)); + return null; + } + + public void sendDone() { + final OtpErlangObject[] body = new OtpErlangObject[] { + ref, + DONE_ATOM, + }; + + mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body)); + } + + public void sendRow(final ArrayList data) throws Exception { + final OtpErlangObject[] items = new OtpErlangObject[data.size()]; + int i = 0; + for (final KeyValue keyValue : data) { + final OtpErlangObject[] erldata = new OtpErlangObject[] { + new OtpErlangBinary(keyValue.key()), + new OtpErlangBinary(keyValue.family()), + new OtpErlangBinary(keyValue.qualifier()), + new OtpErlangBinary(keyValue.value()), + new OtpErlangLong(keyValue.timestamp()) + }; + items[i] = new OtpErlangTuple(erldata); + i++; + } + + final OtpErlangObject[] body = new OtpErlangObject[] { + ref, + ROW_ATOM, + new OtpErlangList(items) + }; + + mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body)); + } +} diff --git a/java_src/main/java/me/cmoz/diver/GenServerCallback.java b/java_src/main/java/me/cmoz/diver/GenServerCallback.java new file mode 100644 index 0000000..dbf95f1 --- /dev/null +++ b/java_src/main/java/me/cmoz/diver/GenServerCallback.java @@ -0,0 +1,22 @@ +package me.cmoz.diver; + +import com.ericsson.otp.erlang.*; +import com.stumbleupon.async.Callback; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public abstract class GenServerCallback implements Callback { + private final OtpErlangTuple from; + private final OtpMbox mbox; + + @Override + public R call(final T data) throws Exception { + final OtpErlangPid pid = (OtpErlangPid) from.elementAt(0); + final OtpErlangRef ref = (OtpErlangRef) from.elementAt(1); + + mbox.send(pid, TypeUtil.tuple(ref, handle(data))); + return null; + } + + protected abstract OtpErlangObject handle(T t); +} \ No newline at end of file diff --git a/java_src/main/java/me/cmoz/diver/GenServerErrback.java b/java_src/main/java/me/cmoz/diver/GenServerErrback.java index 9c4fb1c..0b3dea2 100644 --- a/java_src/main/java/me/cmoz/diver/GenServerErrback.java +++ b/java_src/main/java/me/cmoz/diver/GenServerErrback.java @@ -1,33 +1,19 @@ package me.cmoz.diver; import com.ericsson.otp.erlang.*; -import com.stumbleupon.async.Callback; -import lombok.RequiredArgsConstructor; -@RequiredArgsConstructor -class GenServerErrback implements Callback { - - private static final OtpErlangAtom ERROR_ATOM = new OtpErlangAtom("error"); - - private final OtpErlangTuple from; - - private final OtpMbox mbox; +class GenServerErrback extends GenServerCallback { + public GenServerErrback(OtpErlangTuple from, OtpMbox mbox) { + super(from, mbox); + } @Override - public Object call(final Exception e) throws Exception { - final OtpErlangObject[] body = new OtpErlangObject[] { - ERROR_ATOM, - new OtpErlangString(e.getClass().getSimpleName()), - new OtpErlangString(e.getLocalizedMessage()) + protected OtpErlangObject handle(Exception e) { + final OtpErlangObject[] body = new OtpErlangObject[]{ + JavaServer.ATOM_ERROR, + new OtpErlangString(e.getClass().getSimpleName()), + new OtpErlangString(e.getLocalizedMessage()) }; - - final OtpErlangObject[] resp = new OtpErlangObject[] { - from.elementAt(1), // Ref - new OtpErlangTuple(body) - }; - - mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(resp)); - return null; + return new OtpErlangTuple(body); } - } diff --git a/java_src/main/java/me/cmoz/diver/GenServerGetCallback.java b/java_src/main/java/me/cmoz/diver/GenServerGetCallback.java index 8f459ed..1567f03 100644 --- a/java_src/main/java/me/cmoz/diver/GenServerGetCallback.java +++ b/java_src/main/java/me/cmoz/diver/GenServerGetCallback.java @@ -1,49 +1,35 @@ package me.cmoz.diver; import com.ericsson.otp.erlang.*; -import com.stumbleupon.async.Callback; -import lombok.RequiredArgsConstructor; import org.hbase.async.KeyValue; - import java.util.ArrayList; -@RequiredArgsConstructor -class GenServerGetCallback implements Callback> { - - private static final OtpErlangAtom OK_ATOM = new OtpErlangAtom("ok"); - - private final OtpErlangTuple from; - - private final OtpMbox mbox; +class GenServerGetCallback extends GenServerCallback> { + public GenServerGetCallback(OtpErlangTuple from, OtpMbox mbox) { + super(from, mbox); + } @Override - public Object call(final ArrayList data) throws Exception { + protected OtpErlangObject handle(ArrayList data) { final OtpErlangObject[] items = new OtpErlangObject[data.size()]; int i = 0; for (final KeyValue keyValue : data) { final OtpErlangObject[] erldata = new OtpErlangObject[] { - new OtpErlangBinary(keyValue.key()), - new OtpErlangBinary(keyValue.family()), - new OtpErlangBinary(keyValue.qualifier()), - new OtpErlangBinary(keyValue.value()), - new OtpErlangLong(keyValue.timestamp()) + new OtpErlangBinary(keyValue.key()), + new OtpErlangBinary(keyValue.family()), + new OtpErlangBinary(keyValue.qualifier()), + new OtpErlangBinary(keyValue.value()), + new OtpErlangLong(keyValue.timestamp()) }; items[i] = new OtpErlangTuple(erldata); i++; } final OtpErlangObject[] body = new OtpErlangObject[] { - OK_ATOM, - new OtpErlangList(items) - }; - - final OtpErlangObject[] resp = new OtpErlangObject[] { - from.elementAt(1), // Ref - new OtpErlangTuple(body) + JavaServer.ATOM_OK, + new OtpErlangList(items) }; - - mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(resp)); - return null; + return new OtpErlangTuple(body); } } diff --git a/java_src/main/java/me/cmoz/diver/GenServerOkCallback.java b/java_src/main/java/me/cmoz/diver/GenServerOkCallback.java index 45ea9a3..bced876 100644 --- a/java_src/main/java/me/cmoz/diver/GenServerOkCallback.java +++ b/java_src/main/java/me/cmoz/diver/GenServerOkCallback.java @@ -1,27 +1,14 @@ package me.cmoz.diver; import com.ericsson.otp.erlang.*; -import com.stumbleupon.async.Callback; -import lombok.RequiredArgsConstructor; -@RequiredArgsConstructor -class GenServerOkCallback implements Callback { - - private static final OtpErlangAtom OK_ATOM = new OtpErlangAtom("ok"); - - private final OtpErlangTuple from; - - private final OtpMbox mbox; +class GenServerOkCallback extends GenServerCallback { + public GenServerOkCallback(OtpErlangTuple from, OtpMbox mbox) { + super(from, mbox); + } @Override - public Object call(final Object object) throws Exception { - final OtpErlangObject[] resp = new OtpErlangObject[] { - from.elementAt(1), // Ref - new OtpErlangTuple(OK_ATOM) - }; - - mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(resp)); - return null; + protected OtpErlangObject handle(Object o) { + return JavaServer.ATOM_OK; } - -} +} \ No newline at end of file diff --git a/java_src/main/java/me/cmoz/diver/JavaServer.java b/java_src/main/java/me/cmoz/diver/JavaServer.java index 25d4837..4af56c4 100644 --- a/java_src/main/java/me/cmoz/diver/JavaServer.java +++ b/java_src/main/java/me/cmoz/diver/JavaServer.java @@ -4,10 +4,8 @@ import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.inject.name.Named; import com.google.inject.Inject; -import com.stumbleupon.async.Callback; -import com.stumbleupon.async.Deferred; import lombok.extern.slf4j.Slf4j; -import org.hbase.async.HBaseClient; +import org.hbase.async.*; @Slf4j class JavaServer extends AbstractExecutionThreadService { @@ -22,6 +20,12 @@ class JavaServer extends AbstractExecutionThreadService { /** A mailbox for exchanging messages with Erlang processes. */ private OtpMbox mbox; + public static final OtpErlangAtom ATOM_OK = new OtpErlangAtom("ok"); + public static final OtpErlangAtom ATOM_ERROR = new OtpErlangAtom("error"); + + public static final OtpErlangAtom ATOM_TRUE = new OtpErlangAtom("true"); + public static final OtpErlangAtom ATOM_FALSE = new OtpErlangAtom("false"); + @Inject public JavaServer( final OtpNode otpNode, @@ -58,15 +62,6 @@ private void handleCall(final OtpErlangTuple from, final OtpErlangTuple req) case "client_stats": reply(from, TypeUtil.clientStats(hbaseClient.stats())); break; - case "delete": - final OtpErlangBinary table1 = (OtpErlangBinary) elements[1]; - final OtpErlangBinary key1 = (OtpErlangBinary) elements[2]; - final OtpErlangBinary family1 = (OtpErlangBinary) elements[3]; - final OtpErlangList qualifiers1 = (OtpErlangList) elements[4]; - hbaseClient.delete(TypeUtil.deleteRequest(table1, key1, family1, qualifiers1)) - .addCallback(new GenServerOkCallback(from, mbox)) - .addErrback(new GenServerErrback(from, mbox)); - break; case "ensure_table_exists": final OtpErlangBinary table2 = (OtpErlangBinary) elements[1]; hbaseClient.ensureTableExists(table2.binaryValue()) @@ -85,11 +80,33 @@ private void handleCall(final OtpErlangTuple from, final OtpErlangTuple req) .addCallback(new GenServerOkCallback(from, mbox)) .addErrback(new GenServerErrback(from, mbox)); break; + + case "pid": + reply(from, TypeUtil.tuple(ATOM_OK, mbox.self())); + break; + + case "prefetch_meta": + final OtpErlangBinary table6 = (OtpErlangBinary) elements[1]; + hbaseClient.prefetchMeta(table6.binaryValue()) + .addCallback(new GenServerOkCallback(from, mbox)) + .addErrback(new GenServerErrback(from, mbox)); + break; + + case "get_conf": + reply(from, handleGetConf((OtpErlangAtom)elements[1])); + break; + case "set_conf": + reply(from, handleSetConf(elements)); + break; + case "get": final OtpErlangBinary table4 = (OtpErlangBinary) elements[1]; final OtpErlangBinary key3 = (OtpErlangBinary) elements[2]; - final OtpErlangBinary family2 = (OtpErlangBinary) elements[3]; + OtpErlangBinary family2 = null; OtpErlangBinary qualifier = null; + if(elements.length > 3) { + family2 = (OtpErlangBinary) elements[3]; + } if(elements.length > 4) { qualifier = (OtpErlangBinary) elements[4]; } @@ -97,60 +114,120 @@ private void handleCall(final OtpErlangTuple from, final OtpErlangTuple req) .addCallback(new GenServerGetCallback(from, mbox)) .addErrback(new GenServerErrback(from, mbox)); break; - case "get_flush_interval": - final short flushInterval1 = hbaseClient.getFlushInterval(); - reply(from, TypeUtil.tuple(new OtpErlangAtom("ok"), new OtpErlangShort(flushInterval1))); - break; - case "get_increment_buffer_size": - final int incrementBufferSize1 = hbaseClient.getIncrementBufferSize(); - reply(from, TypeUtil.tuple(new OtpErlangAtom("ok"), new OtpErlangInt(incrementBufferSize1))); - break; - case "pid": - reply(from, TypeUtil.tuple(reqType, mbox.self())); - break; - case "prefetch_meta": + + case "scan": final OtpErlangBinary table5 = (OtpErlangBinary) elements[1]; - hbaseClient.prefetchMeta(table5.binaryValue()) - .addCallback(new GenServerOkCallback(from, mbox)) - .addErrback(new GenServerErrback(from, mbox)); + final OtpErlangList options = (OtpErlangList) elements[2]; + final OtpErlangRef ref = (OtpErlangRef) elements[3]; + final Scanner scanner = hbaseClient.newScanner(table5.binaryValue()); + final AsyncScanner asyncScanner = new AsyncScanner(from, mbox, ref, scanner, options); + asyncScanner.start(); + reply(from, ATOM_OK); break; + case "put": - final OtpErlangBinary table6 = (OtpErlangBinary) elements[1]; - final OtpErlangBinary key4 = (OtpErlangBinary) elements[2]; - final OtpErlangBinary family3 = (OtpErlangBinary) elements[3]; - final OtpErlangList qualifiers2 = (OtpErlangList) elements[4]; - final OtpErlangList values2 = (OtpErlangList) elements[5]; - hbaseClient.put(TypeUtil.putRequest(table6, key4, family3, qualifiers2, values2)) + hbaseClient.put(parsePut((OtpErlangTuple)elements[1])) .addCallback(new GenServerOkCallback(from, mbox)) .addErrback(new GenServerErrback(from, mbox)); break; - case "set_flush_interval": - final OtpErlangShort flushInterval2 = (OtpErlangShort) elements[1]; - try { - final short resp1 = hbaseClient.setFlushInterval(flushInterval2.shortValue()); - reply(from, TypeUtil.tuple(new OtpErlangAtom("ok"), new OtpErlangShort(resp1))); - } catch (final OtpErlangRangeException e) { - reply(from, TypeUtil.tuple(new OtpErlangAtom("error"), new OtpErlangString(e.getClass().getName()), new OtpErlangString(e.getLocalizedMessage()))); - } + + case "compare_and_set": + OtpErlangBinary expected = (OtpErlangBinary)elements[2]; + hbaseClient.compareAndSet(parsePut((OtpErlangTuple)elements[1]), expected.binaryValue()) + .addCallback(new GenServerCallback(from, mbox) { + @Override + protected OtpErlangObject handle(Boolean bool) { + return TypeUtil.tuple(ATOM_OK, bool ? ATOM_TRUE : ATOM_FALSE); + } + }) + .addErrback(new GenServerErrback(from, mbox)); break; - case "set_increment_buffer_size": - final OtpErlangInt incrementBufferSize2 = (OtpErlangInt) elements[1]; - try { - final int resp2 = hbaseClient.setIncrementBufferSize(incrementBufferSize2.intValue()); - reply(from, TypeUtil.tuple(new OtpErlangAtom("ok"), new OtpErlangInt(resp2))); - } catch (final OtpErlangRangeException e) { - reply(from, TypeUtil.tuple(new OtpErlangAtom("error"), new OtpErlangString(e.getClass().getName()), new OtpErlangString(e.getLocalizedMessage()))); + + case "increment": + final AtomicIncrementRequest incrReq = new AtomicIncrementRequest( + ((OtpErlangBinary) elements[1]).binaryValue(), + ((OtpErlangBinary) elements[2]).binaryValue(), + ((OtpErlangBinary) elements[3]).binaryValue(), + ((OtpErlangBinary) elements[4]).binaryValue() + ); + hbaseClient.atomicIncrement(incrReq) + .addCallback(new GenServerCallback(from, mbox) { + @Override + protected OtpErlangObject handle(Long value) { + return TypeUtil.tuple(ATOM_OK, new OtpErlangLong(value)); + } + }) + .addErrback(new GenServerErrback(from, mbox)); + break; + + case "delete": + final OtpErlangBinary table1 = (OtpErlangBinary) elements[1]; + final OtpErlangBinary key1 = (OtpErlangBinary) elements[2]; + OtpErlangBinary family1 = null; + OtpErlangList qualifiers1 = null; + if(elements.length > 3) { + family1 = (OtpErlangBinary) elements[3]; } + if(elements.length > 4) { + qualifiers1 = (OtpErlangList) elements[4]; + } + hbaseClient.delete(TypeUtil.deleteRequest(table1, key1, family1, qualifiers1)) + .addCallback(new GenServerOkCallback(from, mbox)) + .addErrback(new GenServerErrback(from, mbox)); break; + default: final String message = String.format("Invalid request: \"%s\"", req); throw new OtpErlangDecodeException(message); } } + private PutRequest parsePut(OtpErlangTuple tuple) { + final OtpErlangObject[] elements = tuple.elements(); + final OtpErlangBinary table = (OtpErlangBinary) elements[0]; + final OtpErlangBinary key = (OtpErlangBinary) elements[1]; + final OtpErlangBinary family = (OtpErlangBinary) elements[2]; + final OtpErlangList qualifiers = (OtpErlangList) elements[3]; + final OtpErlangList values = (OtpErlangList) elements[4]; + return TypeUtil.putRequest(table, key, family, qualifiers, values); + } + + private OtpErlangObject handleGetConf(OtpErlangAtom confType) { + switch(confType.atomValue()) { + case "flush_interval": + return TypeUtil.tuple(ATOM_OK, new OtpErlangShort(hbaseClient.getFlushInterval())); + case "increment_buffer_size": + return TypeUtil.tuple(ATOM_OK, new OtpErlangInt(hbaseClient.getIncrementBufferSize())); + default: + return TypeUtil.tuple(new OtpErlangAtom("error"), new OtpErlangAtom("unknown_conf_type")); + } + } + + private OtpErlangObject handleSetConf(OtpErlangObject[] elements) { + final OtpErlangAtom confType = (OtpErlangAtom) elements[1]; + try { + switch(confType.atomValue()) { + case "flush_interval": + final OtpErlangLong flushInterval = (OtpErlangLong) elements[2]; + final short nextInterval = hbaseClient.setFlushInterval(flushInterval.shortValue()); + return TypeUtil.tuple(ATOM_OK, new OtpErlangShort(nextInterval)); + case "increment_buffer_size": + final OtpErlangLong bufferSize = (OtpErlangLong) elements[2]; + final int nextBufferSize = hbaseClient.setIncrementBufferSize(bufferSize.intValue()); + return TypeUtil.tuple(ATOM_OK, new OtpErlangInt(nextBufferSize)); + default: + return TypeUtil.tuple(new OtpErlangAtom("error"), new OtpErlangAtom("unknown_conf_type")); + } + } catch (final OtpErlangRangeException e) { + return TypeUtil.tuple(new OtpErlangAtom("error"), new OtpErlangString(e.getClass().getName()), new OtpErlangString(e.getLocalizedMessage())); + } + } + private void reply(final OtpErlangTuple from, OtpErlangObject reply) { - final OtpErlangTuple resp = TypeUtil.tuple(from.elementAt(1), reply); - mbox.send((OtpErlangPid) from.elementAt(0), resp); + OtpErlangPid pid = (OtpErlangPid)from.elementAt(0); + OtpErlangObject ref = from.elementAt(1); + final OtpErlangTuple resp = TypeUtil.tuple(ref, reply); + mbox.send(pid, resp); } @Override @@ -162,6 +239,8 @@ protected void run() throws Exception { } catch (final OtpErlangDecodeException | ClassCastException | ArrayIndexOutOfBoundsException e) { log.error(e.getMessage()); log.info("Unrecognised message, ignored."); + } catch (final Exception e) { + log.error(e.getMessage()); } } } diff --git a/java_src/main/java/me/cmoz/diver/ScannerErrback.java b/java_src/main/java/me/cmoz/diver/ScannerErrback.java new file mode 100644 index 0000000..d8ffd54 --- /dev/null +++ b/java_src/main/java/me/cmoz/diver/ScannerErrback.java @@ -0,0 +1,24 @@ +package me.cmoz.diver; + +import com.ericsson.otp.erlang.*; + +class ScannerErrback extends GenServerCallback { + private final OtpErlangRef ref; + + public ScannerErrback(OtpErlangTuple from, OtpMbox mbox, OtpErlangRef ref) { + super(from, mbox); + this.ref = ref; + } + + @Override + protected OtpErlangObject handle(Exception e) { + final OtpErlangObject[] body = new OtpErlangObject[] { + ref, + JavaServer.ATOM_ERROR, + new OtpErlangString(e.getClass().getSimpleName()), + new OtpErlangString(e.getLocalizedMessage()) + }; + return new OtpErlangTuple(body); + } + +} diff --git a/java_src/main/java/me/cmoz/diver/TypeUtil.java b/java_src/main/java/me/cmoz/diver/TypeUtil.java index e9a9896..c404a8c 100644 --- a/java_src/main/java/me/cmoz/diver/TypeUtil.java +++ b/java_src/main/java/me/cmoz/diver/TypeUtil.java @@ -28,7 +28,7 @@ static OtpErlangTuple clientStats(final ClientStats stats) { map.put("scanners_opened", stats.scannersOpened()); map.put("scans", stats.scans()); map.put("uncontended_meta_lookups", stats.uncontendedMetaLookups()); - return tuple(new OtpErlangAtom("ok"), proplist(map)); + return tuple(JavaServer.ATOM_OK, proplist(map)); } static OtpErlangList proplist(final Map map) { @@ -59,18 +59,19 @@ static PutRequest putRequest( final OtpErlangBinary family, final OtpErlangList qualifiers, final OtpErlangList values) { - final byte[][] byteQualifiers = new byte[qualifiers.arity()][]; - int i = 0; - for (final OtpErlangObject qualifier : qualifiers) { - byteQualifiers[i] = ((OtpErlangBinary) qualifier).binaryValue(); - i++; + if(qualifiers.arity() != values.arity()) { + throw new IllegalArgumentException("dimension mismatch: " + qualifiers.arity() + " != " + values.arity()); } - final byte[][] byteValues = new byte[values.arity()][]; - i = 0; - for (final OtpErlangObject value : values) { - byteValues[i] = ((OtpErlangBinary) value).binaryValue(); - i++; + + int size = qualifiers.arity(); + final byte[][] byteQualifiers = new byte[size][]; + final byte[][] byteValues = new byte[size][]; + + for(int i = 0; i < size; i++) { + byteQualifiers[i] = ((OtpErlangBinary) qualifiers.elementAt(i)).binaryValue(); + byteValues[i] = ((OtpErlangBinary) values.elementAt(i)).binaryValue(); } + return new PutRequest(table.binaryValue(), key.binaryValue(), family.binaryValue(), byteQualifiers, byteValues); } @@ -79,11 +80,16 @@ static DeleteRequest deleteRequest( final OtpErlangBinary key, final OtpErlangBinary family, final OtpErlangList qualifiers) { - final byte[][] byteQualifiers = new byte[qualifiers.arity()][]; - int i = 0; - for (final OtpErlangObject qualifier : qualifiers) { - byteQualifiers[i] = ((OtpErlangBinary) qualifier).binaryValue(); - i++; + if(family == null) { + return new DeleteRequest(table.binaryValue(), key.binaryValue()); + } else if(qualifiers == null) { + return new DeleteRequest(table.binaryValue(), key.binaryValue(), family.binaryValue()); + } + + int size = qualifiers.arity(); + final byte[][] byteQualifiers = new byte[size][]; + for(int i = 0; i < size; i++) { + byteQualifiers[i] = ((OtpErlangBinary) qualifiers.elementAt(i)).binaryValue(); } return new DeleteRequest(table.binaryValue(), key.binaryValue(), family.binaryValue(), byteQualifiers); } @@ -93,7 +99,9 @@ static GetRequest getRequest( final OtpErlangBinary key, final OtpErlangBinary family, final OtpErlangBinary qualifier) { - if(qualifier == null) { + if(family == null) { + return new GetRequest(table.binaryValue(), key.binaryValue()); + } else if(qualifier == null) { return new GetRequest(table.binaryValue(), key.binaryValue(), family.binaryValue()); } else { return new GetRequest(table.binaryValue(), key.binaryValue(), family.binaryValue(), qualifier.binaryValue()); diff --git a/lib/diver-0.3.0-dev.jar b/lib/diver-0.3.0-dev.jar new file mode 100644 index 0000000..0ada6f7 Binary files /dev/null and b/lib/diver-0.3.0-dev.jar differ diff --git a/priv/diver-0.3.0-dev.jar b/priv/diver-0.3.0-dev.jar index 743514d..a6540f1 100644 Binary files a/priv/diver-0.3.0-dev.jar and b/priv/diver-0.3.0-dev.jar differ diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..9edfd40 --- /dev/null +++ b/rebar.config @@ -0,0 +1,5 @@ +{erl_opts, [debug_info]}. +{deps, []}. +{dist_node, [ + {name, 'test@127.0.0.1'} +]}. diff --git a/rebar.lock b/rebar.lock new file mode 100644 index 0000000..57afcca --- /dev/null +++ b/rebar.lock @@ -0,0 +1 @@ +[]. diff --git a/src/hbase.app.src b/src/hbase.app.src new file mode 100644 index 0000000..75549c7 --- /dev/null +++ b/src/hbase.app.src @@ -0,0 +1,11 @@ +{application,hbase, + [{description,"hbase connector with jinterface"}, + {vsn,"0.0.4"}, + {registered,[]}, + {mod,{hbase_app,[]}}, + {applications,[kernel,stdlib]}, + {env,[]}, + {modules,[]}, + {maintainers,[]}, + {licenses,[]}, + {links,[]}]}. diff --git a/src/hbase.erl b/src/hbase.erl new file mode 100644 index 0000000..0e431f4 --- /dev/null +++ b/src/hbase.erl @@ -0,0 +1,5 @@ +-module(hbase). +-export([start/0]). + +start() -> + application:ensure_all_started(hbase). diff --git a/src/hbase_app.erl b/src/hbase_app.erl new file mode 100644 index 0000000..5b1c185 --- /dev/null +++ b/src/hbase_app.erl @@ -0,0 +1,11 @@ +-module(hbase_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_StartType, _StartArgs) -> + hbase_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/hbase_server.erl b/src/hbase_server.erl new file mode 100644 index 0000000..3b46e22 --- /dev/null +++ b/src/hbase_server.erl @@ -0,0 +1,234 @@ +-module(hbase_server). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-export([server/0]). +-export([ensure_table_exists/1, ensure_table_family_exists/2]). +-export([get_config/1, set_config/2]). +-export([flush/0, prefetch_meta/1]). + +-export([get/2, get/3, get/4, scan/3, scan_sync/2]). +-export([put/5, compare_and_set/6, increment/4]). +-export([delete/2, delete/3, delete/4]). + +-define(CONNECT_TIMEOUT, (10 * 1000)). + +-type table() :: binary(). +-type cf() :: binary(). +-type rowkey() :: binary(). +-type qualifier() :: binary(). +-type value() :: binary(). +-type ts() :: integer(). + +-type hbase_tuples() :: [hbase_tuple()]. +-type hbase_tuple() :: {cf(), rowkey(), qualifier(), value(), ts()}. + +-type config_key() :: flush_interval | increment_buffer_size. +-type scan_opts() :: [scan_opt()]. +-type scan_opt() :: {num_rows, integer()} + | {family, binary()} + | {key_regexp, binary()} + | {max_num_bytes, integer()} + | {max_num_keyvalues, integer()} + | {max_num_rows, integer()} + | {max_timestamp, integer()} + | {max_versions, integer()} + | {qualifier, integer()} + | {server_block_cache, integer()} + | {start_key, binary()} + | {stop_key, binary()} + | {time_range, integer(), integer()} + | {filter, filter_opts()}. + +-type filter_opts() :: [filter_opt()]. +-type filter_opt() :: {column_prefix, binary()} + | {column_range, binary(), binary()} + | {first_key_only} + | {fuzzy_row, [{binary(), binary()}]} + | {key_only} + | {key_regexp, binary()}. + +-type error() :: {error, binary(), binary()} | {error, atom()}. + +-spec server() -> {atom(), atom()}. +server() -> + {ok, NodeName} = gen_server:call(?MODULE, nodename), + NodeName. + +-spec ensure_table_exists(table()) -> ok | error(). +ensure_table_exists(Table) -> + gen_server:call(server(), {ensure_table_exists, Table}). + +-spec ensure_table_family_exists(table(), cf()) -> ok | error(). +ensure_table_family_exists(Table, CF) -> + gen_server:call(server(), {ensure_table_family_exists, Table, CF}). + +-spec get_config(config_key()) -> {ok, integer()} | error(). +get_config(Option) -> + gen_server:call(server(), {get_conf, Option}). + +-spec set_config(config_key(), integer()) -> {ok, integer()} | error(). +set_config(Option, Value) -> + gen_server:call(server(), {set_conf, Option, Value}). + +-spec flush() -> ok | error(). +flush() -> + gen_server:call(server(), {flush}). + +-spec prefetch_meta(table()) -> ok | error(). +prefetch_meta(Table) -> + gen_server:call(server(), {prefetch_meta, Table}). + +-spec get(table(), rowkey()) -> {ok, hbase_tuples()} | error(). +get(Table, Key) -> + gen_server:call(server(), {get, Table, Key}). + +-spec get(table(), rowkey(), cf()) -> {ok, hbase_tuples()} | error(). +get(Table, Key, CF) -> + gen_server:call(server(), {get, Table, Key, CF}). + +-spec get(table(), rowkey(), cf(), qualifier()) -> {ok, hbase_tuples()} | error(). +get(Table, Key, CF, Qualifier) -> + gen_server:call(server(), {get, Table, Key, CF, Qualifier}). + +-spec scan(binary(), scan_opts(), reference()) -> ok | error(). +scan(Table, Opts, Ref) -> + gen_server:call(server(), {scan, Table, Opts, Ref}). + +-spec scan_sync(binary(), scan_opts()) -> {ok, [hbase_tuples()]} | error(). +scan_sync(Table, Opts) -> + Ref = make_ref(), + ok = scan(Table, Opts, Ref), + receive_scan(Ref). + +receive_scan(Ref) -> + receive_scan(Ref, []). + +receive_scan(Ref, Acc) -> + receive + {Ref, row, Row} -> + receive_scan(Ref, [Row | Acc]); + {Ref, done} -> + {ok, lists:reverse(Acc)}; + {Ref, error, _, _, _} -> + {error, internal} + after 5000 -> {error, timeout} + end. + +-spec put(table(), rowkey(), cf(), [qualifier()], [value()]) -> {ok, list()}. +put(Table, Key, CF, Qualifiers, Values) -> + gen_server:call(server(), {put, {Table, Key, CF, Qualifiers, Values}}). + +-spec compare_and_set(table(), rowkey(), cf(), qualifier(), value(), value()) -> {ok, true | false}. +compare_and_set(Table, Key, CF, Qualifier, Value, Expected) -> + gen_server:call(server(), {compare_and_set, {Table, Key, CF, [Qualifier], [Value]}, Expected}). + +-spec increment(table(), rowkey(), cf(), qualifier()) -> {ok, number()}. +increment(Table, Key, CF, Qualifier) -> + gen_server:call(server(), {increment, Table, Key, CF, Qualifier}). + +-spec delete(table(), rowkey()) -> ok. +delete(Table, Key) -> + gen_server:call(server(), {delete, Table, Key}). + +-spec delete(table(), rowkey(), cf()) -> ok. +delete(Table, Key, CF) -> + gen_server:call(server(), {delete, Table, Key, CF}). + +-spec delete(table(), rowkey(), cf(), [qualifier()]) -> ok. +delete(Table, Key, CF, Qualifiers) -> + gen_server:call(server(), {delete, Table, Key, CF, Qualifiers}). + +-define(PROC_NAME, java_diver_server). +-define(JAR_NAME, "diver-0.3.0-dev.jar"). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + case node() of + 'nonode@nohost' -> + error_logger:error_msg("hbase_server requires distirubed Erlang." + " Use '-name' or '-sname' flag to start distributed Erlang."), + {stop, net_kernel_not_started}; + Node -> + JavaNode = list_to_atom("__diver__" ++ atom_to_list(Node)), + Pid = start_jvm(JavaNode), + case wait_start(Pid, JavaNode) of + ok -> + Server = {?PROC_NAME, JavaNode}, + % force connect to HBase by sending arbitrary query + gen_server:call(Server, {ensure_table_exists, <<"foo">>}, ?CONNECT_TIMEOUT), + % disable query batching + gen_server:call(Server, {set_conf, flush_interval, 0}, ?CONNECT_TIMEOUT), + {ok, #{pid => Pid, java_node => JavaNode}}; + {stop, Reason} -> + {stop, Reason} + end + end. + +start_jvm(NodeName) -> + Self = atom_to_list(node()), + JarFile = code:priv_dir(hbase) ++ "/" ?JAR_NAME, + {ok, HbaseQuorum} = application:get_env(hbase, hbase_quorum), + {ok, HbasePath} = application:get_env(hbase, hbase_path), + Args = ["-jar", JarFile, Self, NodeName, erlang:get_cookie(), atom_to_list(?PROC_NAME), HbaseQuorum, HbasePath], + error_logger:info_msg("JVM args: ~p~n", [Args]), + case os:find_executable("java") of + false -> + {stop, no_java_executable}; + ExecPath -> + open_port({spawn_executable, ExecPath}, [{line, 1000}, {args, Args}, stderr_to_stdout, exit_status]) + end. + +wait_start(Pid, JavaNode) -> + receive + {Pid, {data, {eol, "READY"}}} -> + error_logger:info_msg("JVM process started"), + net_kernel:connect(JavaNode), + {ok, NodePid} = gen_server:call({?PROC_NAME, JavaNode}, {pid}), + true = link(NodePid), + true = erlang:monitor_node(JavaNode, true), + ok; + {Pid, {data, {eol, Data}}} -> + error_logger:info_msg("unknown output from JVM, stopping: ~p", [Data]), + {stop, Data}; + {Pid, {exit_status, Status}} -> + error_logger:info_msg("JVM exited with status code: ~p", [Status]), + {stop, exit}; + Msg -> + {stop, Msg} + end. + +handle_call(nodename, _From, #{java_node := JavaNode} = State) -> + {reply, {ok, {?PROC_NAME, JavaNode}}, State}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({node_down, JavaNode}, #{java_node := JavaNode} = S) -> + error_logger:info_msg("Java server process is down."), + {stop, nodedown, S}; +handle_info({Pid, {data, {eol, Log}}}, #{pid := Pid} = S) -> + error_logger:info_msg(Log), + {noreply, S}; +handle_info(_Msg, S) -> + {noreply, S}. + +terminate(_Reason, #{pid := Pid} = _State) -> + port_close(Pid), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/hbase_sup.erl b/src/hbase_sup.erl new file mode 100644 index 0000000..bf1a4ca --- /dev/null +++ b/src/hbase_sup.erl @@ -0,0 +1,19 @@ +-module(hbase_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 1000, Type, [I]}). + +init([]) -> + {ok, { {one_for_all, 0, 1}, [ + ?CHILD(hbase_server, worker) + ]} }.