From 36a9c653979d61782903d74eac0523fd1009ef34 Mon Sep 17 00:00:00 2001 From: guangyuan Date: Sat, 25 May 2019 21:11:13 +0800 Subject: [PATCH 1/6] Add LeafSegmentKeyGenerator related functions. --- sharding-core/sharding-core-common/pom.xml | 16 ++ .../strategy/keygen/LeafConfiguration.java | 74 ++++++++ .../strategy/keygen/LeafCuratorZookeeper.java | 172 ++++++++++++++++++ .../core/strategy/keygen/LeafException.java | 31 ++++ .../strategy/keygen/LeafExceptionHandler.java | 52 ++++++ .../keygen/LeafSegmentKeyGenerator.java | 155 ++++++++++++++++ ...dingsphere.spi.keygen.ShardingKeyGenerator | 1 + .../keygen/LeafSegmentKeyGeneratorTest.java | 136 ++++++++++++++ ...ShardingKeyGeneratorServiceLoaderTest.java | 7 +- ...dingsphere.spi.keygen.ShardingKeyGenerator | 2 +- 10 files changed, 644 insertions(+), 2 deletions(-) create mode 100644 sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafConfiguration.java create mode 100644 sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafCuratorZookeeper.java create mode 100644 sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafException.java create mode 100644 sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafExceptionHandler.java create mode 100644 sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGenerator.java create mode 100644 sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java diff --git a/sharding-core/sharding-core-common/pom.xml b/sharding-core/sharding-core-common/pom.xml index e2a651157e32b..b1d5c363e3ede 100644 --- a/sharding-core/sharding-core-common/pom.xml +++ b/sharding-core/sharding-core-common/pom.xml @@ -55,5 +55,21 @@ com.zaxxer HikariCP-java7 + + org.apache.curator + curator-framework + + + org.apache.curator + curator-client + + + org.apache.curator + curator-recipes + + + org.apache.curator + curator-test + diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafConfiguration.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafConfiguration.java new file mode 100644 index 0000000000000..4698ed60ac5d6 --- /dev/null +++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafConfiguration.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.core.strategy.keygen; + +import lombok.Getter; +import lombok.Setter; +import org.apache.shardingsphere.api.config.TypeBasedSPIConfiguration; + +import java.util.Properties; + +/** + * Created by Jason on 2019/4/28. + */ +@Getter +@Setter +public final class LeafConfiguration extends TypeBasedSPIConfiguration { + /** + * Server list of registry center. + */ + private String serverLists; + + /** + * Namespace of registry center. + */ + private String namespace="leaf"; + + + /** + * Digest of registry center. + */ + private String digest; + + /** + * Operation timeout time in milliseconds. + */ + private int operationTimeoutMilliseconds = 500; + + /** + * Max number of times to retry. + */ + private int maxRetries = 3; + + /** + * Time interval in milliseconds on each retry. + */ + private int retryIntervalMilliseconds = 500; + + /** + * Time to live in seconds of ephemeral keys. + */ + private int timeToLiveSeconds = 60; + + public LeafConfiguration(final String type,final String serverLists) { + super(type); + this.serverLists = serverLists; + + } + +} diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafCuratorZookeeper.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafCuratorZookeeper.java new file mode 100644 index 0000000000000..d5796cdec4da5 --- /dev/null +++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafCuratorZookeeper.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.core.strategy.keygen; + +import com.google.common.base.Charsets; +import com.google.common.base.Strings; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Created by Jason on 2019/4/28. + */ +public final class LeafCuratorZookeeper { + + private CuratorFramework client; + + public void init(final LeafConfiguration config) { + client = buildCuratorClient(config); + initCuratorClient(config); + } + + private CuratorFramework buildCuratorClient(final LeafConfiguration config) { + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .connectString(config.getServerLists()) + .retryPolicy(new ExponentialBackoffRetry(config.getRetryIntervalMilliseconds(), config.getMaxRetries(), config.getRetryIntervalMilliseconds() * config.getMaxRetries())) + .namespace(config.getNamespace()); + if (0 != config.getTimeToLiveSeconds()) { + builder.sessionTimeoutMs(config.getTimeToLiveSeconds() * 1000); + } + if (0 != config.getOperationTimeoutMilliseconds()) { + builder.connectionTimeoutMs(config.getOperationTimeoutMilliseconds()); + } + if (!Strings.isNullOrEmpty(config.getDigest())) { + builder.authorization("digest", config.getDigest().getBytes(Charsets.UTF_8)) + .aclProvider(new ACLProvider() { + + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + return builder.build(); + } + + private void initCuratorClient(final LeafConfiguration config) { + client.start(); + try { + if (!client.blockUntilConnected(config.getRetryIntervalMilliseconds() * config.getMaxRetries(), TimeUnit.MILLISECONDS)) { + client.close(); + throw new KeeperException.OperationTimeoutException(); + } + } catch (final InterruptedException | KeeperException.OperationTimeoutException ex) { + LeafExceptionHandler.handleException(ex); + } + } + + public String getDirectly(final String key) { + try { + return new String(client.getData().forPath(key), Charsets.UTF_8); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + LeafExceptionHandler.handleException(ex); + return null; + } + } + + public boolean isExisted(final String key) { + try { + return null != client.checkExists().forPath(key); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + LeafExceptionHandler.handleException(ex); + return false; + } + } + + public void persist(final String key, final String value) { + try { + if (!isExisted(key)) { + client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8)); + } else { + update(key, value); + } + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + LeafExceptionHandler.handleException(ex); + } + } + + public void update(final String key, final String value) { + try { + client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit(); + // CHECKSTYLE:OFF + } catch (final Exception ex) { + // CHECKSTYLE:ON + LeafExceptionHandler.handleException(ex); + } + } + + public String getType() { + return "zookeeper"; + } + + public long incrementCacheId(final String tableName,final long step){ + InterProcessMutex lock = new InterProcessMutex(client, tableName); + long result=Long.MIN_VALUE; + boolean lockIsAcquired = tryLock(lock); + if ( lockIsAcquired ) { + result = updateCacheIdInCenter(tableName, step); + tryRelease(lock); + } + return result; + } + + private long updateCacheIdInCenter(final String tableName,final long step){ + long cacheId = Long.parseLong(getDirectly(tableName)); + long result = cacheId+step; + update(tableName, String.valueOf(result)); + return result; + } + + private boolean tryLock(final InterProcessMutex lock){ + try{ + return lock.acquire(5,TimeUnit.SECONDS); + }catch (Exception e){ + LeafExceptionHandler.handleException(e); + return Boolean.FALSE; + } + } + + private void tryRelease(final InterProcessMutex lock){ + try{ + lock.release(); + }catch (Exception e){ + LeafExceptionHandler.handleException(e); + } + } +} diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafException.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafException.java new file mode 100644 index 0000000000000..0f8bfe748db87 --- /dev/null +++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.core.strategy.keygen; + + +/** + * Created by Jason on 2019/4/28. + */ +public final class LeafException extends RuntimeException { + + private static final long serialVersionUID = -6417179023552012190L; + + public LeafException(final Exception cause) { + super(cause); + } +} \ No newline at end of file diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafExceptionHandler.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafExceptionHandler.java new file mode 100644 index 0000000000000..42bea46bd7f8c --- /dev/null +++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafExceptionHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.core.strategy.keygen; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.zookeeper.KeeperException; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public final class LeafExceptionHandler { + + /** + * Handle exception. + * + *

Ignore interrupt and connection invalid exception.

+ * + * @param cause to be handled exception + */ + public static void handleException(final Exception cause) { + if (null == cause) { + return; + } + if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) { + log.debug("Ignored exception for: {}", cause.getMessage()); + } else if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } else { + throw new LeafException(cause); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof LeafException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException; + } +} \ No newline at end of file diff --git a/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGenerator.java b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGenerator.java new file mode 100644 index 0000000000000..fad79f2ea9ae9 --- /dev/null +++ b/sharding-core/sharding-core-common/src/main/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGenerator.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.core.strategy.keygen; + +import com.google.common.base.Preconditions; +import lombok.Getter; +import lombok.Setter; +import org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; + +/** + * Created by Jason on 2019/4/28. + */ +public class LeafSegmentKeyGenerator implements ShardingKeyGenerator { + + private LeafCuratorZookeeper leafCuratorZookeeper; + + @Getter + @Setter + private Properties properties = new Properties(); + + private long id; + + private static final String TYPE = "LEAFSEGMENT"; + + private ExecutorService incrementCacheIdExecutor; + + private boolean isInitialized = Boolean.FALSE; + + private SynchronousQueue cacheIdQueue; + + private static final float THRESHOLD=0.5F; + + private long step; + + @Override + public String getType() { + return TYPE; + } + + @Override + public synchronized Comparable generateKey() { + return null; + } + + public synchronized Comparable generateKey(final String tableName){ + if (isInitialized == Boolean.FALSE) { + initLeafSegmentKeyGenerator(tableName); + isInitialized = Boolean.TRUE; + return id; + } + id = generateKeyWhenTableStoredInCenter(tableName); + return id; + } + + private void initLeafSegmentKeyGenerator(final String tableName){ + LeafConfiguration leafConfiguration = new LeafConfiguration(TYPE,getServerList()); + leafCuratorZookeeper = new LeafCuratorZookeeper(); + leafCuratorZookeeper.init(leafConfiguration); + if(leafCuratorZookeeper.isExisted(tableName)){ + id = leafCuratorZookeeper.incrementCacheId(tableName,getStep()); + }else{ + id = getInitialValue(); + leafCuratorZookeeper.persist(tableName,String.valueOf(id)); + } + incrementCacheIdExecutor = Executors.newSingleThreadExecutor(); + cacheIdQueue = new SynchronousQueue(); + step = getStep(); + } + + private long generateKeyWhenTableStoredInCenter(final String tableName){ + ++id; + if(((id%step) >= (step*THRESHOLD-1)) && cacheIdQueue.isEmpty()){ + incrementCacheIdAsynchronous(tableName,step); + } + if((id%step) == (step-1)){ + id = tryTakeCacheId(); + } + return id; + } + + private void incrementCacheIdAsynchronous(final String tableName,final long step){ + incrementCacheIdExecutor.execute(new Runnable() { + @Override + public void run() { + long id = leafCuratorZookeeper.incrementCacheId(tableName,step); + tryPutCacheId(id); + } + }); + } + + private long tryTakeCacheId(){ + long id = Long.MIN_VALUE; + try{ + id = cacheIdQueue.take(); + }catch (Exception e){ + LeafExceptionHandler.handleException(e); + } + return id; + } + + private void tryPutCacheId(long id){ + try{ + cacheIdQueue.put(id); + }catch (Exception e){ + LeafExceptionHandler.handleException(e); + } + } + + private long getStep(){ + long result = Long.parseLong(properties.getProperty("step")); + Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE); + return result; + } + + private long getInitialValue(){ + long result = Long.parseLong(properties.getProperty("initialValue")); + Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE); + return result; + } + + private String getServerList(){ + String result = (String)properties.get("serverList"); + String pattern = "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\." + + "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d" + + "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" + + "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]" + + "|[1-5]\\d{4}|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9])((,(\\d|[1-9]\\d" + + "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])" + + "\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" + + "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]|[1-5]\\d{4}" + + "|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9]))*)"; + Preconditions.checkArgument(result.matches(pattern)); + return result; + } +} \ No newline at end of file diff --git a/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator b/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator index 2e13c5be45da4..220fe6a41ece0 100644 --- a/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator +++ b/sharding-core/sharding-core-common/src/main/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator @@ -15,5 +15,6 @@ # limitations under the License. # +org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator org.apache.shardingsphere.core.strategy.keygen.SnowflakeShardingKeyGenerator org.apache.shardingsphere.core.strategy.keygen.UUIDShardingKeyGenerator diff --git a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java new file mode 100644 index 0000000000000..44df931110190 --- /dev/null +++ b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.core.strategy.keygen; + +import lombok.SneakyThrows; +import org.apache.shardingsphere.core.strategy.keygen.fixture.FixedTimeService; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public class LeafSegmentKeyGeneratorTest { + private LeafSegmentKeyGenerator leafSegmentKeyGenerator = new LeafSegmentKeyGenerator(); + + @Test + public void assertGetProperties() { + assertThat(leafSegmentKeyGenerator.getProperties().entrySet().size(), is(0)); + } + + @Test + public void assertSetProperties() { + Properties properties = new Properties(); + properties.setProperty("key1", "value1"); + leafSegmentKeyGenerator.setProperties(properties); + assertThat(leafSegmentKeyGenerator.getProperties().get("key1"), is((Object) "value1")); + } + + @Test + public void assertGenerateKeyWithSingleThread() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + leafSegmentKeyGenerator.setProperties(properties); + String tableName="/test_table_name1"; + List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L); + List> actual = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + actual.add(leafSegmentKeyGenerator.generateKey(tableName)); + } + assertThat(actual, is(expected)); + } + + @Test + @SneakyThrows + public void assertGenerateKeyWithMultipleThreads() { + int threadNumber = Runtime.getRuntime().availableProcessors() << 1; + ExecutorService executor = Executors.newFixedThreadPool(threadNumber); + int taskNumber = threadNumber << 2; + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name2"; + Set> actual = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actual.add(executor.submit(new Callable>() { + + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(tableName); + } + }).get()); + } + assertThat(actual.size(), is(taskNumber)); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetStepFailureWhenNegative() throws Exception{ + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step", String.valueOf(-1L)); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name3"; + leafSegmentKeyGenerator.generateKey(tableName); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetInitialValueFailureWhenNegative() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("initialValue", String.valueOf(-1L)); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name4"; + + leafSegmentKeyGenerator.generateKey(tableName); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetServerListFailureWhenPortIllegal() { + Properties properties = new Properties(); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("serverList", "192.168.123:90999"); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name5"; + leafSegmentKeyGenerator.generateKey(tableName); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetServerListFailureWhenIpIllegal() { + Properties properties = new Properties(); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("serverList", "267.168.123:8088"); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name6rmr"; + leafSegmentKeyGenerator.generateKey(tableName); + } + + +} diff --git a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java index a5dfe70944635..5575828b14dbe 100644 --- a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java +++ b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/ShardingKeyGeneratorServiceLoaderTest.java @@ -28,7 +28,12 @@ public final class ShardingKeyGeneratorServiceLoaderTest { private ShardingKeyGeneratorServiceLoader serviceLoader = new ShardingKeyGeneratorServiceLoader(); - + + @Test + public void assertNewLeafSegmentKeyGenerator() { + assertThat(serviceLoader.newService("LEAFSEGMENT", new Properties()), instanceOf(LeafSegmentKeyGenerator.class)); + } + @Test public void assertNewSnowflakeKeyGenerator() { assertThat(serviceLoader.newService("SNOWFLAKE", new Properties()), instanceOf(SnowflakeShardingKeyGenerator.class)); diff --git a/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator b/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator index 9f3aaf5bd437b..11cd89eb47c31 100644 --- a/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator +++ b/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator org.apache.shardingsphere.core.strategy.keygen.SnowflakeShardingKeyGenerator org.apache.shardingsphere.core.strategy.keygen.UUIDShardingKeyGenerator org.apache.shardingsphere.core.strategy.keygen.fixture.IncrementShardingKeyGenerator From bc2dad3d4c677e0992ecc2cc486b88f74914acaa Mon Sep 17 00:00:00 2001 From: guangyuan Date: Tue, 28 May 2019 08:31:46 +0800 Subject: [PATCH 2/6] Add LeafSegmentKeyGenerator related functions. --- .../keygen/LeafSegmentKeyGeneratorTest.java | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java index 44df931110190..ad7b126c549e2 100644 --- a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java +++ b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.core.strategy.keygen; import lombok.SneakyThrows; +import org.apache.curator.test.TestingServer; import org.apache.shardingsphere.core.strategy.keygen.fixture.FixedTimeService; import org.junit.Test; @@ -33,6 +34,7 @@ public class LeafSegmentKeyGeneratorTest { private LeafSegmentKeyGenerator leafSegmentKeyGenerator = new LeafSegmentKeyGenerator(); + @Test public void assertGetProperties() { assertThat(leafSegmentKeyGenerator.getProperties().entrySet().size(), is(0)); @@ -47,18 +49,21 @@ public void assertSetProperties() { } @Test - public void assertGenerateKeyWithSingleThread() { + @SneakyThrows + public void assertGenerateKeyWithSingleThread(){ Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("serverList","127.0.0.1:2182"); properties.setProperty("initialValue","100001"); properties.setProperty("step","3"); leafSegmentKeyGenerator.setProperties(properties); String tableName="/test_table_name1"; List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L); List> actual = new ArrayList<>(); + TestingServer server = new TestingServer(2182,true); for (int i = 0; i < 10; i++) { actual.add(leafSegmentKeyGenerator.generateKey(tableName)); } + server.close(); assertThat(actual, is(expected)); } @@ -69,12 +74,13 @@ public void assertGenerateKeyWithMultipleThreads() { ExecutorService executor = Executors.newFixedThreadPool(threadNumber); int taskNumber = threadNumber << 2; Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("serverList","127.0.0.1:2183"); properties.setProperty("initialValue","100001"); properties.setProperty("step","3"); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name2"; Set> actual = new HashSet<>(); + TestingServer server = new TestingServer(2183,true); for (int i = 0; i < taskNumber; i++) { actual.add(executor.submit(new Callable>() { @@ -84,33 +90,40 @@ public Comparable call() { } }).get()); } + server.close(); assertThat(actual.size(), is(taskNumber)); } @Test(expected = IllegalArgumentException.class) + @SneakyThrows public void assertSetStepFailureWhenNegative() throws Exception{ Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("serverList","127.0.0.1:2184"); properties.setProperty("initialValue","100001"); properties.setProperty("step", String.valueOf(-1L)); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name3"; + TestingServer server = new TestingServer(2184,true); leafSegmentKeyGenerator.generateKey(tableName); + server.close(); } @Test(expected = IllegalArgumentException.class) + @SneakyThrows public void assertSetInitialValueFailureWhenNegative() { Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("serverList","127.0.0.1:2185"); properties.setProperty("step","3"); properties.setProperty("initialValue", String.valueOf(-1L)); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name4"; - + TestingServer server = new TestingServer(2185,true); leafSegmentKeyGenerator.generateKey(tableName); + server.close(); } @Test(expected = IllegalArgumentException.class) + @SneakyThrows public void assertSetServerListFailureWhenPortIllegal() { Properties properties = new Properties(); properties.setProperty("initialValue","100001"); @@ -118,18 +131,23 @@ public void assertSetServerListFailureWhenPortIllegal() { properties.setProperty("serverList", "192.168.123:90999"); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name5"; + TestingServer server = new TestingServer(2186,true); leafSegmentKeyGenerator.generateKey(tableName); + server.close(); } @Test(expected = IllegalArgumentException.class) + @SneakyThrows public void assertSetServerListFailureWhenIpIllegal() { Properties properties = new Properties(); properties.setProperty("initialValue","100001"); properties.setProperty("step","3"); properties.setProperty("serverList", "267.168.123:8088"); leafSegmentKeyGenerator.setProperties(properties); - final String tableName="/test_table_name6rmr"; + final String tableName="/test_table_name6"; + TestingServer server = new TestingServer(2187,true); leafSegmentKeyGenerator.generateKey(tableName); + server.close(); } From 2eb8c39384451ea0805ad380afc8e7872dddd5cd Mon Sep 17 00:00:00 2001 From: guangyuan Date: Tue, 28 May 2019 08:43:00 +0800 Subject: [PATCH 3/6] Add LeafSegmentKeyGenerator related functions. --- .../keygen/LeafSegmentKeyGeneratorTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java index ad7b126c549e2..fa9ae4e08992b 100644 --- a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java +++ b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java @@ -52,14 +52,14 @@ public void assertSetProperties() { @SneakyThrows public void assertGenerateKeyWithSingleThread(){ Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2182"); + properties.setProperty("serverList","127.0.0.1:2181"); properties.setProperty("initialValue","100001"); properties.setProperty("step","3"); leafSegmentKeyGenerator.setProperties(properties); String tableName="/test_table_name1"; List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L); List> actual = new ArrayList<>(); - TestingServer server = new TestingServer(2182,true); + TestingServer server = new TestingServer(2181,true); for (int i = 0; i < 10; i++) { actual.add(leafSegmentKeyGenerator.generateKey(tableName)); } @@ -74,13 +74,13 @@ public void assertGenerateKeyWithMultipleThreads() { ExecutorService executor = Executors.newFixedThreadPool(threadNumber); int taskNumber = threadNumber << 2; Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2183"); + properties.setProperty("serverList","127.0.0.1:2181"); properties.setProperty("initialValue","100001"); properties.setProperty("step","3"); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name2"; Set> actual = new HashSet<>(); - TestingServer server = new TestingServer(2183,true); + TestingServer server = new TestingServer(2181,true); for (int i = 0; i < taskNumber; i++) { actual.add(executor.submit(new Callable>() { @@ -98,12 +98,12 @@ public Comparable call() { @SneakyThrows public void assertSetStepFailureWhenNegative() throws Exception{ Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2184"); + properties.setProperty("serverList","127.0.0.1:2181"); properties.setProperty("initialValue","100001"); properties.setProperty("step", String.valueOf(-1L)); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name3"; - TestingServer server = new TestingServer(2184,true); + TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); server.close(); } @@ -112,12 +112,12 @@ public void assertSetStepFailureWhenNegative() throws Exception{ @SneakyThrows public void assertSetInitialValueFailureWhenNegative() { Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2185"); + properties.setProperty("serverList","127.0.0.1:2181"); properties.setProperty("step","3"); properties.setProperty("initialValue", String.valueOf(-1L)); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name4"; - TestingServer server = new TestingServer(2185,true); + TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); server.close(); } @@ -131,7 +131,7 @@ public void assertSetServerListFailureWhenPortIllegal() { properties.setProperty("serverList", "192.168.123:90999"); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name5"; - TestingServer server = new TestingServer(2186,true); + TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); server.close(); } @@ -145,7 +145,7 @@ public void assertSetServerListFailureWhenIpIllegal() { properties.setProperty("serverList", "267.168.123:8088"); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name6"; - TestingServer server = new TestingServer(2187,true); + TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); server.close(); } From e85df597342b2391571a3a6be37a34edd1e50b2b Mon Sep 17 00:00:00 2001 From: guangyuan Date: Tue, 28 May 2019 08:57:45 +0800 Subject: [PATCH 4/6] Add LeafSegmentKeyGenerator related functions. --- .../keygen/LeafSegmentKeyGeneratorTest.java | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java index fa9ae4e08992b..19f541c5fde5b 100644 --- a/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java +++ b/sharding-core/sharding-core-common/src/test/java/org/apache/shardingsphere/core/strategy/keygen/LeafSegmentKeyGeneratorTest.java @@ -20,7 +20,7 @@ import lombok.SneakyThrows; import org.apache.curator.test.TestingServer; import org.apache.shardingsphere.core.strategy.keygen.fixture.FixedTimeService; -import org.junit.Test; +import org.junit.*; import java.lang.reflect.Method; import java.util.*; @@ -34,6 +34,8 @@ public class LeafSegmentKeyGeneratorTest { private LeafSegmentKeyGenerator leafSegmentKeyGenerator = new LeafSegmentKeyGenerator(); + private static TestingServer server; + @Test public void assertGetProperties() { @@ -49,7 +51,6 @@ public void assertSetProperties() { } @Test - @SneakyThrows public void assertGenerateKeyWithSingleThread(){ Properties properties = new Properties(); properties.setProperty("serverList","127.0.0.1:2181"); @@ -59,11 +60,9 @@ public void assertGenerateKeyWithSingleThread(){ String tableName="/test_table_name1"; List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L); List> actual = new ArrayList<>(); - TestingServer server = new TestingServer(2181,true); for (int i = 0; i < 10; i++) { actual.add(leafSegmentKeyGenerator.generateKey(tableName)); } - server.close(); assertThat(actual, is(expected)); } @@ -80,7 +79,6 @@ public void assertGenerateKeyWithMultipleThreads() { leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name2"; Set> actual = new HashSet<>(); - TestingServer server = new TestingServer(2181,true); for (int i = 0; i < taskNumber; i++) { actual.add(executor.submit(new Callable>() { @@ -90,12 +88,10 @@ public Comparable call() { } }).get()); } - server.close(); assertThat(actual.size(), is(taskNumber)); } @Test(expected = IllegalArgumentException.class) - @SneakyThrows public void assertSetStepFailureWhenNegative() throws Exception{ Properties properties = new Properties(); properties.setProperty("serverList","127.0.0.1:2181"); @@ -103,13 +99,10 @@ public void assertSetStepFailureWhenNegative() throws Exception{ properties.setProperty("step", String.valueOf(-1L)); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name3"; - TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); - server.close(); } @Test(expected = IllegalArgumentException.class) - @SneakyThrows public void assertSetInitialValueFailureWhenNegative() { Properties properties = new Properties(); properties.setProperty("serverList","127.0.0.1:2181"); @@ -117,13 +110,10 @@ public void assertSetInitialValueFailureWhenNegative() { properties.setProperty("initialValue", String.valueOf(-1L)); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name4"; - TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); - server.close(); } @Test(expected = IllegalArgumentException.class) - @SneakyThrows public void assertSetServerListFailureWhenPortIllegal() { Properties properties = new Properties(); properties.setProperty("initialValue","100001"); @@ -131,13 +121,10 @@ public void assertSetServerListFailureWhenPortIllegal() { properties.setProperty("serverList", "192.168.123:90999"); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name5"; - TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); - server.close(); } @Test(expected = IllegalArgumentException.class) - @SneakyThrows public void assertSetServerListFailureWhenIpIllegal() { Properties properties = new Properties(); properties.setProperty("initialValue","100001"); @@ -145,10 +132,17 @@ public void assertSetServerListFailureWhenIpIllegal() { properties.setProperty("serverList", "267.168.123:8088"); leafSegmentKeyGenerator.setProperties(properties); final String tableName="/test_table_name6"; - TestingServer server = new TestingServer(2181,true); leafSegmentKeyGenerator.generateKey(tableName); - server.close(); } - + @BeforeClass + @SneakyThrows + public static void startServer(){ + server = new TestingServer(2181,true); + } + @AfterClass + @SneakyThrows + public static void closeServer(){ + server.close(); + } } From 981d380dade98e052f2d09d0804c5d0104895ab3 Mon Sep 17 00:00:00 2001 From: guangyuan Date: Tue, 28 May 2019 09:07:31 +0800 Subject: [PATCH 5/6] Add LeafSegmentKeyGenerator related functions --- .../org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator b/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator index 11cd89eb47c31..b3d62644bef28 100644 --- a/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator +++ b/sharding-core/sharding-core-common/src/test/resources/META-INF/services/org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator @@ -14,7 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator + org.apache.shardingsphere.core.strategy.keygen.SnowflakeShardingKeyGenerator org.apache.shardingsphere.core.strategy.keygen.UUIDShardingKeyGenerator org.apache.shardingsphere.core.strategy.keygen.fixture.IncrementShardingKeyGenerator +org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator \ No newline at end of file From 31871189b2637141ada7c837b980f9f6fd6f83e8 Mon Sep 17 00:00:00 2001 From: wangguangyuan Date: Thu, 30 May 2019 17:31:46 +0800 Subject: [PATCH 6/6] move leafgenerator from sharding-core-common to sharding-orchestration-core --- .../sharding-orchestration-core/pom.xml | 6 + .../internal/keygen/LeafException.java | 31 +++ .../internal/keygen/LeafExceptionHandler.java | 52 +++++ .../keygen/LeafSegmentKeyGenerator.java | 186 ++++++++++++++++++ .../keygen/LeafSegmentKeyGeneratorTest.java | 150 ++++++++++++++ .../CuratorZookeeperRegistryCenter.java | 24 +++ 6 files changed, 449 insertions(+) create mode 100644 sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafException.java create mode 100644 sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafExceptionHandler.java create mode 100644 sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java create mode 100644 sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java diff --git a/sharding-orchestration/sharding-orchestration-core/pom.xml b/sharding-orchestration/sharding-orchestration-core/pom.xml index f07c68028b75e..f86e0adafd23e 100644 --- a/sharding-orchestration/sharding-orchestration-core/pom.xml +++ b/sharding-orchestration/sharding-orchestration-core/pom.xml @@ -42,5 +42,11 @@ org.apache.commons commons-dbcp2 + + org.apache.shardingsphere + sharding-orchestration-reg-zookeeper-curator + 4.0.0-RC2-SNAPSHOT + compile + diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafException.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafException.java new file mode 100644 index 0000000000000..7801bbcb297ed --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.internal.keygen; + + +/** + * Created by Jason on 2019/4/28. + */ +public final class LeafException extends RuntimeException { + + private static final long serialVersionUID = -6417179023552012190L; + + public LeafException(final Exception cause) { + super(cause); + } +} \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafExceptionHandler.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafExceptionHandler.java new file mode 100644 index 0000000000000..3d2063c3ca6a3 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafExceptionHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.internal.keygen; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.zookeeper.KeeperException; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public final class LeafExceptionHandler { + + /** + * Handle exception. + * + *

