From 04c1b46ac6c929a6329c7d4b83d182a1f2cc7369 Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Sun, 2 Jun 2019 11:43:17 +0800 Subject: [PATCH 1/6] Add LeafRelated Functions --- .../keygen/LeafSegmentKeyGenerator.java | 199 ++++++++++++++++++ 1 file changed, 199 insertions(+) create mode 100644 sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java new file mode 100644 index 0000000000000..d3c02c64fd280 --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.internal.keygen; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import lombok.Getter; +import lombok.Setter; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration; +import org.apache.shardingsphere.orchestration.reg.zookeeper.curator.CuratorZookeeperRegistryCenter; +import org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; + +/** + * Created by Jason on 2019/4/28. + */ +public class LeafSegmentKeyGenerator implements ShardingKeyGenerator { + + + private static final String TYPE = "LEAFSEGMENT"; + + private static final String NAMESPACE = "leaf_segment"; + + private static final String SLANTING_BAR = "/"; + + private static final String REGULAR_PATTERN = "^((?!/).)*$"; + + private static final String STEP = "10000"; + + private static final String INITIAL_VALUE = "1"; + + private static final float THRESHOLD=0.5F; + + private boolean isInitialized = Boolean.FALSE; + + private CuratorZookeeperRegistryCenter leafRegistryCenter; + + private long id; + + private ExecutorService incrementCacheIdExecutor; + + private SynchronousQueue cacheIdQueue; + + private long step; + + @Getter + @Setter + private Properties properties = new Properties(); + + + @Override + public String getType() { + return TYPE; + } + + @Override + public synchronized Comparable generateKey() { + String leafKey = getLeafKey(); + if (isInitialized == Boolean.FALSE) { + initLeafSegmentKeyGenerator(leafKey); + isInitialized = Boolean.TRUE; + return id; + } + id = generateKeyWhenLeafKeyStoredInCenter(leafKey); + return id; + } + + private void initLeafSegmentKeyGenerator(final String leafKey){ + leafRegistryCenter = new CuratorZookeeperRegistryCenter(); + RegistryCenterConfiguration leafConfiguration = getRegistryCenterConfiguration(); + leafRegistryCenter.init(leafConfiguration); + if(leafRegistryCenter.isExisted(leafKey)){ + id = incrementCacheId(leafKey,getStep()); + }else{ + id = getInitialValue(); + leafRegistryCenter.persist(leafKey,String.valueOf(id)); + } + incrementCacheIdExecutor = Executors.newSingleThreadExecutor(); + cacheIdQueue = new SynchronousQueue<>(); + step = getStep(); + } + + + private long generateKeyWhenLeafKeyStoredInCenter(final String leafKey){ + ++id; + if(((id%step) >= (step*THRESHOLD-1)) && cacheIdQueue.isEmpty()){ + incrementCacheIdAsynchronous(leafKey,step); + } + if((id%step) == (step-1)){ + id = tryTakeCacheId(); + } + return id; + } + + private RegistryCenterConfiguration getRegistryCenterConfiguration(){ + RegistryCenterConfiguration registryCenterConfiguration = new RegistryCenterConfiguration(TYPE,properties); + registryCenterConfiguration.setNamespace(NAMESPACE); + registryCenterConfiguration.setServerLists(getServerList()); + registryCenterConfiguration.setDigest(getDigest()); + return registryCenterConfiguration; + } + + private void incrementCacheIdAsynchronous(final String leafKey,final long step){ + incrementCacheIdExecutor.execute(new Runnable() { + @Override + public void run() { + long id = incrementCacheId(leafKey,step); + tryPutCacheId(id); + } + }); + } + + private long incrementCacheId(final String leafKey,final long step){ + InterProcessMutex lock = leafRegistryCenter.initLock(leafKey); + long result=Long.MIN_VALUE; + boolean lockIsAcquired = leafRegistryCenter.tryLock(lock); + if ( lockIsAcquired ) { + result = updateCacheIdInCenter(leafKey, step); + leafRegistryCenter.tryRelease(lock); + } + return result; + } + + + private void tryPutCacheId(long id){ + try{ + cacheIdQueue.put(id); + }catch (Exception ex){ + Thread.currentThread().interrupt(); + } + } + + private long tryTakeCacheId(){ + long id = Long.MIN_VALUE; + try{ + id = cacheIdQueue.take(); + }catch (Exception ex){ + Thread.currentThread().interrupt(); + } + return id; + } + + private long updateCacheIdInCenter(final String leafKey,final long step){ + long cacheId = Long.parseLong(leafRegistryCenter.getDirectly(leafKey)); + long result = cacheId+step; + leafRegistryCenter.update(leafKey, String.valueOf(result)); + return result; + } + + private long getStep(){ + long step = Long.parseLong(properties.getProperty("step",STEP)); + Preconditions.checkArgument(step > 0L && step < Long.MAX_VALUE); + return step; + } + + private long getInitialValue(){ + long initialValue = Long.parseLong(properties.getProperty("initialValue",INITIAL_VALUE)); + Preconditions.checkArgument(initialValue >= 0L && initialValue < Long.MAX_VALUE); + return initialValue; + } + + private String getLeafKey(){ + String leafKey = properties.getProperty("leaf.key"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(leafKey)); + Preconditions.checkArgument(leafKey.matches(REGULAR_PATTERN)); + return SLANTING_BAR+leafKey; + } + + private String getServerList(){ + String serverList = properties.getProperty("serverList"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(serverList)); + return serverList; + } + + private String getDigest(){ + return properties.getProperty("digest"); + } + +} \ No newline at end of file From ffa6107116764723cde453016345a37e6d3081fa Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Sun, 2 Jun 2019 11:43:47 +0800 Subject: [PATCH 2/6] Add LeafRelated Functions --- .../keygen/LeafSegmentKeyGeneratorTest.java | 324 ++++++++++++++++++ 1 file changed, 324 insertions(+) create mode 100644 sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java diff --git a/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java new file mode 100644 index 0000000000000..c2f1b4fde7acc --- /dev/null +++ b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.orchestration.internal.keygen; + +import lombok.SneakyThrows; +import org.apache.curator.test.TestingServer; +import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +public class LeafSegmentKeyGeneratorTest { + + private LeafSegmentKeyGenerator leafSegmentKeyGenerator = new LeafSegmentKeyGenerator(); + + private static TestingServer server; + + @BeforeClass + @SneakyThrows + public static void startServer(){ + server = new TestingServer(2181,true); + } + + @Test + public void assertGetProperties() { + assertThat(leafSegmentKeyGenerator.getProperties().entrySet().size(), is(0)); + } + + @Test + public void assertSetProperties() { + Properties properties = new Properties(); + properties.setProperty("key1", "value1"); + leafSegmentKeyGenerator.setProperties(properties); + assertThat(leafSegmentKeyGenerator.getProperties().get("key1"), is((Object) "value1")); + } + + @Test + public void assertGenerateKeyWithSingleThread(){ + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_1"); + leafSegmentKeyGenerator.setProperties(properties); + List> expected = Arrays.>asList(100001L,100002L,100003L,100004L,100005L,100006L,100007L,100008L,100009L,100010L); + List> actual = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + actual.add(leafSegmentKeyGenerator.generateKey()); + } + assertThat(actual, is(expected)); + } + + @Test + @SneakyThrows + public void assertGenerateKeyWithMultipleThreads() { + int threadNumber = Runtime.getRuntime().availableProcessors() << 1; + ExecutorService executor = Executors.newFixedThreadPool(threadNumber); + int taskNumber = threadNumber << 2; + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_2"); + leafSegmentKeyGenerator.setProperties(properties); + Set> actual = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actual.add(executor.submit(new Callable>() { + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(); + } + }).get()); + } + assertThat(actual.size(), is(taskNumber)); + } + + @Test + @SneakyThrows + public void assertGenerateKeyWithDigest() { + int threadNumber = Runtime.getRuntime().availableProcessors() << 1; + ExecutorService executor = Executors.newFixedThreadPool(threadNumber); + int taskNumber = threadNumber << 2; + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("digest","user1:1231"); + properties.setProperty("leaf.key","test_table_3"); + leafSegmentKeyGenerator.setProperties(properties); + Set> actual = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actual.add(executor.submit(new Callable>() { + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(); + } + }).get()); + } + assertThat(actual.size(), is(taskNumber)); + } + + @Test(expected = Exception.class) + @SneakyThrows + public void assertGenerateKeyWithWrongDigest() { + int threadNumber = Runtime.getRuntime().availableProcessors() << 1; + ExecutorService executor = Executors.newFixedThreadPool(threadNumber); + int taskNumber = threadNumber << 2; + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("step","3"); + properties.setProperty("digest","user1:98"); + properties.setProperty("leaf.key","test_table_4"); + leafSegmentKeyGenerator.setProperties(properties); + Set> actual = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actual.add(executor.submit(new Callable>() { + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(); + } + }).get()); + } + assertThat(actual.size(), is(taskNumber)); + } + + @Test + @SneakyThrows + public void assertGenerateKeyWithDefaultStep() { + int threadNumber = Runtime.getRuntime().availableProcessors() << 1; + ExecutorService executor = Executors.newFixedThreadPool(threadNumber); + int taskNumber = threadNumber << 2; + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("initialValue","100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_5"); + leafSegmentKeyGenerator.setProperties(properties); + Set> actual = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actual.add(executor.submit(new Callable>() { + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(); + } + }).get()); + } + assertThat(actual.size(), is(taskNumber)); + } + + @Test + @SneakyThrows + public void assertGenerateKeyWithDefaultInitialValue() { + int threadNumber = Runtime.getRuntime().availableProcessors() << 1; + ExecutorService executor = Executors.newFixedThreadPool(threadNumber); + int taskNumber = threadNumber << 2; + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_6"); + leafSegmentKeyGenerator.setProperties(properties); + Set> actual = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actual.add(executor.submit(new Callable>() { + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(); + } + }).get()); + } + assertThat(actual.size(), is(taskNumber)); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetStepFailureWhenNegative(){ + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step", String.valueOf(-1L)); + properties.setProperty("initialValue","100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_7"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetStepFailureWhenZero(){ + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step", String.valueOf(0L)); + properties.setProperty("initialValue","100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_8"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetStepFailureWhenTooMuch(){ + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step", String.valueOf(Long.MAX_VALUE)); + properties.setProperty("initialValue","100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_9"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetInitialValueFailureWhenNegative() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("initialValue", String.valueOf(-1L)); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_10"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetInitialValueFailureWhenTooMuch() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("initialValue", String.valueOf(Long.MAX_VALUE)); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_11"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetServerListFailureWhenNull() { + Properties properties = new Properties(); + properties.setProperty("step","3"); + properties.setProperty("initialValue", "100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_12"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetServerListFailureWhenArgumentEmpty() { + Properties properties = new Properties(); + properties.setProperty("serverList",""); + properties.setProperty("step","3"); + properties.setProperty("initialValue", "100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","test_table_13"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetLeafKeyFailureWhenArgumentIllegal() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("initialValue", "100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key","/test_table_14"); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetLeafKeyFailureWhenArgumentEmpty() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("initialValue", "100001"); + properties.setProperty("digest",""); + properties.setProperty("leaf.key",""); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @Test(expected = IllegalArgumentException.class) + public void assertSetLeafKeyFailureWhenNull() { + Properties properties = new Properties(); + properties.setProperty("serverList","127.0.0.1:2181"); + properties.setProperty("step","3"); + properties.setProperty("initialValue", "100001"); + properties.setProperty("digest",""); + leafSegmentKeyGenerator.setProperties(properties); + leafSegmentKeyGenerator.generateKey(); + } + + @AfterClass + @SneakyThrows + public static void closeServer(){ + server.close(); + } + +} From 306a46c28acb381bc1dfa33c927e8f5a32db8f58 Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Mon, 3 Jun 2019 10:49:59 +0800 Subject: [PATCH 3/6] Add LeafRelated Functions --- .../sharding-orchestration-core/pom.xml | 10 +++ .../keygen/LeafSegmentKeyGenerator.java | 82 +++++++++---------- .../keygen/LeafSegmentKeyGeneratorTest.java | 43 ++++++---- .../CuratorZookeeperRegistryCenter.java | 46 ++++++++++- 4 files changed, 123 insertions(+), 58 deletions(-) diff --git a/sharding-orchestration/sharding-orchestration-core/pom.xml b/sharding-orchestration/sharding-orchestration-core/pom.xml index f07c68028b75e..9591334781cd7 100644 --- a/sharding-orchestration/sharding-orchestration-core/pom.xml +++ b/sharding-orchestration/sharding-orchestration-core/pom.xml @@ -42,5 +42,15 @@ org.apache.commons commons-dbcp2 + + org.apache.shardingsphere + sharding-orchestration-reg-zookeeper-curator + 4.0.0-RC2-SNAPSHOT + compile + + + org.apache.curator + curator-test + diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java index d3c02c64fd280..d35befea8c856 100644 --- a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java +++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java @@ -34,8 +34,7 @@ /** * Created by Jason on 2019/4/28. */ -public class LeafSegmentKeyGenerator implements ShardingKeyGenerator { - +public final class LeafSegmentKeyGenerator implements ShardingKeyGenerator { private static final String TYPE = "LEAFSEGMENT"; @@ -49,7 +48,7 @@ public class LeafSegmentKeyGenerator implements ShardingKeyGenerator { private static final String INITIAL_VALUE = "1"; - private static final float THRESHOLD=0.5F; + private static final float THRESHOLD = 0.5F; private boolean isInitialized = Boolean.FALSE; @@ -67,7 +66,6 @@ public class LeafSegmentKeyGenerator implements ShardingKeyGenerator { @Setter private Properties properties = new Properties(); - @Override public String getType() { return TYPE; @@ -85,115 +83,117 @@ public synchronized Comparable generateKey() { return id; } - private void initLeafSegmentKeyGenerator(final String leafKey){ + private void initLeafSegmentKeyGenerator(final String leafKey) { leafRegistryCenter = new CuratorZookeeperRegistryCenter(); RegistryCenterConfiguration leafConfiguration = getRegistryCenterConfiguration(); leafRegistryCenter.init(leafConfiguration); - if(leafRegistryCenter.isExisted(leafKey)){ - id = incrementCacheId(leafKey,getStep()); - }else{ + if (leafRegistryCenter.isExisted(leafKey)) { + id = incrementCacheId(leafKey, getStep()); + } else { id = getInitialValue(); - leafRegistryCenter.persist(leafKey,String.valueOf(id)); + leafRegistryCenter.persist(leafKey, String.valueOf(id)); } incrementCacheIdExecutor = Executors.newSingleThreadExecutor(); cacheIdQueue = new SynchronousQueue<>(); step = getStep(); } - - private long generateKeyWhenLeafKeyStoredInCenter(final String leafKey){ + private long generateKeyWhenLeafKeyStoredInCenter(final String leafKey) { ++id; - if(((id%step) >= (step*THRESHOLD-1)) && cacheIdQueue.isEmpty()){ - incrementCacheIdAsynchronous(leafKey,step); + if (((id % step) >= (step * THRESHOLD - 1)) && cacheIdQueue.isEmpty()) { + incrementCacheIdAsynchronous(leafKey, step); } - if((id%step) == (step-1)){ + if ((id % step) == (step - 1)) { id = tryTakeCacheId(); } return id; } - private RegistryCenterConfiguration getRegistryCenterConfiguration(){ - RegistryCenterConfiguration registryCenterConfiguration = new RegistryCenterConfiguration(TYPE,properties); + private RegistryCenterConfiguration getRegistryCenterConfiguration() { + RegistryCenterConfiguration registryCenterConfiguration = new RegistryCenterConfiguration(TYPE, properties); registryCenterConfiguration.setNamespace(NAMESPACE); registryCenterConfiguration.setServerLists(getServerList()); registryCenterConfiguration.setDigest(getDigest()); return registryCenterConfiguration; } - private void incrementCacheIdAsynchronous(final String leafKey,final long step){ + private void incrementCacheIdAsynchronous(final String leafKey, final long step) { incrementCacheIdExecutor.execute(new Runnable() { @Override public void run() { - long id = incrementCacheId(leafKey,step); + long id = incrementCacheId(leafKey, step); tryPutCacheId(id); } }); } - private long incrementCacheId(final String leafKey,final long step){ + private long incrementCacheId(final String leafKey, final long step) { InterProcessMutex lock = leafRegistryCenter.initLock(leafKey); - long result=Long.MIN_VALUE; + long result = Long.MIN_VALUE; boolean lockIsAcquired = leafRegistryCenter.tryLock(lock); - if ( lockIsAcquired ) { + if (lockIsAcquired) { result = updateCacheIdInCenter(leafKey, step); leafRegistryCenter.tryRelease(lock); } - return result; + return result; } - - private void tryPutCacheId(long id){ - try{ + private void tryPutCacheId(final long id) { + try { cacheIdQueue.put(id); - }catch (Exception ex){ + } catch (Exception ex) { Thread.currentThread().interrupt(); } } - private long tryTakeCacheId(){ + private long tryTakeCacheId() { long id = Long.MIN_VALUE; - try{ + try { id = cacheIdQueue.take(); - }catch (Exception ex){ + } catch (Exception ex) { Thread.currentThread().interrupt(); } return id; } - private long updateCacheIdInCenter(final String leafKey,final long step){ - long cacheId = Long.parseLong(leafRegistryCenter.getDirectly(leafKey)); - long result = cacheId+step; + private long updateCacheIdInCenter(final String leafKey, final long step) { + String cacheIdInString = leafRegistryCenter.getDirectly(leafKey); + if (Strings.isNullOrEmpty(cacheIdInString)) { + return Long.MIN_VALUE; + } + long cacheId = Long.parseLong(cacheIdInString); + long result = cacheId + step; leafRegistryCenter.update(leafKey, String.valueOf(result)); return result; } - private long getStep(){ - long step = Long.parseLong(properties.getProperty("step",STEP)); + private long getStep() { + long step = Long.parseLong(properties.getProperty("step", STEP)); Preconditions.checkArgument(step > 0L && step < Long.MAX_VALUE); return step; } - private long getInitialValue(){ - long initialValue = Long.parseLong(properties.getProperty("initialValue",INITIAL_VALUE)); + private long getInitialValue() { + long initialValue = Long.parseLong(properties.getProperty("initialValue", INITIAL_VALUE)); Preconditions.checkArgument(initialValue >= 0L && initialValue < Long.MAX_VALUE); return initialValue; } - private String getLeafKey(){ + private String getLeafKey() { String leafKey = properties.getProperty("leaf.key"); Preconditions.checkArgument(!Strings.isNullOrEmpty(leafKey)); Preconditions.checkArgument(leafKey.matches(REGULAR_PATTERN)); - return SLANTING_BAR+leafKey; + return SLANTING_BAR + leafKey; } - private String getServerList(){ + private String getServerList() { String serverList = properties.getProperty("serverList"); Preconditions.checkArgument(!Strings.isNullOrEmpty(serverList)); return serverList; } - private String getDigest(){ + private String getDigest() { return properties.getProperty("digest"); } -} \ No newline at end of file +} diff --git a/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java index c2f1b4fde7acc..15a9a565d6b91 100644 --- a/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java +++ b/sharding-orchestration/sharding-orchestration-core/src/test/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGeneratorTest.java @@ -19,7 +19,6 @@ import lombok.SneakyThrows; import org.apache.curator.test.TestingServer; -import org.apache.shardingsphere.orchestration.reg.exception.RegistryCenterException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -130,23 +129,39 @@ public void assertGenerateKeyWithWrongDigest() { int threadNumber = Runtime.getRuntime().availableProcessors() << 1; ExecutorService executor = Executors.newFixedThreadPool(threadNumber); int taskNumber = threadNumber << 2; - Properties properties = new Properties(); - properties.setProperty("serverList","127.0.0.1:2181"); - properties.setProperty("initialValue","100001"); - properties.setProperty("step","3"); - properties.setProperty("digest","user1:98"); - properties.setProperty("leaf.key","test_table_4"); - leafSegmentKeyGenerator.setProperties(properties); - Set> actual = new HashSet<>(); + Properties propertiesBefore = new Properties(); + propertiesBefore.setProperty("serverList","127.0.0.1:2181"); + propertiesBefore.setProperty("initialValue","100001"); + propertiesBefore.setProperty("step","3"); + propertiesBefore.setProperty("digest","user2:1231"); + propertiesBefore.setProperty("leaf.key","test_table_3"); + leafSegmentKeyGenerator.setProperties(propertiesBefore); + Set> actualBefore = new HashSet<>(); for (int i = 0; i < taskNumber; i++) { - actual.add(executor.submit(new Callable>() { + actualBefore.add(executor.submit(new Callable>() { @Override public Comparable call() { return leafSegmentKeyGenerator.generateKey(); } }).get()); } - assertThat(actual.size(), is(taskNumber)); + Properties propertiesAfter = new Properties(); + propertiesAfter.setProperty("serverList","127.0.0.1:2181"); + propertiesAfter.setProperty("initialValue","100001"); + propertiesAfter.setProperty("step","3"); + propertiesAfter.setProperty("digest","user2:98"); + propertiesAfter.setProperty("leaf.key","test_table_4"); + leafSegmentKeyGenerator.setProperties(propertiesAfter); + Set> actualAfter = new HashSet<>(); + for (int i = 0; i < taskNumber; i++) { + actualAfter.add(executor.submit(new Callable>() { + @Override + public Comparable call() { + return leafSegmentKeyGenerator.generateKey(); + } + }).get()); + } + assertThat(actualAfter.size(), is(taskNumber)); } @Test @@ -198,7 +213,7 @@ public Comparable call() { } @Test(expected = IllegalArgumentException.class) - public void assertSetStepFailureWhenNegative(){ + public void assertSetStepFailureWhenNegative() { Properties properties = new Properties(); properties.setProperty("serverList","127.0.0.1:2181"); properties.setProperty("step", String.valueOf(-1L)); @@ -210,7 +225,7 @@ public void assertSetStepFailureWhenNegative(){ } @Test(expected = IllegalArgumentException.class) - public void assertSetStepFailureWhenZero(){ + public void assertSetStepFailureWhenZero() { Properties properties = new Properties(); properties.setProperty("serverList","127.0.0.1:2181"); properties.setProperty("step", String.valueOf(0L)); @@ -222,7 +237,7 @@ public void assertSetStepFailureWhenZero(){ } @Test(expected = IllegalArgumentException.class) - public void assertSetStepFailureWhenTooMuch(){ + public void assertSetStepFailureWhenTooMuch() { Properties properties = new Properties(); properties.setProperty("serverList","127.0.0.1:2181"); properties.setProperty("step", String.valueOf(Long.MAX_VALUE)); diff --git a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java index 95538b1e098bb..501ee2fb070a4 100644 --- a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java +++ b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.shardingsphere.orchestration.reg.api.RegistryCenter; @@ -244,11 +245,11 @@ public void childEvent(final CuratorFramework client, final TreeCacheEvent event private ChangedType getChangedType(final TreeCacheEvent event) { switch (event.getType()) { case NODE_UPDATED: - return DataChangedEvent.ChangedType.UPDATED; + return ChangedType.UPDATED; case NODE_REMOVED: - return DataChangedEvent.ChangedType.DELETED; + return ChangedType.DELETED; default: - return DataChangedEvent.ChangedType.IGNORED; + return ChangedType.IGNORED; } } @@ -290,4 +291,43 @@ private void waitForCacheClose() { public String getType() { return "zookeeper"; } + + /** + * Initialize the lock of the key. + * + * @param key key of data + * @return the lock of the key + */ + public InterProcessMutex initLock(final String key) { + return new InterProcessMutex(client, key); + } + + /** + * Try to get the lock of the key. + * + * @param lock lock of key + * @return get the lock or not + */ + public boolean tryLock(final InterProcessMutex lock) { + try { + return lock.acquire(5, TimeUnit.SECONDS); + } catch (Exception ex) { + CuratorZookeeperExceptionHandler.handleException(ex); + return Boolean.FALSE; + } + } + + /** + * Try to release the lock of the key. + * + * @param lock lock of key + */ + public void tryRelease(final InterProcessMutex lock) { + try { + lock.release(); + } catch (Exception ex) { + CuratorZookeeperExceptionHandler.handleException(ex); + } + } + } From d952bcc0af7df2ac3b660e3126528ccc85f350de Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Mon, 3 Jun 2019 13:26:11 +0800 Subject: [PATCH 4/6] Add LeafRelated Functions --- .../keygen/LeafSegmentKeyGenerator.java | 18 ++++++------------ .../CuratorZookeeperRegistryCenter.java | 16 +++++----------- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java index d35befea8c856..463b96cb882fd 100644 --- a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java +++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java @@ -21,6 +21,7 @@ import com.google.common.base.Strings; import lombok.Getter; import lombok.Setter; +import lombok.SneakyThrows; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.shardingsphere.orchestration.reg.api.RegistryCenterConfiguration; import org.apache.shardingsphere.orchestration.reg.zookeeper.curator.CuratorZookeeperRegistryCenter; @@ -127,6 +128,7 @@ public void run() { }); } + @SneakyThrows private long incrementCacheId(final String leafKey, final long step) { InterProcessMutex lock = leafRegistryCenter.initLock(leafKey); long result = Long.MIN_VALUE; @@ -138,22 +140,14 @@ private long incrementCacheId(final String leafKey, final long step) { return result; } + @SneakyThrows private void tryPutCacheId(final long id) { - try { - cacheIdQueue.put(id); - } catch (Exception ex) { - Thread.currentThread().interrupt(); - } + cacheIdQueue.put(id); } + @SneakyThrows private long tryTakeCacheId() { - long id = Long.MIN_VALUE; - try { - id = cacheIdQueue.take(); - } catch (Exception ex) { - Thread.currentThread().interrupt(); - } - return id; + return cacheIdQueue.take(); } private long updateCacheIdInCenter(final String leafKey, final long step) { diff --git a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java index 501ee2fb070a4..099e5de11643f 100644 --- a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java +++ b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java @@ -21,6 +21,7 @@ import com.google.common.base.Strings; import lombok.Getter; import lombok.Setter; +import lombok.SneakyThrows; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; @@ -308,13 +309,9 @@ public InterProcessMutex initLock(final String key) { * @param lock lock of key * @return get the lock or not */ + @SneakyThrows public boolean tryLock(final InterProcessMutex lock) { - try { - return lock.acquire(5, TimeUnit.SECONDS); - } catch (Exception ex) { - CuratorZookeeperExceptionHandler.handleException(ex); - return Boolean.FALSE; - } + return lock.acquire(5, TimeUnit.SECONDS); } /** @@ -322,12 +319,9 @@ public boolean tryLock(final InterProcessMutex lock) { * * @param lock lock of key */ + @SneakyThrows public void tryRelease(final InterProcessMutex lock) { - try { - lock.release(); - } catch (Exception ex) { - CuratorZookeeperExceptionHandler.handleException(ex); - } + lock.release(); } } From 555b4f82f5279dc6818477be6850c36e0bcd0b57 Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Tue, 4 Jun 2019 08:28:07 +0800 Subject: [PATCH 5/6] Add LeafRelated Functions --- .../keygen/LeafSegmentKeyGenerator.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java index 463b96cb882fd..3d31bfdbee6d9 100644 --- a/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java +++ b/sharding-orchestration/sharding-orchestration-core/src/main/java/org/apache/shardingsphere/orchestration/internal/keygen/LeafSegmentKeyGenerator.java @@ -33,7 +33,9 @@ import java.util.concurrent.SynchronousQueue; /** - * Created by Jason on 2019/4/28. + * Key generator implemented by leaf segment algorithms. + * + * @author wangguangyuan */ public final class LeafSegmentKeyGenerator implements ShardingKeyGenerator { @@ -111,11 +113,11 @@ private long generateKeyWhenLeafKeyStoredInCenter(final String leafKey) { } private RegistryCenterConfiguration getRegistryCenterConfiguration() { - RegistryCenterConfiguration registryCenterConfiguration = new RegistryCenterConfiguration(TYPE, properties); - registryCenterConfiguration.setNamespace(NAMESPACE); - registryCenterConfiguration.setServerLists(getServerList()); - registryCenterConfiguration.setDigest(getDigest()); - return registryCenterConfiguration; + RegistryCenterConfiguration result = new RegistryCenterConfiguration(TYPE, properties); + result.setNamespace(NAMESPACE); + result.setServerLists(getServerList()); + result.setDigest(getDigest()); + return result; } private void incrementCacheIdAsynchronous(final String leafKey, final long step) { @@ -162,15 +164,15 @@ private long updateCacheIdInCenter(final String leafKey, final long step) { } private long getStep() { - long step = Long.parseLong(properties.getProperty("step", STEP)); + long result = Long.parseLong(properties.getProperty("step", STEP)); Preconditions.checkArgument(step > 0L && step < Long.MAX_VALUE); - return step; + return result; } private long getInitialValue() { - long initialValue = Long.parseLong(properties.getProperty("initialValue", INITIAL_VALUE)); - Preconditions.checkArgument(initialValue >= 0L && initialValue < Long.MAX_VALUE); - return initialValue; + long result = Long.parseLong(properties.getProperty("initialValue", INITIAL_VALUE)); + Preconditions.checkArgument(result >= 0L && result < Long.MAX_VALUE); + return result; } private String getLeafKey() { @@ -181,9 +183,9 @@ private String getLeafKey() { } private String getServerList() { - String serverList = properties.getProperty("serverList"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(serverList)); - return serverList; + String result = properties.getProperty("serverList"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(result)); + return result; } private String getDigest() { From 74473c0ecc69e3fe1ec8ef471fcf1f3fe455dba0 Mon Sep 17 00:00:00 2001 From: wgy8283335 Date: Tue, 4 Jun 2019 08:41:06 +0800 Subject: [PATCH 6/6] Add LeafRelated Functions --- .../zookeeper/curator/CuratorZookeeperRegistryCenter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java index 099e5de11643f..84ddbd2f181b4 100644 --- a/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java +++ b/sharding-orchestration/sharding-orchestration-reg/sharding-orchestration-reg-zookeeper-curator/src/main/java/org/apache/shardingsphere/orchestration/reg/zookeeper/curator/CuratorZookeeperRegistryCenter.java @@ -242,15 +242,15 @@ public void childEvent(final CuratorFramework client, final TreeCacheEvent event } }); } - + private ChangedType getChangedType(final TreeCacheEvent event) { switch (event.getType()) { case NODE_UPDATED: - return ChangedType.UPDATED; + return DataChangedEvent.ChangedType.UPDATED; case NODE_REMOVED: - return ChangedType.DELETED; + return DataChangedEvent.ChangedType.DELETED; default: - return ChangedType.IGNORED; + return DataChangedEvent.ChangedType.IGNORED; } }