Skip to content

Commit

Permalink
Merge pull request #4199 from cbeams/refactor-rpc-server
Browse files Browse the repository at this point in the history
Refactor rpc server implementation
  • Loading branch information
cbeams committed Apr 29, 2020
2 parents e03c461 + 3badf29 commit 4277e62
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 119 deletions.
9 changes: 9 additions & 0 deletions common/src/main/java/bisq/common/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class Config {
public static final String DUMP_DELAYED_PAYOUT_TXS = "dumpDelayedPayoutTxs";
public static final String ALLOW_FAULTY_DELAYED_TXS = "allowFaultyDelayedTxs";
public static final String API_PASSWORD = "apiPassword";
public static final String API_PORT = "apiPort";

// Default values for certain options
public static final int UNSPECIFIED_PORT = -1;
Expand Down Expand Up @@ -201,6 +202,7 @@ public class Config {
public final boolean dumpDelayedPayoutTxs;
public final boolean allowFaultyDelayedTxs;
public final String apiPassword;
public final int apiPort;

// Properties derived from options but not exposed as options themselves
public final File torDir;
Expand Down Expand Up @@ -622,6 +624,12 @@ public Config(String defaultAppName, File defaultUserDataDir, String... args) {
.withRequiredArg()
.defaultsTo("");

ArgumentAcceptingOptionSpec<Integer> apiPortOpt =
parser.accepts(API_PORT, "gRPC API port")
.withRequiredArg()
.ofType(Integer.class)
.defaultsTo(9998);

try {
CompositeOptionSet options = new CompositeOptionSet();

Expand Down Expand Up @@ -735,6 +743,7 @@ public Config(String defaultAppName, File defaultUserDataDir, String... args) {
this.dumpDelayedPayoutTxs = options.valueOf(dumpDelayedPayoutTxsOpt);
this.allowFaultyDelayedTxs = options.valueOf(allowFaultyDelayedTxsOpt);
this.apiPassword = options.valueOf(apiPasswordOpt);
this.apiPort = options.valueOf(apiPortOpt);
} catch (OptionException ex) {
throw new ConfigException("problem parsing option '%s': %s",
ex.options().get(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,100 +42,112 @@
import bisq.proto.grpc.PlaceOfferGrpc;
import bisq.proto.grpc.PlaceOfferReply;
import bisq.proto.grpc.PlaceOfferRequest;
import bisq.proto.grpc.StopServerGrpc;
import bisq.proto.grpc.StopServerReply;
import bisq.proto.grpc.StopServerRequest;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import java.io.IOException;

import java.util.List;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;


/**
* gRPC server. Gets a instance of BisqFacade passed to access data from the running Bisq instance.
*/
@Slf4j
public class BisqGrpcServer {
public class GrpcServer {

private Server server;
private final CoreApi coreApi;
private final int port;

private static BisqGrpcServer instance;
private static Config config;
private static CoreApi coreApi;
public GrpcServer(Config config, CoreApi coreApi) {
this.coreApi = coreApi;
this.port = config.apiPort;

try {
var server = ServerBuilder.forPort(port)
.addService(new GetVersionService())
.addService(new GetBalanceService())
.addService(new GetTradeStatisticsService())
.addService(new GetOffersService())
.addService(new GetPaymentAccountsService())
.addService(new PlaceOfferService())
.intercept(new PasswordAuthInterceptor(config.apiPassword))
.build()
.start();

log.info("listening on port {}", port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
server.shutdown();
log.info("shutdown complete");
}));

///////////////////////////////////////////////////////////////////////////////////////////
// Services
///////////////////////////////////////////////////////////////////////////////////////////
} catch (IOException e) {
log.error(e.toString(), e);
}
}

static class GetVersionImpl extends GetVersionGrpc.GetVersionImplBase {
class GetVersionService extends GetVersionGrpc.GetVersionImplBase {
@Override
public void getVersion(GetVersionRequest req, StreamObserver<GetVersionReply> responseObserver) {
GetVersionReply reply = GetVersionReply.newBuilder().setVersion(coreApi.getVersion()).build();
var reply = GetVersionReply.newBuilder().setVersion(coreApi.getVersion()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

static class GetBalanceImpl extends GetBalanceGrpc.GetBalanceImplBase {
class GetBalanceService extends GetBalanceGrpc.GetBalanceImplBase {
@Override
public void getBalance(GetBalanceRequest req, StreamObserver<GetBalanceReply> responseObserver) {
GetBalanceReply reply = GetBalanceReply.newBuilder().setBalance(coreApi.getAvailableBalance()).build();
var reply = GetBalanceReply.newBuilder().setBalance(coreApi.getAvailableBalance()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

static class GetTradeStatisticsImpl extends GetTradeStatisticsGrpc.GetTradeStatisticsImplBase {
class GetTradeStatisticsService extends GetTradeStatisticsGrpc.GetTradeStatisticsImplBase {
@Override
public void getTradeStatistics(GetTradeStatisticsRequest req,
StreamObserver<GetTradeStatisticsReply> responseObserver) {
List<protobuf.TradeStatistics2> tradeStatistics = coreApi.getTradeStatistics().stream()

var tradeStatistics = coreApi.getTradeStatistics().stream()
.map(TradeStatistics2::toProtoTradeStatistics2)
.collect(Collectors.toList());
GetTradeStatisticsReply reply = GetTradeStatisticsReply.newBuilder().addAllTradeStatistics(tradeStatistics).build();

var reply = GetTradeStatisticsReply.newBuilder().addAllTradeStatistics(tradeStatistics).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

static class GetOffersImpl extends GetOffersGrpc.GetOffersImplBase {
class GetOffersService extends GetOffersGrpc.GetOffersImplBase {
@Override
public void getOffers(GetOffersRequest req, StreamObserver<GetOffersReply> responseObserver) {

List<protobuf.Offer> tradeStatistics = coreApi.getOffers().stream()
var tradeStatistics = coreApi.getOffers().stream()
.map(Offer::toProtoMessage)
.collect(Collectors.toList());

GetOffersReply reply = GetOffersReply.newBuilder().addAllOffers(tradeStatistics).build();
var reply = GetOffersReply.newBuilder().addAllOffers(tradeStatistics).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

static class GetPaymentAccountsImpl extends GetPaymentAccountsGrpc.GetPaymentAccountsImplBase {
class GetPaymentAccountsService extends GetPaymentAccountsGrpc.GetPaymentAccountsImplBase {
@Override
public void getPaymentAccounts(GetPaymentAccountsRequest req,
StreamObserver<GetPaymentAccountsReply> responseObserver) {

List<protobuf.PaymentAccount> tradeStatistics = coreApi.getPaymentAccounts().stream()
var tradeStatistics = coreApi.getPaymentAccounts().stream()
.map(PaymentAccount::toProtoMessage)
.collect(Collectors.toList());

GetPaymentAccountsReply reply = GetPaymentAccountsReply.newBuilder().addAllPaymentAccounts(tradeStatistics).build();
var reply = GetPaymentAccountsReply.newBuilder().addAllPaymentAccounts(tradeStatistics).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

static class PlaceOfferImpl extends PlaceOfferGrpc.PlaceOfferImplBase {
class PlaceOfferService extends PlaceOfferGrpc.PlaceOfferImplBase {
@Override
public void placeOffer(PlaceOfferRequest req, StreamObserver<PlaceOfferReply> responseObserver) {
TransactionResultHandler resultHandler = transaction -> {
Expand All @@ -156,75 +168,4 @@ public void placeOffer(PlaceOfferRequest req, StreamObserver<PlaceOfferReply> re
resultHandler);
}
}

static class StopServerImpl extends StopServerGrpc.StopServerImplBase {
@Override
public void stopServer(StopServerRequest req, StreamObserver<StopServerReply> responseObserver) {
StopServerReply reply = StopServerReply.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();

instance.stop();
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////

public BisqGrpcServer(Config config, CoreApi coreApi) {
instance = this;

BisqGrpcServer.config = config;
BisqGrpcServer.coreApi = coreApi;

try {
start();

} catch (IOException e) {
log.error(e.toString(), e);
}
}


///////////////////////////////////////////////////////////////////////////////////////////
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void stop() {
if (server != null) {
server.shutdown();
}
}

///////////////////////////////////////////////////////////////////////////////////////////
// Private
///////////////////////////////////////////////////////////////////////////////////////////

private void start() throws IOException {
// TODO add to options
int port = 9998;

// Config services
server = ServerBuilder.forPort(port)
.addService(new GetVersionImpl())
.addService(new GetBalanceImpl())
.addService(new GetTradeStatisticsImpl())
.addService(new GetOffersImpl())
.addService(new GetPaymentAccountsImpl())
.addService(new PlaceOfferImpl())
.addService(new StopServerImpl())
.intercept(new PasswordAuthInterceptor(config.apiPassword))
.build()
.start();

log.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
log.error("Shutting down gRPC server");
BisqGrpcServer.this.stop();
log.error("Server shut down");
}));
}
}
4 changes: 2 additions & 2 deletions daemon/src/main/java/bisq/daemon/app/BisqDaemonMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import bisq.core.app.BisqHeadlessAppMain;
import bisq.core.app.BisqSetup;
import bisq.core.app.CoreModule;
import bisq.core.grpc.BisqGrpcServer;
import bisq.core.grpc.GrpcServer;
import bisq.core.grpc.CoreApi;

import bisq.common.UserThread;
Expand Down Expand Up @@ -99,6 +99,6 @@ protected void onApplicationStarted() {
super.onApplicationStarted();

CoreApi coreApi = injector.getInstance(CoreApi.class);
new BisqGrpcServer(config, coreApi);
new GrpcServer(config, coreApi);
}
}
15 changes: 0 additions & 15 deletions proto/src/main/proto/grpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,3 @@ message PlaceOfferRequest {
message PlaceOfferReply {
bool result = 1;
}

///////////////////////////////////////////////////////////////////////////////////////////
// StopServer
///////////////////////////////////////////////////////////////////////////////////////////

service StopServer {
rpc StopServer (StopServerRequest) returns (StopServerReply) {
}
}

message StopServerRequest {
}

message StopServerReply {
}

0 comments on commit 4277e62

Please sign in to comment.