Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implements 'scan' command #4

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,4 @@ local.properties
# TeXlipse plugin
.texlipse

*.config
14 changes: 8 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ sourceSets.main.compileClasspath += configurations.providedCompile
sourceSets.test.compileClasspath += configurations.providedCompile

dependencies {
def slf4jVersion = '1.7.7'
def slf4jVersion = '1.7.21'

compile('org.hbase:asynchbase:1.7.2') {
exclude group: 'org.jboss.netty'
}

providedCompile(
[group: 'org.projectlombok', name: 'lombok', version: '1.14.8'])
[group: 'org.projectlombok', name: 'lombok', version: '1.16.10'])
compile(
[group: 'org.slf4j', name: 'slf4j-api', version: slf4jVersion],
[group: 'com.google.inject', name: 'guice', version: '3.0'],
[group: 'com.google.guava', name: 'guava', version: '18.0'],
[group: 'org.erlang.otp', name: 'jinterface', version: '1.5.6'],
[group: 'org.hbase', name: 'asynchbase', version: '1.6.0'])
[group: 'com.google.inject', name: 'guice', version: '4.1.0'],
[group: 'org.erlang.otp', name: 'jinterface', version: '1.6.1'])
runtime(
[group: 'org.slf4j', name: 'slf4j-simple', version: slf4jVersion])
}
Expand Down
201 changes: 201 additions & 0 deletions java_src/main/java/me/cmoz/diver/AsyncScanner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package me.cmoz.diver;

import com.ericsson.otp.erlang.*;
import com.stumbleupon.async.Callback;
import org.hbase.async.*;

import java.util.ArrayList;
import java.util.Arrays;

class AsyncScanner implements Callback<Object, ArrayList<ArrayList<KeyValue>>> {
private static final OtpErlangAtom ROW_ATOM = new OtpErlangAtom("row");
private static final OtpErlangAtom DONE_ATOM = new OtpErlangAtom("done");

private final OtpErlangTuple from;
private final OtpMbox mbox;
private final OtpErlangRef ref;
private final Scanner scanner;
int numRows = Integer.MAX_VALUE;

public AsyncScanner(OtpErlangTuple from, OtpMbox mbox, OtpErlangRef ref, Scanner scanner, OtpErlangList options)
throws OtpErlangDecodeException {
this.from = from;
this.mbox = mbox;
this.ref = ref;
this.scanner = scanner;

// prevent returning partial row by default
scanner.setMaxNumKeyValues(-1);

for (final OtpErlangObject option : options) {
final OtpErlangTuple tuple = (OtpErlangTuple) option;
final OtpErlangObject[] tupleElements = tuple.elements();
final String optionName = ((OtpErlangAtom) tupleElements[0]).atomValue();
final OtpErlangObject optionValue = tupleElements[1];

switch(optionName) {
case "num_rows":
numRows = (int)((OtpErlangLong) optionValue).longValue();
scanner.setMaxNumRows(numRows);
break;

// TODO: setFamilies
case "family":
scanner.setFamily(((OtpErlangBinary) optionValue).binaryValue());
break;
// TODO: setFilter
case "key_regexp":
scanner.setKeyRegexp(new String(((OtpErlangBinary) optionValue).binaryValue()));
break;
// TODO: setKeyRegexp(regesp, charset)
case "max_num_bytes":
scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue());
break;
case "max_num_keyvalues":
scanner.setMaxNumKeyValues((int)((OtpErlangLong) optionValue).longValue());
break;
case "max_num_rows":
scanner.setMaxNumRows((int)((OtpErlangLong) optionValue).longValue());
break;
case "max_timestamp":
scanner.setMaxTimestamp(((OtpErlangLong) optionValue).longValue());
break;
case "max_versions":
scanner.setMaxVersions((int)((OtpErlangLong) optionValue).longValue());
break;
case "qualifier":
scanner.setQualifier(((OtpErlangBinary) optionValue).binaryValue());
break;
// TODO: setQualifiers
case "server_block_cache":
scanner.setServerBlockCache(((OtpErlangLong) optionValue).longValue() != 0);
break;
case "start_key":
scanner.setStartKey(((OtpErlangBinary) optionValue).binaryValue());
break;
case "stop_key":
scanner.setStopKey(((OtpErlangBinary) optionValue).binaryValue());
break;
case "time_range":
final OtpErlangObject[] timeRangeElems = ((OtpErlangTuple)optionValue).elements();
scanner.setTimeRange(
((OtpErlangLong) timeRangeElems[0]).longValue(),
((OtpErlangLong) timeRangeElems[1]).longValue());
break;
case "filter":
scanner.setFilter(filterFromTuple((OtpErlangTuple)optionValue));
break;
default:
final String message = String.format("Invalid scan option: \"%s\"", tuple);
throw new OtpErlangDecodeException(message);
}
}
}

