Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: using "interceptor" to enhance the api(compress) #126

Merged
merged 36 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d6b0d19
init
foreverneverer Aug 25, 2020
9dae4ce
init the api
foreverneverer Aug 25, 2020
a3637aa
add compress interceptor
foreverneverer Aug 26, 2020
cd24eed
fix open
foreverneverer Aug 27, 2020
482e335
move and after
foreverneverer Aug 31, 2020
2edb6a0
move and after
foreverneverer Aug 31, 2020
e7ecefb
move and after
foreverneverer Aug 31, 2020
49ac5fd
move and after
foreverneverer Aug 31, 2020
ef7aa21
merge lastest code
foreverneverer Aug 31, 2020
44eb64a
fix
foreverneverer Sep 1, 2020
e7c98c8
fix
foreverneverer Sep 1, 2020
151dd6a
fix
foreverneverer Sep 1, 2020
e3626d6
merge
foreverneverer Sep 2, 2020
181ed6f
merge
foreverneverer Sep 2, 2020
d8f1daa
merge
foreverneverer Sep 2, 2020
e4d5ba5
merge
foreverneverer Sep 2, 2020
660ac79
merge master
foreverneverer Sep 2, 2020
30e1567
fix
foreverneverer Sep 2, 2020
501a155
fix
foreverneverer Sep 2, 2020
5b4dcc0
fix
foreverneverer Sep 2, 2020
08fd240
fix
foreverneverer Sep 2, 2020
834888e
fix
foreverneverer Sep 2, 2020
3ced11e
add test
foreverneverer Sep 2, 2020
148b584
add test
foreverneverer Sep 2, 2020
4c0998e
add interface
foreverneverer Sep 2, 2020
9850a37
add interface
foreverneverer Sep 2, 2020
3aea6ca
add interface
foreverneverer Sep 2, 2020
ad8140b
add interface
foreverneverer Sep 2, 2020
779a80f
add interface
foreverneverer Sep 2, 2020
7fea1c6
fix options
foreverneverer Sep 3, 2020
e1602fe
fix options
foreverneverer Sep 3, 2020
fa94399
fix options
foreverneverer Sep 3, 2020
88b5e88
fix options
foreverneverer Sep 3, 2020
a098625
fix comment
foreverneverer Sep 4, 2020
2a09369
fix comment
foreverneverer Sep 4, 2020
d9acc5a
Update src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInt…
foreverneverer Sep 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ public long hash(byte[] key) {
}

private PegasusTable getTable(String tableName) throws PException {
return getTable(tableName, 0);
return getTable(tableName, new InternalTableOptions(new PegasusHasher(), new TableOptions()));
}

