Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: forbid large-size-value written to pegasus server #95

Merged
merged 21 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_PERF_COUNTER = false;
public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = "";
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;

private final String metaServers;
private final Duration operationTimeout;
private final int asyncWorkers;
private final boolean enablePerfCounter;
private final String falconPerfCounterTags;
private final Duration falconPushInterval;
private final boolean enableWriteLimit;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -52,6 +54,7 @@ protected ClientOptions(Builder builder) {
this.enablePerfCounter = builder.enablePerfCounter;
this.falconPerfCounterTags = builder.falconPerfCounterTags;
this.falconPushInterval = builder.falconPushInterval;
this.enableWriteLimit = builder.enableWriteLimit;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -61,6 +64,7 @@ protected ClientOptions(ClientOptions original) {
this.enablePerfCounter = original.isEnablePerfCounter();
this.falconPerfCounterTags = original.getFalconPerfCounterTags();
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isEnableWriteLimit();
}

/**
Expand Down Expand Up @@ -103,7 +107,8 @@ public boolean equals(Object options) {
&& this.asyncWorkers == clientOptions.asyncWorkers
&& this.enablePerfCounter == clientOptions.enablePerfCounter
&& this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis();
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit;
}
return false;
}
Expand All @@ -125,6 +130,8 @@ public String toString() {
+ '\''
+ ", falconPushInterval(s)="
+ falconPushInterval.getSeconds()
+ ",enableWriteLimit="
+ enableWriteLimit
+ '}';
}

Expand All @@ -136,6 +143,7 @@ public static class Builder {
private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER;
private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS;
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;

protected Builder() {}

Expand Down Expand Up @@ -213,6 +221,19 @@ public Builder falconPushInterval(Duration falconPushInterval) {
return this;
}

/**
* whether to enable limit write. if true, exceed the threshold set will throw exception, See
* {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to Defaults
* to {@literal true}, see {@link #DEFAULT_ENABLE_WRITE_LIMIT}
*
* @param enableWriteLimit enableWriteLimit
* @return {@code this}
*/
public Builder enableWriteLimit(boolean enableWriteLimit) {
this.enableWriteLimit = enableWriteLimit;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -238,7 +259,8 @@ public ClientOptions.Builder mutate() {
.asyncWorkers(getAsyncWorkers())
.enablePerfCounter(isEnablePerfCounter())
.falconPerfCounterTags(getFalconPerfCounterTags())
.falconPushInterval(getFalconPushInterval());
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isEnableWriteLimit());
return builder;
}

Expand Down Expand Up @@ -298,4 +320,15 @@ public String getFalconPerfCounterTags() {
public Duration getFalconPushInterval() {
return falconPushInterval;
}

/**
* whether to enable write size limit. if true, exceed the threshold set will throw exception, See
* {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to Defaults to
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
* {@literal true}, See {@link #DEFAULT_ENABLE_WRITE_LIMIT}
*
* @return whether to enable write size limit
*/
public boolean isEnableWriteLimit() {
return enableWriteLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws
Cluster.PEGASUS_OPERATION_TIMEOUT_KEY,
Cluster.PEGASUS_ASYNC_WORKERS_KEY,
Cluster.PEGASUS_ENABLE_PERF_COUNTER_KEY,
Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY
Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY,
Cluster.PEGASUS_ENABLE_WRITE_LIMIT
};

// configPath could be:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static PegasusClientInterface createClient(ClientOptions options) throws
pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter()));
pegasusConfig.setProperty(
"push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds()));
pegasusConfig.setProperty("enable_write_limit", String.valueOf(options.isEnableWriteLimit()));
return new PegasusClient(pegasusConfig);
}

Expand Down
31 changes: 31 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration;
import com.xiaomi.infra.pegasus.tools.Tools;
import com.xiaomi.infra.pegasus.tools.WriteLimiter;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import java.net.UnknownHostException;
Expand All @@ -32,10 +33,12 @@
public class PegasusTable implements PegasusTableInterface {
private Table table;
private int defaultTimeout;
private WriteLimiter writeLimiter;

public PegasusTable(PegasusClient client, Table table) {
this.table = table;
this.defaultTimeout = table.getDefaultTimeout();
this.writeLimiter = new WriteLimiter(table.isEnableWriteLimit());
}

@Override
Expand Down Expand Up @@ -135,6 +138,13 @@ public Future<Void> asyncSet(
return promise;
}

try {
writeLimiter.validateSingleSet(hashKey, sortKey, value);
} catch (Exception e) {
promise.setFailure(new PException("Exceed write limit threshold:" + e.getMessage()));
return promise;
}

blob k = new blob(PegasusClient.generateKey(hashKey, sortKey));
blob v = new blob(value);
int expireSeconds = (ttlSeconds == 0 ? 0 : ttlSeconds + (int) Tools.epoch_now());
Expand Down Expand Up @@ -407,6 +417,13 @@ public Future<Void> asyncMultiSet(
return promise;
}

try {
writeLimiter.validateMultiSet(hashKey, values);
} catch (Exception e) {
promise.setFailure(new PException("Exceed write limit threshold:" + e.getMessage()));
return promise;
}

blob hash_key_blob = new blob(hashKey);
List<key_value> values_blob = new ArrayList<key_value>();
for (int i = 0; i < values.size(); i++) {
Expand Down Expand Up @@ -604,6 +621,13 @@ public Future<CheckAndSetResult> asyncCheckAndSet(
return promise;
}

try {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
writeLimiter.validateCheckAndSet(hashKey, setSortKey, setValue);
} catch (Exception e) {
promise.setFailure(new PException("Exceed write limit threshold:" + e.getMessage()));
return promise;
}

blob hashKeyBlob = new blob(hashKey);
blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey));
cas_check_type type = cas_check_type.findByValue(checkType.getValue());
Expand Down Expand Up @@ -702,6 +726,13 @@ public Future<CheckAndMutateResult> asyncCheckAndMutate(
new PException("Invalid parameter: mutations should not be null or empty"));
}