private ScanFilter filterFromTuple(OtpErlangTuple tuple) {
OtpErlangObject[] objs = tuple.elements();
OtpErlangAtom name = (OtpErlangAtom)objs[0];

switch(name.atomValue()) {
//TODO: column pagination
case "column_prefix":
return new ColumnPrefixFilter(
((OtpErlangBinary)objs[1]).binaryValue()
);
case "column_range":
return new ColumnRangeFilter(
((OtpErlangBinary)objs[1]).binaryValue(),
((OtpErlangBinary)objs[2]).binaryValue()
);
//TODO: compare
case "first_key_only":
return new FirstKeyOnlyFilter();
case "fuzzy_row":
return fuzzyRowFilterFromList((OtpErlangList)objs[1]);
case "key_only":
return new KeyOnlyFilter();
case "key_regexp":
return new KeyRegexpFilter(((OtpErlangBinary)objs[1]).binaryValue());
//TODO: timestamps
default:
throw new IllegalArgumentException("unknown filter key: " + name.atomValue());
}
}

private FuzzyRowFilter fuzzyRowFilterFromList(OtpErlangList list) {
OtpErlangObject[] objects = list.elements();
FuzzyRowFilter.FuzzyFilterPair[] pairs = new FuzzyRowFilter.FuzzyFilterPair[objects.length];
for(int i = 0; i < objects.length; i++) {
OtpErlangTuple tup = (OtpErlangTuple)objects[i];
if(tup.arity() != 2) {
throw new IllegalArgumentException("invalid option for fuzzy row filter: " + tup.toString());
}
pairs[i] = new FuzzyRowFilter.FuzzyFilterPair(
((OtpErlangBinary)tup.elementAt(0)).binaryValue(),
((OtpErlangBinary)tup.elementAt(1)).binaryValue());
}
return new FuzzyRowFilter(Arrays.asList(pairs));
}

public void start() {
scanner.nextRows()
.addCallback(this)
.addErrback(new ScannerErrback(from, mbox, ref));
}

@Override
public Object call(ArrayList<ArrayList<KeyValue>> rows) throws Exception {
if (rows == null) {
sendDone();
return null;
}

for(final ArrayList<KeyValue> row : rows) {
sendRow(row);

numRows -= 1;
if(numRows == 0) {
sendDone();
return null;
}
}

scanner.nextRows()
.addCallback(this)
.addErrback(new ScannerErrback(from, mbox, ref));
return null;
}

public void sendDone() {
final OtpErlangObject[] body = new OtpErlangObject[] {
ref,
DONE_ATOM,
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body));
}

public void sendRow(final ArrayList<KeyValue> data) throws Exception {
final OtpErlangObject[] items = new OtpErlangObject[data.size()];
int i = 0;
for (final KeyValue keyValue : data) {
final OtpErlangObject[] erldata = new OtpErlangObject[] {
new OtpErlangBinary(keyValue.key()),
new OtpErlangBinary(keyValue.family()),
new OtpErlangBinary(keyValue.qualifier()),
new OtpErlangBinary(keyValue.value()),
new OtpErlangLong(keyValue.timestamp())
};
items[i] = new OtpErlangTuple(erldata);
i++;
}

final OtpErlangObject[] body = new OtpErlangObject[] {
ref,
ROW_ATOM,
new OtpErlangList(items)
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(body));
}
}
22 changes: 22 additions & 0 deletions java_src/main/java/me/cmoz/diver/GenServerCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package me.cmoz.diver;

import com.ericsson.otp.erlang.*;
import com.stumbleupon.async.Callback;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public abstract class GenServerCallback<R, T> implements Callback<R, T> {
private final OtpErlangTuple from;
private final OtpMbox mbox;

@Override
public R call(final T data) throws Exception {
final OtpErlangPid pid = (OtpErlangPid) from.elementAt(0);
final OtpErlangRef ref = (OtpErlangRef) from.elementAt(1);

mbox.send(pid, TypeUtil.tuple(ref, handle(data)));
return null;
}

protected abstract OtpErlangObject handle(T t);
}
34 changes: 10 additions & 24 deletions java_src/main/java/me/cmoz/diver/GenServerErrback.java
Original file line number Diff line number Diff line change
@@ -1,33 +1,19 @@
package me.cmoz.diver;

