Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new key generator "LeafSegmentKeyGenerator" and related functions. #2449

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions sharding-core/sharding-core-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,21 @@
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java7</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.core.strategy.keygen;

import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.api.config.TypeBasedSPIConfiguration;

import java.util.Properties;

/**
* Created by Jason on 2019/4/28.
*/
@Getter
@Setter
public final class LeafConfiguration extends TypeBasedSPIConfiguration {
/**
* Server list of registry center.
*/
private String serverLists;

/**
* Namespace of registry center.
*/
private String namespace="leaf";


/**
* Digest of registry center.
*/
private String digest;

/**
* Operation timeout time in milliseconds.
*/
private int operationTimeoutMilliseconds = 500;

/**
* Max number of times to retry.
*/
private int maxRetries = 3;

/**
* Time interval in milliseconds on each retry.
*/
private int retryIntervalMilliseconds = 500;

/**
* Time to live in seconds of ephemeral keys.
*/
private int timeToLiveSeconds = 60;

public LeafConfiguration(final String type,final String serverLists) {
super(type);
this.serverLists = serverLists;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.core.strategy.keygen;

import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Created by Jason on 2019/4/28.
*/
public final class LeafCuratorZookeeper {

private CuratorFramework client;

public void init(final LeafConfiguration config) {
client = buildCuratorClient(config);
initCuratorClient(config);
}

private CuratorFramework buildCuratorClient(final LeafConfiguration config) {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(config.getServerLists())
.retryPolicy(new ExponentialBackoffRetry(config.getRetryIntervalMilliseconds(), config.getMaxRetries(), config.getRetryIntervalMilliseconds() * config.getMaxRetries()))
.namespace(config.getNamespace());
if (0 != config.getTimeToLiveSeconds()) {
builder.sessionTimeoutMs(config.getTimeToLiveSeconds() * 1000);
}
if (0 != config.getOperationTimeoutMilliseconds()) {
builder.connectionTimeoutMs(config.getOperationTimeoutMilliseconds());
}
if (!Strings.isNullOrEmpty(config.getDigest())) {
builder.authorization("digest", config.getDigest().getBytes(Charsets.UTF_8))
.aclProvider(new ACLProvider() {

@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}

@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
return builder.build();
}

private void initCuratorClient(final LeafConfiguration config) {
client.start();
try {
if (!client.blockUntilConnected(config.getRetryIntervalMilliseconds() * config.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
} catch (final InterruptedException | KeeperException.OperationTimeoutException ex) {
LeafExceptionHandler.handleException(ex);
}
}

public String getDirectly(final String key) {
try {
return new String(client.getData().forPath(key), Charsets.UTF_8);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
LeafExceptionHandler.handleException(ex);
return null;
}
}

public boolean isExisted(final String key) {
try {
return null != client.checkExists().forPath(key);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
LeafExceptionHandler.handleException(ex);
return false;
}
}

public void persist(final String key, final String value) {
try {
if (!isExisted(key)) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
} else {
update(key, value);
}
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
LeafExceptionHandler.handleException(ex);
}
}

public void update(final String key, final String value) {
try {
client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
LeafExceptionHandler.handleException(ex);
}
}

public String getType() {
return "zookeeper";
}

public long incrementCacheId(final String tableName,final long step){
InterProcessMutex lock = new InterProcessMutex(client, tableName);
long result=Long.MIN_VALUE;
boolean lockIsAcquired = tryLock(lock);
if ( lockIsAcquired ) {
result = updateCacheIdInCenter(tableName, step);
tryRelease(lock);
}
return result;
}

private long updateCacheIdInCenter(final String tableName,final long step){
long cacheId = Long.parseLong(getDirectly(tableName));
long result = cacheId+step;
update(tableName, String.valueOf(result));
return result;
}

private boolean tryLock(final InterProcessMutex lock){
try{
return lock.acquire(5,TimeUnit.SECONDS);
}catch (Exception e){
LeafExceptionHandler.handleException(e);
return Boolean.FALSE;
}
}

private void tryRelease(final InterProcessMutex lock){
try{
lock.release();
}catch (Exception e){
LeafExceptionHandler.handleException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.core.strategy.keygen;


/**
* Created by Jason on 2019/4/28.
*/
public final class LeafException extends RuntimeException {

private static final long serialVersionUID = -6417179023552012190L;

public LeafException(final Exception cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.core.strategy.keygen;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.KeeperException;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class LeafExceptionHandler {

/**
* Handle exception.
*
* <p>Ignore interrupt and connection invalid exception.</p>
*
* @param cause to be handled exception
*/
public static void handleException(final Exception cause) {
if (null == cause) {
return;
}
if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
log.debug("Ignored exception for: {}", cause.getMessage());
} else if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
} else {
throw new LeafException(cause);
}
}

private static boolean isIgnoredException(final Throwable cause) {
return cause instanceof LeafException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException;
}
}
Loading