getAclForPath(final String path) {
+ return ZooDefs.Ids.CREATOR_ALL_ACL;
+ }
+ });
+ }
+ return builder.build();
+ }
+
+ private void initCuratorClient(final LeafConfiguration config) {
+ client.start();
+ try {
+ if (!client.blockUntilConnected(config.getRetryIntervalMilliseconds() * config.getMaxRetries(), TimeUnit.MILLISECONDS)) {
+ client.close();
+ throw new KeeperException.OperationTimeoutException();
+ }
+ } catch (final InterruptedException | KeeperException.OperationTimeoutException ex) {
+ LeafExceptionHandler.handleException(ex);
+ }
+ }
+
+ public String getDirectly(final String key) {
+ try {
+ return new String(client.getData().forPath(key), Charsets.UTF_8);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ LeafExceptionHandler.handleException(ex);
+ return null;
+ }
+ }
+
+ public boolean isExisted(final String key) {
+ try {
+ return null != client.checkExists().forPath(key);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ LeafExceptionHandler.handleException(ex);
+ return false;
+ }
+ }
+
+ public void persist(final String key, final String value) {
+ try {
+ if (!isExisted(key)) {
+ client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
+ } else {
+ update(key, value);
+ }
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ LeafExceptionHandler.handleException(ex);
+ }
+ }
+
+ public void update(final String key, final String value) {
+ try {
+ client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ LeafExceptionHandler.handleException(ex);
+ }
+ }
+
+ public String getType() {
+ return "zookeeper";
+ }
+
+ public long incrementCacheId(final String tableName,final long step){
+ InterProcessMutex lock = new InterProcessMutex(client, tableName);
+ long result=Long.MIN_VALUE;
+ boolean lockIsAcquired = tryLock(lock);
+ if ( lockIsAcquired ) {
+ result = updateCacheIdInCenter(tableName, step);
+ tryRelease(lock);
+ }
+ return result;
+ }
+
+ private long updateCacheIdInCenter(final String tableName,final long step){
+ long cacheId = Long.parseLong(getDirectly(tableName));
+ long result = cacheId+step;
+ update(tableName, String.valueOf(result));
+ return result;
+ }
+
+ private boolean tryLock(final InterProcessMutex lock){
+ try{
+ return lock.acquire(5,TimeUnit.SECONDS);
+ }catch (Exception e){
+ LeafExceptionHandler.handleException(e);
+ return Boolean.FALSE;
+ }
+ }
+
+ private void tryRelease(final InterProcessMutex lock){
+ try{
+ lock.release();
+ }catch (Exception e){
+ LeafExceptionHandler.handleException(e);
+ }
+ }
+}
diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafException.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafException.java
new file mode 100644
index 0000000000000..0f8bfe748db87
--- /dev/null
+++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.core.strategy.keygen;
+
+
+/**
+ * Created by Jason on 2019/4/28.
+ */
+public final class LeafException extends RuntimeException {
+
+ private static final long serialVersionUID = -6417179023552012190L;
+
+ public LeafException(final Exception cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafExceptionHandler.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafExceptionHandler.java
new file mode 100644
index 0000000000000..42bea46bd7f8c
--- /dev/null
+++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafExceptionHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.core.strategy.keygen;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.zookeeper.KeeperException;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public final class LeafExceptionHandler {
+
+ /**
+ * Handle exception.
+ *
+ * Ignore interrupt and connection invalid exception.
+ *
+ * @param cause to be handled exception
+ */
+ public static void handleException(final Exception cause) {
+ if (null == cause) {
+ return;
+ }
+ if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
+ log.debug("Ignored exception for: {}", cause.getMessage());
+ } else if (cause instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ } else {
+ throw new LeafException(cause);
+ }
+ }
+
+ private static boolean isIgnoredException(final Throwable cause) {
+ return cause instanceof LeafException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException;
+ }
+}
\ No newline at end of file
diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGenerator.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGenerator.java
new file mode 100644
index 0000000000000..fad79f2ea9ae9
--- /dev/null
+++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGenerator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.core.strategy.keygen;
+
+import com.google.common.base.Preconditions;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+
+/**
+ * Created by Jason on 2019/4/28.
+ */
+public class LeafSegmentKeyGenerator implements ShardingKeyGenerator {
+
+ private LeafCuratorZookeeper leafCuratorZookeeper;
+
+ @Getter
+ @Setter
+ private Properties properties = new Properties();
+
+ private long id;
+
+ private static final String TYPE = "LEAFSEGMENT";
+
+ private ExecutorService incrementCacheIdExecutor;
+
+ private boolean isInitialized = Boolean.FALSE;
+
+ private SynchronousQueue cacheIdQueue;
+
+ private static final float THRESHOLD=0.5F;
+
+ private long step;
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public synchronized Comparable> generateKey() {
+ return null;
+ }
+
+ public synchronized Comparable> generateKey(final String tableName){
+ if (isInitialized == Boolean.FALSE) {
+ initLeafSegmentKeyGenerator(tableName);
+ isInitialized = Boolean.TRUE;
+ return id;
+ }
+ id = generateKeyWhenTableStoredInCenter(tableName);
+ return id;
+ }
+
+ private void initLeafSegmentKeyGenerator(final String tableName){
+ LeafConfiguration leafConfiguration = new LeafConfiguration(TYPE,getServerList());
+ leafCuratorZookeeper = new LeafCuratorZookeeper();
+ leafCuratorZookeeper.init(leafConfiguration);
+ if(leafCuratorZookeeper.isExisted(tableName)){
+ id = leafCuratorZookeeper.incrementCacheId(tableName,getStep());
+ }else{
+ id = getInitialValue();
+ leafCuratorZookeeper.persist(tableName,String.valueOf(id));
+ }
+ incrementCacheIdExecutor = Executors.newSingleThreadExecutor();
+ cacheIdQueue = new SynchronousQueue();
+ step = getStep();
+ }
+
+ private long generateKeyWhenTableStoredInCenter(final String tableName){
+ ++id;
+ if(((id%step) >= (step*THRESHOLD-1)) && cacheIdQueue.isEmpty()){
+ incrementCacheIdAsynchronous(tableName,step);
+ }
+ if((id%step) == (step-1)){
+ id = tryTakeCacheId();
+ }
+ return id;
+ }
+
+ private void incrementCacheIdAsynchronous(final String tableName,final long step){
+ incrementCacheIdExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ long id = leafCuratorZookeeper.incrementCacheId(tableName,step);
+ tryPutCacheId(id);
+ }
+ });
+ }
+
+ private long tryTakeCacheId(){
+ long id = Long.MIN_VALUE;
+ try{
+ id = cacheIdQueue.take();
+ }catch (Exception e){
+ LeafExceptionHandler.handleException(e);
+ }
+ return id;
+ }
+
+ private void tryPutCacheId(long id){
+ try{
+ cacheIdQueue.put(id);
+ }catch (Exception e){
+ LeafExceptionHandler.handleException(e);
+ }
+ }
+
+ private long getStep(){
+ long result = Long.parseLong(properties.getProperty("step"));
+ Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE);
+ return result;
+ }
+
+ private long getInitialValue(){
+ long result = Long.parseLong(properties.getProperty("initialValue"));
+ Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE);
+ return result;
+ }
+
+ private String getServerList(){
+ String result = (String)properties.get("serverList");
+ String pattern = "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\." +
+ "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d" +
+ "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" +
+ "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]" +
+ "|[1-5]\\d{4}|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9])((,(\\d|[1-9]\\d" +
+ "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])" +
+ "\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" +
+ "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]|[1-5]\\d{4}" +
+ "|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9]))*)";
+ Preconditions.checkArgument(result.matches(pattern));
+ return result;
+ }
+}
\ No newline at end of file
diff --git a/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator b/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator
index 2e13c5be45da4..220fe6a41ece0 100644
--- a/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator
+++ b/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator
@@ -15,5 +15,6 @@
# limitations under the License.
#
+org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator
org.apache.shardingsphere.core.strategy.keygen.SnowflakeShardingKeyGenerator
org.apache.shardingsphere.core.strategy.keygen.UUIDShardingKeyGenerator
diff --git a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java
new file mode 100644
index 0000000000000..19f541c5fde5b
--- /dev/null
+++ b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.core.strategy.keygen;
+
+import lombok.SneakyThrows;
+import org.apache.curator.test.TestingServer;
+import org.apache.shardingsphere.core.strategy.keygen.fixture.FixedTimeService;
+import org.junit.*;
+
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class LeafSegmentKeyGeneratorTest {
+ private LeafSegmentKeyGenerator leafSegmentKeyGenerator = new LeafSegmentKeyGenerator();
+
+ private static TestingServer server;
+
+
+ @Test
+ public void assertGetProperties() {
+ assertThat(leafSegmentKeyGenerator.getProperties().entrySet().size(), is(0));
+ }
+
+ @Test
+ public void assertSetProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("key1", "value1");
+ leafSegmentKeyGenerator.setProperties(properties);
+ assertThat(leafSegmentKeyGenerator.getProperties().get("key1"), is((Object) "value1"));
+ }
+
+ @Test
+ public void assertGenerateKeyWithSingleThread(){
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ leafSegmentKeyGenerator.setProperties(properties);
+ String tableName="/test_table_name1";
+ List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L);
+ List> actual = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ actual.add(leafSegmentKeyGenerator.generateKey(tableName));
+ }
+ assertThat(actual, is(expected));
+ }
+
+ @Test
+ @SneakyThrows
+ public void assertGenerateKeyWithMultipleThreads() {
+ int threadNumber = Runtime.getRuntime().availableProcessors() << 1;
+ ExecutorService executor = Executors.newFixedThreadPool(threadNumber);
+ int taskNumber = threadNumber << 2;
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name2";
+ Set> actual = new HashSet<>();
+ for (int i = 0; i < taskNumber; i++) {
+ actual.add(executor.submit(new Callable>() {
+
+ @Override
+ public Comparable> call() {
+ return leafSegmentKeyGenerator.generateKey(tableName);
+ }
+ }).get());
+ }
+ assertThat(actual.size(), is(taskNumber));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetStepFailureWhenNegative() throws Exception{
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step", String.valueOf(-1L));
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name3";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetInitialValueFailureWhenNegative() {
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("step","3");
+ properties.setProperty("initialValue", String.valueOf(-1L));
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name4";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetServerListFailureWhenPortIllegal() {
+ Properties properties = new Properties();
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ properties.setProperty("serverList", "192.168.123:90999");
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name5";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetServerListFailureWhenIpIllegal() {
+ Properties properties = new Properties();
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ properties.setProperty("serverList", "267.168.123:8088");
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name6";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @BeforeClass
+ @SneakyThrows
+ public static void startServer(){
+ server = new TestingServer(2181,true);
+ }
+ @AfterClass
+ @SneakyThrows
+ public static void closeServer(){
+ server.close();
+ }
+}
diff --git a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java
index a5dfe70944635..5575828b14dbe 100644
--- a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java
+++ b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java
@@ -28,7 +28,12 @@
public final class ShardingKeyGeneratorServiceLoaderTest {
private ShardingKeyGeneratorServiceLoader serviceLoader = new ShardingKeyGeneratorServiceLoader();
-
+
+ @Test
+ public void assertNewLeafSegmentKeyGenerator() {
+ assertThat(serviceLoader.newService("LEAFSEGMENT", new Properties()), instanceOf(LeafSegmentKeyGenerator.class));
+ }
+
@Test
public void assertNewSnowflakeKeyGenerator() {
assertThat(serviceLoader.newService("SNOWFLAKE", new Properties()), instanceOf(SnowflakeShardingKeyGenerator.class));
diff --git a/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator b/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator
index 9f3aaf5bd437b..b3d62644bef28 100644
--- a/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator
+++ b/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator
@@ -18,3 +18,4 @@
org.apache.shardingsphere.core.strategy.keygen.SnowflakeShardingKeyGenerator
org.apache.shardingsphere.core.strategy.keygen.UUIDShardingKeyGenerator
org.apache.shardingsphere.core.strategy.keygen.fixture.IncrementShardingKeyGenerator
+org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator
\ No newline at end of file
diff --git a/sharding-orchestration/sharding-orchestration-core/pom.xml b/sharding-orchestration/sharding-orchestration-core/pom.xml
index f07c68028b75e..f86e0adafd23e 100644
--- a/sharding-orchestration/sharding-orchestration-core/pom.xml
+++ b/sharding-orchestration/sharding-orchestration-core/pom.xml
@@ -42,5 +42,11 @@
org.apache.commons
commons-dbcp2
+
+ org.apache.shardingsphere
+ sharding-orchestration-reg-zookeeper-curator
+ 4.0.0-RC2-SNAPSHOT
+ compile
+
diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafException.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafException.java
new file mode 100644
index 0000000000000..7801bbcb297ed
--- /dev/null
+++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.internal.keygen;
+
+
+/**
+ * Created by Jason on 2019/4/28.
+ */
+public final class LeafException extends RuntimeException {
+
+ private static final long serialVersionUID = -6417179023552012190L;
+
+ public LeafException(final Exception cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafExceptionHandler.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafExceptionHandler.java
new file mode 100644
index 0000000000000..3d2063c3ca6a3
--- /dev/null
+++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafExceptionHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.internal.keygen;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.zookeeper.KeeperException;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public final class LeafExceptionHandler {
+
+ /**
+ * Handle exception.
+ *
+ * Ignore interrupt and connection invalid exception.
+ *
+ * @param cause to be handled exception
+ */
+ public static void handleException(final Exception cause) {
+ if (null == cause) {
+ return;
+ }
+ if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) {
+ log.debug("Ignored exception for: {}", cause.getMessage());
+ } else if (cause instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ } else {
+ throw new LeafException(cause);
+ }
+ }
+
+ private static boolean isIgnoredException(final Throwable cause) {
+ return cause instanceof LeafException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException;
+ }
+}
\ No newline at end of file
diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java
new file mode 100644
index 0000000000000..1ec2070f13707
--- /dev/null
+++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.internal.keygen;
+
+import com.google.common.base.Preconditions;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration;
+import org.apache.shardingsphere.orchestration.reg.zookeeper.curator.CuratorZookeeperRegistryCenter;
+import org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+
+/**
+ * Created by Jason on 2019/4/28.
+ */
+public class LeafSegmentKeyGenerator implements ShardingKeyGenerator {
+
+ private CuratorZookeeperRegistryCenter leafCuratorZookeeper;
+
+ @Getter
+ @Setter
+ private Properties properties = new Properties();
+
+ private long id;
+
+ private static final String TYPE = "LEAFSEGMENT";
+
+ private static final String NAMESPACE = "leaf_segment";
+
+ private ExecutorService incrementCacheIdExecutor;
+
+ private boolean isInitialized = Boolean.FALSE;
+
+ private SynchronousQueue cacheIdQueue;
+
+ private static final float THRESHOLD=0.5F;
+
+ private long step;
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public synchronized Comparable> generateKey() {
+ String leafKey = (String)properties.get("leaf.key");
+ if (isInitialized == Boolean.FALSE) {
+ initLeafSegmentKeyGenerator(leafKey);
+ isInitialized = Boolean.TRUE;
+ return id;
+ }
+ id = generateKeyWhenLeafKeyStoredInCenter(leafKey);
+ return id;
+ }
+
+ private void initLeafSegmentKeyGenerator(final String leafKey){
+ RegistryCenterConfiguration leafConfiguration = new RegistryCenterConfiguration(TYPE,properties);
+ leafConfiguration.setNamespace(NAMESPACE);
+ leafConfiguration.setServerLists(getServerList());
+ leafConfiguration.setDigest(getDigest());
+ leafCuratorZookeeper = new CuratorZookeeperRegistryCenter ();
+ leafCuratorZookeeper.init(leafConfiguration);
+ if(leafCuratorZookeeper.isExisted(leafKey)){
+ id = incrementCacheId(leafKey,getStep());
+ }else{
+ id = getInitialValue();
+ leafCuratorZookeeper.persist(leafKey,String.valueOf(id));
+ }
+ incrementCacheIdExecutor = Executors.newSingleThreadExecutor();
+ cacheIdQueue = new SynchronousQueue<>();
+ step = getStep();
+ }
+
+ private long generateKeyWhenLeafKeyStoredInCenter(final String leafKey){
+ ++id;
+ if(((id%step) >= (step*THRESHOLD-1)) && cacheIdQueue.isEmpty()){
+ incrementCacheIdAsynchronous(leafKey,step);
+ }
+ if((id%step) == (step-1)){
+ id = tryTakeCacheId();
+ }
+ return id;
+ }
+
+ private void incrementCacheIdAsynchronous(final String leafKey,final long step){
+ incrementCacheIdExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ long id = incrementCacheId(leafKey,step);
+ tryPutCacheId(id);
+ }
+ });
+ }
+
+ private long tryTakeCacheId(){
+ long id = Long.MIN_VALUE;
+ try{
+ id = cacheIdQueue.take();
+ }catch (Exception ex){
+ Thread.currentThread().interrupt();
+ }
+ return id;
+ }
+
+ private void tryPutCacheId(long id){
+ try{
+ cacheIdQueue.put(id);
+ }catch (Exception ex){
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private long getStep(){
+ long result = Long.parseLong(properties.getProperty("step"));
+ Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE);
+ return result;
+ }
+
+ private long getInitialValue(){
+ long result = Long.parseLong(properties.getProperty("initialValue"));
+ Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE);
+ return result;
+ }
+
+ private String getServerList(){
+ String result = (String)properties.get("serverList");
+ String pattern = "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\." +
+ "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d" +
+ "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" +
+ "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]" +
+ "|[1-5]\\d{4}|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9])((,(\\d|[1-9]\\d" +
+ "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])" +
+ "\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" +
+ "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]|[1-5]\\d{4}" +
+ "|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9]))*)";
+ Preconditions.checkArgument(result.matches(pattern));
+ return result;
+ }
+
+ private String getDigest(){
+ String result = (String)properties.get("digest");
+ String pattern = "\\w+:\\w+|";
+ Preconditions.checkArgument(result.matches(pattern));
+ return result;
+ }
+
+ private long incrementCacheId(final String leafKey,final long step){
+ InterProcessMutex lock = leafCuratorZookeeper.initLock(leafKey);
+ long result=Long.MIN_VALUE;
+ boolean lockIsAcquired = leafCuratorZookeeper.tryLock(lock);
+ if ( lockIsAcquired ) {
+ result = updateCacheIdInCenter(leafKey, step);
+ leafCuratorZookeeper.tryRelease(lock);
+ }
+ return result;
+ }
+
+ private long updateCacheIdInCenter(final String leafKey,final long step){
+ long cacheId = Long.parseLong(leafCuratorZookeeper.getDirectly(leafKey));
+ long result = cacheId+step;
+ leafCuratorZookeeper.update(leafKey, String.valueOf(result));
+ return result;
+ }
+
+}
\ No newline at end of file
diff --git a/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java
new file mode 100644
index 0000000000000..1919c58c0c7cf
--- /dev/null
+++ b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.orchestration.internal.keygen;
+
+import lombok.SneakyThrows;
+import org.apache.curator.test.TestingServer;
+import org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class LeafSegmentKeyGeneratorTest {
+ private LeafSegmentKeyGenerator leafSegmentKeyGenerator = new LeafSegmentKeyGenerator();
+
+ private static TestingServer server;
+
+
+ @Test
+ public void assertGetProperties() {
+ assertThat(leafSegmentKeyGenerator.getProperties().entrySet().size(), is(0));
+ }
+
+ @Test
+ public void assertSetProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("key1", "value1");
+ leafSegmentKeyGenerator.setProperties(properties);
+ assertThat(leafSegmentKeyGenerator.getProperties().get("key1"), is((Object) "value1"));
+ }
+
+ @Test
+ public void assertGenerateKeyWithSingleThread(){
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ leafSegmentKeyGenerator.setProperties(properties);
+ String tableName="/test_table_name1";
+ List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L);
+ List> actual = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ actual.add(leafSegmentKeyGenerator.generateKey(tableName));
+ }
+ assertThat(actual, is(expected));
+ }
+
+ @Test
+ @SneakyThrows
+ public void assertGenerateKeyWithMultipleThreads() {
+ int threadNumber = Runtime.getRuntime().availableProcessors() << 1;
+ ExecutorService executor = Executors.newFixedThreadPool(threadNumber);
+ int taskNumber = threadNumber << 2;
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name2";
+ Set> actual = new HashSet<>();
+ for (int i = 0; i < taskNumber; i++) {
+ actual.add(executor.submit(new Callable>() {
+
+ @Override
+ public Comparable> call() {
+ return leafSegmentKeyGenerator.generateKey(tableName);
+ }
+ }).get());
+ }
+ assertThat(actual.size(), is(taskNumber));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetStepFailureWhenNegative() throws Exception{
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step", String.valueOf(-1L));
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name3";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetInitialValueFailureWhenNegative() {
+ Properties properties = new Properties();
+ properties.setProperty("serverList","127.0.0.1:2181");
+ properties.setProperty("step","3");
+ properties.setProperty("initialValue", String.valueOf(-1L));
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name4";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetServerListFailureWhenPortIllegal() {
+ Properties properties = new Properties();
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ properties.setProperty("serverList", "192.168.123:90999");
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name5";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void assertSetServerListFailureWhenIpIllegal() {
+ Properties properties = new Properties();
+ properties.setProperty("initialValue","100001");
+ properties.setProperty("step","3");
+ properties.setProperty("serverList", "267.168.123:8088");
+ leafSegmentKeyGenerator.setProperties(properties);
+ final String tableName="/test_table_name6";
+ leafSegmentKeyGenerator.generateKey(tableName);
+ }
+
+ @BeforeClass
+ @SneakyThrows
+ public static void startServer(){
+ server = new TestingServer(2181,true);
+ }
+
+ @AfterClass
+ @SneakyThrows
+ public static void closeServer(){
+ server.close();
+ }
+}
diff --git a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java
index 95538b1e098bb..289f48fba4aef 100644
--- a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java
+++ b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java
@@ -28,6 +28,7 @@
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter;
@@ -290,4 +291,27 @@ private void waitForCacheClose() {
public String getType() {
return "zookeeper";
}
+
+ public InterProcessMutex initLock(String key){
+ InterProcessMutex lock = new InterProcessMutex(client, key);
+ return lock;
+ }
+
+
+ public boolean tryLock(final InterProcessMutex lock){
+ try{
+ return lock.acquire(5,TimeUnit.SECONDS);
+ }catch (Exception ex){
+ CuratorZookeeperExceptionHandler.handleException(ex);
+ return Boolean.FALSE;
+ }
+ }
+
+ public void tryRelease(final InterProcessMutex lock){
+ try{
+ lock.release();
+ }catch (Exception ex){
+ CuratorZookeeperExceptionHandler.handleException(ex);
+ }
+ }
}