import com.ericsson.otp.erlang.*;
import com.stumbleupon.async.Callback;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
class GenServerErrback implements Callback<Object, Exception> {

private static final OtpErlangAtom ERROR_ATOM = new OtpErlangAtom("error");

private final OtpErlangTuple from;

private final OtpMbox mbox;
class GenServerErrback extends GenServerCallback<Object, Exception> {
public GenServerErrback(OtpErlangTuple from, OtpMbox mbox) {
super(from, mbox);
}

@Override
public Object call(final Exception e) throws Exception {
final OtpErlangObject[] body = new OtpErlangObject[] {
ERROR_ATOM,
new OtpErlangString(e.getClass().getSimpleName()),
new OtpErlangString(e.getLocalizedMessage())
protected OtpErlangObject handle(Exception e) {
final OtpErlangObject[] body = new OtpErlangObject[]{
JavaServer.ATOM_ERROR,
new OtpErlangString(e.getClass().getSimpleName()),
new OtpErlangString(e.getLocalizedMessage())
};

final OtpErlangObject[] resp = new OtpErlangObject[] {
from.elementAt(1), // Ref
new OtpErlangTuple(body)
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(resp));
return null;
return new OtpErlangTuple(body);
}

}
40 changes: 13 additions & 27 deletions java_src/main/java/me/cmoz/diver/GenServerGetCallback.java
Original file line number Diff line number Diff line change
@@ -1,49 +1,35 @@
package me.cmoz.diver;

import com.ericsson.otp.erlang.*;
import com.stumbleupon.async.Callback;
import lombok.RequiredArgsConstructor;
import org.hbase.async.KeyValue;

import java.util.ArrayList;

@RequiredArgsConstructor
class GenServerGetCallback implements Callback<Object, ArrayList<KeyValue>> {

private static final OtpErlangAtom OK_ATOM = new OtpErlangAtom("ok");

private final OtpErlangTuple from;

private final OtpMbox mbox;
class GenServerGetCallback extends GenServerCallback<Object,ArrayList<KeyValue>> {
public GenServerGetCallback(OtpErlangTuple from, OtpMbox mbox) {
super(from, mbox);
}

@Override
public Object call(final ArrayList<KeyValue> data) throws Exception {
protected OtpErlangObject handle(ArrayList<KeyValue> data) {
final OtpErlangObject[] items = new OtpErlangObject[data.size()];
int i = 0;
for (final KeyValue keyValue : data) {
final OtpErlangObject[] erldata = new OtpErlangObject[] {
new OtpErlangBinary(keyValue.key()),
new OtpErlangBinary(keyValue.family()),
new OtpErlangBinary(keyValue.qualifier()),
new OtpErlangBinary(keyValue.value()),
new OtpErlangLong(keyValue.timestamp())
new OtpErlangBinary(keyValue.key()),
new OtpErlangBinary(keyValue.family()),
new OtpErlangBinary(keyValue.qualifier()),
new OtpErlangBinary(keyValue.value()),
new OtpErlangLong(keyValue.timestamp())
};
items[i] = new OtpErlangTuple(erldata);
i++;
}

final OtpErlangObject[] body = new OtpErlangObject[] {
OK_ATOM,
new OtpErlangList(items)
};

final OtpErlangObject[] resp = new OtpErlangObject[] {
from.elementAt(1), // Ref
new OtpErlangTuple(body)
JavaServer.ATOM_OK,
new OtpErlangList(items)
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(resp));
return null;
return new OtpErlangTuple(body);
}

}
27 changes: 7 additions & 20 deletions java_src/main/java/me/cmoz/diver/GenServerOkCallback.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
package me.cmoz.diver;

import com.ericsson.otp.erlang.*;
import com.stumbleupon.async.Callback;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
class GenServerOkCallback implements Callback<Object, Object> {

private static final OtpErlangAtom OK_ATOM = new OtpErlangAtom("ok");

private final OtpErlangTuple from;

private final OtpMbox mbox;
class GenServerOkCallback extends GenServerCallback<Object,Object> {
public GenServerOkCallback(OtpErlangTuple from, OtpMbox mbox) {
super(from, mbox);
}

@Override
public Object call(final Object object) throws Exception {
final OtpErlangObject[] resp = new OtpErlangObject[] {
from.elementAt(1), // Ref
new OtpErlangTuple(OK_ATOM)
};

mbox.send((OtpErlangPid) from.elementAt(0), new OtpErlangTuple(resp));
return null;
protected OtpErlangObject handle(Object o) {
return JavaServer.ATOM_OK;
}

}
}
Loading