private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException {
private PegasusTable getTable(String tableName, InternalTableOptions internalTableOptions)
throws PException {
PegasusTable table = tableMap.get(tableName);
if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {
try {
TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs);
Table internalTable = cluster.openTable(tableName, options);
Table internalTable = cluster.openTable(tableName, internalTableOptions);
table = new PegasusTable(this, internalTable);
} catch (Throwable e) {
throw new PException(e);
Expand Down Expand Up @@ -191,7 +191,17 @@ public PegasusTableInterface openTable(String tableName) throws PException {
@Override
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException {
return getTable(tableName, backupRequestDelayMs);
return getTable(
tableName,
new InternalTableOptions(
new PegasusHasher(),
new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs)));
}

@Override
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException {
return getTable(tableName, new InternalTableOptions(new PegasusHasher(), tableOptions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public interface PegasusClientInterface {
/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
* <p>Note: this interface is deprecated, retaining it only for compatibility, please see {@link
* PegasusClientInterface#openTable(String, TableOptions)}
*
* <p>Please notice that pegasus support two kinds of API: 1. the client-interface way, which is
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
* provided in this class. 2. the table-interface way, which is provided by {@link
* PegasusTableInterface}. With the client-interface, you don't need to create
Expand All @@ -61,9 +64,33 @@ public interface PegasusClientInterface {
* @return the table handler
* @throws PException throws exception if any error occurs.
*/
@Deprecated
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException;

/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
* <p>Please notice that pegasus support two kinds of API: 1. the client-interface way, which is
* provided in this class. 2. the table-interface way, which is provided by {@link
* PegasusTableInterface}. With the client-interface, you don't need to create
* PegasusTableInterface by openTable, so you can access the pegasus cluster conveniently.
* However, the client-interface's api also has some restrictions: 1. we don't provide async
* methods in client-interface. 2. the timeout in client-interface isn't as accurate as the
* table-interface. 3. the client-interface may throw an exception when open table fails. It means
* that you may need to handle this exception in every data access operation, which is annoying.
* 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface.
*
* @param tableName the table should be exist on the server, which is created before by the system
* * administrator
* @param tableOptions control the table feature, such as open backup-request, compress and etc,
* see {@link TableOptions}
* @return
* @throws PException
*/
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException;

/**
* Check value exist by key from the cluster
*
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

/** TableOptions is the internal options for opening a Pegasus table. */
public class TableOptions {
private int backupRequestDelayMs;
private boolean enableCompression;

public TableOptions() {
this.backupRequestDelayMs = 0;
this.enableCompression = false;
}

public TableOptions withBackupRequestDelayMs(int backupRequestDelayMs) {
this.backupRequestDelayMs = backupRequestDelayMs;
return this;
}

public TableOptions withCompression(boolean enableCompression) {
this.enableCompression = enableCompression;
return this;
}

public int backupRequestDelayMs() {
return this.backupRequestDelayMs;
}

public boolean enableBackupRequest() {
return backupRequestDelayMs > 0;
}

public boolean enableCompression() {
return enableCompression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() {
return resp;
}

public check_and_mutate_request get_request() {
return request;
}

private check_and_mutate_request request;
private check_and_mutate_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_set_response get_response() {
return resp;
}

public check_and_set_request get_request() {
return request;
}

private check_and_set_request request;
private check_and_set_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public update_response get_response() {
return resp;
}

public multi_put_request get_request() {
return request;
}

private multi_put_request request;
private update_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public update_response get_response() {
return resp;
}

public update_request get_request() {
return request;
}

private update_request request;
private update_response resp;
}
2 changes: 1 addition & 1 deletion src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static Cluster createCluster(ClientOptions clientOptions)

public abstract String[] getMetaList();

public abstract Table openTable(String name, TableOptions options)
public abstract Table openTable(String name, InternalTableOptions options)
throws ReplicationException, TException;

public abstract void close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.xiaomi.infra.pegasus.rpc;

import com.xiaomi.infra.pegasus.client.TableOptions;

public class InternalTableOptions {
private final KeyHasher keyHasher;
private final TableOptions tableOptions;

public InternalTableOptions(KeyHasher keyHasher, TableOptions tableOptions) {
this.keyHasher = keyHasher;
this.tableOptions = tableOptions;
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

public KeyHasher keyHasher() {
return keyHasher;
}

public TableOptions tableOptions() {
return tableOptions;
}

public static InternalTableOptions forTest() {
return new InternalTableOptions(KeyHasher.DEFAULT, new TableOptions());
}
}
31 changes: 0 additions & 31 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.metrics.MetricsManager;
import com.xiaomi.infra.pegasus.rpc.Cluster;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down Expand Up @@ -126,8 +126,9 @@ public String[] getMetaList() {
}

@Override
public TableHandler openTable(String name, TableOptions options) throws ReplicationException {
return new TableHandler(this, name, options);
public TableHandler openTable(String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
return new TableHandler(this, name, internalTableOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import com.xiaomi.infra.pegasus.replication.partition_configuration;
import com.xiaomi.infra.pegasus.replication.query_cfg_request;
import com.xiaomi.infra.pegasus.replication.query_cfg_response;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
Expand Down Expand Up @@ -53,7 +53,7 @@ static final class TableConfiguration {
int backupRequestDelayMs;
private InterceptorManger interceptorManger;

public TableHandler(ClusterManager mgr, String name, TableOptions options)
public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
int i = 0;
for (; i < name.length(); i++) {
Expand Down Expand Up @@ -93,12 +93,12 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options)
// superclass members
tableName_ = name;
appID_ = resp.app_id;
hasher_ = options.keyHasher();
hasher_ = internalTableOptions.keyHasher();

// members of this
manager_ = mgr;
executor_ = manager_.getExecutor();
this.backupRequestDelayMs = options.backupRequestDelayMs();
this.backupRequestDelayMs = internalTableOptions.tableOptions().backupRequestDelayMs();
if (backupRequestDelayMs > 0) {
logger.info("the delay time of backup request is \"{}\"", backupRequestDelayMs);
}
Expand All @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options)
inQuerying_ = new AtomicBoolean(false);
lastQueryTime_ = 0;

this.interceptorManger = new InterceptorManger(options);
this.interceptorManger = new InterceptorManger(internalTableOptions.tableOptions());
}

public ReplicaConfiguration getReplicaConfig(int index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@

public class BackupRequestInterceptor implements TableInterceptor {

private boolean isOpen;

public BackupRequestInterceptor(boolean isOpen) {
this.isOpen = isOpen;
}

@Override
public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
backupCall(clientRequestRound, tableHandler);
Expand All @@ -33,7 +27,7 @@ public void after(
}

private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
if (!isOpen || !clientRequestRound.getOperator().supportBackupRequest()) {
if (!clientRequestRound.getOperator().supportBackupRequest()) {
return;
}

Expand Down
Loading