try {
writeLimiter.validateCheckAndMutate(hashKey, mutations);
} catch (Exception e) {
promise.setFailure(new PException("Exceed write limit threshold:" + e.getMessage()));
return promise;
}

blob hashKeyBlob = new blob(hashKey);
blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey));
cas_check_type type = cas_check_type.findByValue(checkType.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,22 @@
import org.apache.thrift.protocol.TProtocol;

public abstract class client_operator {
public client_operator(gpid gpid, String tableName, boolean enableBackupRequest) {

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this.header = new ThriftHeader();
this.meta = new request_meta();
this.meta.setApp_id(gpid.get_app_id());
this.meta.setPartition_index(gpid.get_pidx());
this.meta.setPartition_hash(partitionHash);
this.pid = gpid;
this.tableName = tableName;
this.rpc_error = new error_code();
this.enableBackupRequest = enableBackupRequest;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this(gpid, tableName, enableBackupRequest);
this.meta.setPartition_hash(partitionHash);
}

public client_operator(gpid gpid, String tableName, long partitionHash) {
this(gpid, tableName, false);
this.meta.setPartition_hash(partitionHash);
this(gpid, tableName, partitionHash, false);
}

public final byte[] prepare_thrift_header(int meta_length, int body_length) {
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public abstract class Cluster {
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60";

public static final String PEGASUS_ENABLE_WRITE_LIMIT = "enable_write_limit";
public static final String PEGASUS_ENABLE_WRITE_LIMIT_DEF = "true";

public static Cluster createCluster(Properties config) throws IllegalArgumentException {
int operatorTimeout =
Integer.parseInt(
Expand Down Expand Up @@ -55,17 +58,25 @@ public static Cluster createCluster(Properties config) throws IllegalArgumentExc
Integer.parseInt(
config.getProperty(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF));

boolean enableWriteLimit =
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
Boolean.parseBoolean(
config.getProperty(PEGASUS_ENABLE_WRITE_LIMIT, PEGASUS_ENABLE_WRITE_LIMIT_DEF));

return new ClusterManager(
operatorTimeout,
asyncWorkers,
enablePerfCounter,
perfCounterTags,
pushIntervalSecs,
address);
address,
enableWriteLimit);
}

public abstract String[] getMetaList();

public abstract boolean isEnableWriteLimit();
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

public abstract Table openTable(String name, KeyHasher function, int backupRequestDelayMs)
throws ReplicationException, TException;

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public final <T> DefaultPromise<T> newPromise() {

public abstract int getPartitionCount();

public abstract boolean isEnableWriteLimit();
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

public abstract void operate(client_operator op, int timeoutMs) throws ReplicationException;

public abstract void asyncOperate(client_operator op, ClientOPCallback callback, int timeoutMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ClusterManager extends Cluster {
private int operationTimeout;
private int retryDelay;
private boolean enableCounter;
private boolean enableWriteLimit;

private ConcurrentHashMap<rpc_address, ReplicaSession> replicaSessions;
private EventLoopGroup metaGroup; // group used for handle meta logic
Expand All @@ -49,7 +50,8 @@ public ClusterManager(
boolean enableCounter,
String perfCounterTags,
int pushIntervalSecs,
String[] address_list)
String[] address_list,
boolean enableWriteLimit)
throws IllegalArgumentException {
setTimeout(timeout);
this.enableCounter = enableCounter;
Expand All @@ -63,6 +65,7 @@ public ClusterManager(
tableGroup = getEventLoopGroupInstance(1);

metaList = address_list;
this.enableWriteLimit = enableWriteLimit;
// the constructor of meta session is depend on the replicaSessions,
// so the replicaSessions should be initialized earlier
metaSession = new MetaSession(this, address_list, timeout, 10, metaGroup);
Expand Down Expand Up @@ -115,6 +118,10 @@ public void setTimeout(int t) {
retryDelay = (t < 3 ? 1 : t / 3);
}

public boolean isEnableWriteLimit() {
return enableWriteLimit;
}

public static EventLoopGroup getEventLoopGroupInstance(int threadsCount) {
logger.debug("create nio eventloop group");
return new NioEventLoopGroup(threadsCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ static final class TableConfiguration {
AtomicBoolean inQuerying_;
long lastQueryTime_;
int backupRequestDelayMs;
private boolean enableWriteLimit;

public TableHandler(ClusterManager mgr, String name, KeyHasher h, int backupRequestDelayMs)
throws ReplicationException {
Expand Down Expand Up @@ -95,6 +96,7 @@ public TableHandler(ClusterManager mgr, String name, KeyHasher h, int backupRequ
manager_ = mgr;
executor_ = manager_.getExecutor(name, 1);
this.backupRequestDelayMs = backupRequestDelayMs;
this.enableWriteLimit = manager_.isEnableWriteLimit();
if (backupRequestDelayMs > 0) {
logger.info("the delay time of backup request is \"{}\"", backupRequestDelayMs);
}
Expand Down Expand Up @@ -474,6 +476,7 @@ public void asyncOperate(client_operator op, ClientOPCallback callback, int time

ClientRequestRound round =
new ClientRequestRound(op, callback, manager_.counterEnabled(), (long) timeoutMs);

call(round, 1);
}

Expand Down Expand Up @@ -502,4 +505,8 @@ private void handleMetaException(error_types err_type, ClusterManager mgr, Strin
private boolean isBackupRequestEnabled() {
return backupRequestDelayMs > 0;
}

public boolean isEnableWriteLimit() {
return enableWriteLimit;
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}
Loading