Ignore interrupt and connection invalid exception.

+ * + * @param cause to be handled exception + */ + public static void handleException(final Exception cause) { + if (null == cause) { + return; + } + if (isIgnoredException(cause) || null != cause.getCause() && isIgnoredException(cause.getCause())) { + log.debug("Ignored exception for: {}", cause.getMessage()); + } else if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } else { + throw new LeafException(cause); + } + } + + private static boolean isIgnoredException(final Throwable cause) { + return cause instanceof LeafException || cause instanceof KeeperException.NoNodeException || cause instanceof KeeperException.NodeExistsException; + } +} \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java new file mode 100644 index 0000000000000..1ec2070f13707 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.internal.keygen; + +import com.google.common.base.Preconditions; +import lombok.Getter; +import lombok.Setter; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration; +import org.apache.shardingsphere.orchestration.reg.zookeeper.curator.CuratorZookeeperRegistryCenter; +import org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; + +/** + * Created by Jason on 2019/4/28. + */ +public class LeafSegmentKeyGenerator implements ShardingKeyGenerator { + + private CuratorZookeeperRegistryCenter leafCuratorZookeeper; + + @Getter + @Setter + private Properties properties = new Properties(); + + private long id; + + private static final String TYPE = "LEAFSEGMENT"; + + private static final String NAMESPACE = "leaf_segment"; + + private ExecutorService incrementCacheIdExecutor; + + private boolean isInitialized = Boolean.FALSE; + + private SynchronousQueue cacheIdQueue; + + private static final float THRESHOLD=0.5F; + + private long step; + + @Override + public String getType() { + return TYPE; + } + + @Override + public synchronized Comparable generateKey() { + String leafKey = (String)properties.get("leaf.key"); + if (isInitialized == Boolean.FALSE) { + initLeafSegmentKeyGenerator(leafKey); + isInitialized = Boolean.TRUE; + return id; + } + id = generateKeyWhenLeafKeyStoredInCenter(leafKey); + return id; + } + + private void initLeafSegmentKeyGenerator(final String leafKey){ + RegistryCenterConfiguration leafConfiguration = new RegistryCenterConfiguration(TYPE,properties); + leafConfiguration.setNamespace(NAMESPACE); + leafConfiguration.setServerLists(getServerList()); + leafConfiguration.setDigest(getDigest()); + leafCuratorZookeeper = new CuratorZookeeperRegistryCenter (); + leafCuratorZookeeper.init(leafConfiguration); + if(leafCuratorZookeeper.isExisted(leafKey)){ + id = incrementCacheId(leafKey,getStep()); + }else{ + id = getInitialValue(); + leafCuratorZookeeper.persist(leafKey,String.valueOf(id)); + } + incrementCacheIdExecutor = Executors.newSingleThreadExecutor(); + cacheIdQueue = new SynchronousQueue<>(); + step = getStep(); + } + + private long generateKeyWhenLeafKeyStoredInCenter(final String leafKey){ + ++id; + if(((id%step) >= (step*THRESHOLD-1)) && cacheIdQueue.isEmpty()){ + incrementCacheIdAsynchronous(leafKey,step); + } + if((id%step) == (step-1)){ + id = tryTakeCacheId(); + } + return id; + } + + private void incrementCacheIdAsynchronous(final String leafKey,final long step){ + incrementCacheIdExecutor.execute(new Runnable() { + @Override + public void run() { + long id = incrementCacheId(leafKey,step); + tryPutCacheId(id); + } + }); + } + + private long tryTakeCacheId(){ + long id = Long.MIN_VALUE; + try{ + id = cacheIdQueue.take(); + }catch (Exception ex){ + Thread.currentThread().interrupt(); + } + return id; + } + + private void tryPutCacheId(long id){ + try{ + cacheIdQueue.put(id); + }catch (Exception ex){ + Thread.currentThread().interrupt(); + } + } + + private long getStep(){ + long result = Long.parseLong(properties.getProperty("step")); + Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE); + return result; + } + + private long getInitialValue(){ + long result = Long.parseLong(properties.getProperty("initialValue")); + Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE); + return result; + } + + private String getServerList(){ + String result = (String)properties.get("serverList"); + String pattern = "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\." + + "(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d" + + "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" + + "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]" + + "|[1-5]\\d{4}|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9])((,(\\d|[1-9]\\d" + + "|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])" + + "\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]" + + "\\d|25[0-5]):(6[0-4]\\d{4}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]|[1-5]\\d{4}" + + "|[1-9]\\d{3}|[1-9]\\d{2}|[1-9]\\d|[0-9]))*)"; + Preconditions.checkArgument(result.matches(pattern)); + return result; + } + + private String getDigest(){ + String result = (String)properties.get("digest"); + String pattern = "\\w+:\\w+|"; + Preconditions.checkArgument(result.matches(pattern)); + return result; + } + + private long incrementCacheId(final String leafKey,final long step){ + InterProcessMutex lock = leafCuratorZookeeper.initLock(leafKey); + long result=Long.MIN_VALUE; + boolean lockIsAcquired = leafCuratorZookeeper.tryLock(lock); + if ( lockIsAcquired ) { + result = updateCacheIdInCenter(leafKey, step); + leafCuratorZookeeper.tryRelease(lock); + } + return result; + } + + private long updateCacheIdInCenter(final String leafKey,final long step){ + long cacheId = Long.parseLong(leafCuratorZookeeper.getDirectly(leafKey)); + long result = cacheId+step; + leafCuratorZookeeper.update(leafKey, String.valueOf(result)); + return result; + } + +} \ No newline at end of file diff --git a/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java new file mode 100644 index 0000000000000..1919c58c0c7cf --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.internal.keygen; + +import lombok.SneakyThrows; +import org.apache.curator.test.TestingServer; +import org.apache.shardingsphere.core.strategy.keygen.LeafSegmentKeyGenerator; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public class LeafSegmentKeyGeneratorTest { + private LeafSegmentKeyGenerator leafSegmentKeyGenerator = new LeafSegmentKeyGenerator(); + + private static TestingServer server; + + + @Test + public void assertGetProperties() { + assertThat(leafSegmentKeyGenerator.getProperties().entrySet().size(), is(0)); + } + + @Test + public void assertSetProperties() { + Properties properties = new Properties(); + properties.setProperty("key1", "value1"); + leafSegmentKeyGenerator.setProperties(properties); + assertThat(leafSegmentKeyGenerator.getProperties().get("key1"), is((Object) "value1")); + } + + @Test + public void assertGenerateKeyWithSingleThread(){ + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + leafSegmentKeyGenerator.setProperties(properties); + String tableName="/test_table_name1"; + List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L); + List> actual = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + actual.add(leafSegmentKeyGenerator.generateKey(tableName)); + } + assertThat(actual, is(expected)); + } + + @Test + @SneakyThrows + public void assertGenerateKeyWithMultipleThreads() { + int threadNumber = Runtime.getRuntime().availableProcessors() << 1; + ExecutorService executor = Executors.newFixedThreadPool(threadNumber); + int taskNumber = threadNumber << 2; + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name2"; + Set> actual = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actual.add(executor.submit(new Callable>() { + + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(tableName); + } + }).get()); + } + assertThat(actual.size(), is(taskNumber)); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetStepFailureWhenNegative() throws Exception{ + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step", String.valueOf(-1L)); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name3"; + leafSegmentKeyGenerator.generateKey(tableName); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetInitialValueFailureWhenNegative() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("initialValue", String.valueOf(-1L)); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name4"; + leafSegmentKeyGenerator.generateKey(tableName); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetServerListFailureWhenPortIllegal() { + Properties properties = new Properties(); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("serverList", "192.168.123:90999"); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name5"; + leafSegmentKeyGenerator.generateKey(tableName); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetServerListFailureWhenIpIllegal() { + Properties properties = new Properties(); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("serverList", "267.168.123:8088"); + leafSegmentKeyGenerator.setProperties(properties); + final String tableName="/test_table_name6"; + leafSegmentKeyGenerator.generateKey(tableName); + } + + @BeforeClass + @SneakyThrows + public static void startServer(){ + server = new TestingServer(2181,true); + } + + @AfterClass + @SneakyThrows + public static void closeServer(){ + server.close(); + } +} diff --git a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java index 95538b1e098bb..289f48fba4aef 100644 --- a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java +++ b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter; @@ -290,4 +291,27 @@ private void waitForCacheClose() { public String getType() { return "zookeeper"; } + + public InterProcessMutex initLock(String key){ + InterProcessMutex lock = new InterProcessMutex(client, key); + return lock; + } + + + public boolean tryLock(final InterProcessMutex lock){ + try{ + return lock.acquire(5,TimeUnit.SECONDS); + }catch (Exception ex){ + CuratorZookeeperExceptionHandler.handleException(ex); + return Boolean.FALSE; + } + } + + public void tryRelease(final InterProcessMutex lock){ + try{ + lock.release(); + }catch (Exception ex){ + CuratorZookeeperExceptionHandler.handleException(ex); + } + } }