Skip to content

Commit

Permalink
add SessionCacheDemo #122
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed Nov 29, 2018
1 parent 338f4a0 commit b0854e7
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@
<version>${kotlin.coroutine.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxkotlin</artifactId>
<version>2.3.0</version>
<scope>test</scope>
</dependency>
<!--
commons-lang3 v3.5 is the last version support Java 6
http://commons.apache.org/proper/commons-lang/release-history.html
Expand Down
12 changes: 12 additions & 0 deletions pom4ide.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,18 @@
<version>${kotlin.coroutine.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxkotlin</artifactId>
<version>2.3.0</version>
<scope>test</scope>
</dependency>
<!--
commons-lang3 v3.5 is the last version support Java 6
http://commons.apache.org/proper/commons-lang/release-history.html
Expand Down
127 changes: 127 additions & 0 deletions src/test/java/com/alibaba/demo/session_cache/SessionCacheDemo.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.alibaba.demo.session_cache

import com.alibaba.expandThreadPool
import com.alibaba.ttl.TransmittableThreadLocal
import com.alibaba.ttl.TtlRunnable
import com.alibaba.ttl.threadpool.TtlExecutors
import io.reactivex.Flowable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import org.junit.*
import java.util.concurrent.*


class SessionCacheDemo {
@Test
fun invokeInThreadOfThreadPool() {
val bizService = BizService()
val printer: () -> Unit = { System.out.printf("[%20s] cache: %s%n", Thread.currentThread().name, bizService.getCacheItem()) }

executorService.submit(Callable<Item> {
bizService.getItemByCache().also { printer() }
}).get()

printer()

// here service invocation will use item cache
bizService.getItemByCache()
printer()
}

@Test
fun invokeInThreadOfRxJava() {
val bizService = BizService()
val printer: (Item) -> Unit = { System.out.printf("[%30s] cache: %s%n", Thread.currentThread().name, bizService.getCacheItem()) }

Flowable.just(bizService)
.observeOn(Schedulers.io())
.map<Item>(BizService::getItemByCache)
.doOnNext(printer)
.blockingSubscribe(printer)

// here service invocation will use item cache
bizService.getItemByCache()
.let(printer)
}

companion object {
private val executorService = Executors.newFixedThreadPool(3).let {
expandThreadPool(it)
// TTL integration for thread pool
TtlExecutors.getTtlExecutorService(it)!!
}

@BeforeClass
@JvmStatic
@Suppress("unused")
fun beforeClass() {
// expand Schedulers.io()
(0 until Runtime.getRuntime().availableProcessors() * 2)
.map {
FutureTask {
Thread.sleep(10)
it
}.apply { Schedulers.io().scheduleDirect(this) }
}
.forEach { it.get() }

// TTL integration for RxJava
RxJavaPlugins.setScheduleHandler(TtlRunnable::get)
}

@AfterClass
@JvmStatic
@Suppress("unused")
fun afterClass() {
executorService.shutdown()
executorService.awaitTermination(100, TimeUnit.MILLISECONDS)
if (!executorService.isTerminated) Assert.fail("Fail to shutdown thread pool")
}
}

@After
fun tearDown() {
BizService.clearCache()
}
}

/**
* Mock Service
*/
private class BizService {
init {
// NOTE: AVOID cache object lazy init
getCache()
}

fun getItem(): Item = Item(ThreadLocalRandom.current().nextInt(0, 10_000))

/**
* get biz data, usually use spring cache. here is simple implementation
*/
fun getItemByCache(): Item {
return getCache().computeIfAbsent(ONLY_KEY) { getItem() }
}

fun getCacheItem(): Item? = getCache()[ONLY_KEY]

companion object {
private const val ONLY_KEY = "ONLY_KEY"

private val cacheContext = object : TransmittableThreadLocal<ConcurrentMap<String, Item>>() {
// init cache
override fun initialValue(): ConcurrentMap<String, Item> = ConcurrentHashMap()
}

private fun getCache() = cacheContext.get()

fun clearCache() {
getCache().clear()
}
}
}

/**
* Mock Cache Data
*/
private data class Item(val id: Int)

0 comments on commit b0854e7

Please sign in to comment.