Discussion: Using TransmittableThreadLocal in a session-level cache scenario #122

Closed
olove opened this issue Nov 26, 2018 · 3 comments
olove commented Nov 26, 2018

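Scenario

Some business flows have complex calculation logic, and the service that reads the base data may need to be called several times.

We would like a thread-level cache (more precisely a session-level cache, since several upstream and downstream threads are involved) to avoid calling the external service repeatedly.

Problem

The data may be produced in a child thread. When TransmittableThreadLocal copies data, it ignores keys that do not exist in the main thread, so the main thread cannot read a ThreadLocal value that was newly created in the child thread.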
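The asymmetry described above can be reproduced with the JDK alone. The following is a minimal sketch, assuming that `InheritableThreadLocal` is an acceptable stand-in for `TransmittableThreadLocal` for this one point (values flow from parent to child, not back). The class and method names are hypothetical and do not come from the issue.

```java
final class ChildToParentDemo {
    // Assumption: a JDK InheritableThreadLocal stands in for the TransmittableThreadLocal in the report.
    static final InheritableThreadLocal<Object> CACHE = new InheritableThreadLocal<>();

    /** Sets a value inside a child thread, then returns what the parent thread reads. */
    public static Object valueSeenByParentAfterChildSet() {
        CACHE.remove(); // start from a clean state in the calling thread
        Thread child = new Thread(() -> CACHE.set(new Object()));
        child.start();
        join(child);
        return CACHE.get(); // the child's value never reaches the parent
    }

    /** Sets a value in the parent first and reports whether the child reads the same instance. */
    public static boolean childSeesParentValue() {
        Object value = new Object();
        CACHE.set(value);
        try {
            Object[] seen = new Object[1];
            Thread child = new Thread(() -> seen[0] = CACHE.get());
            child.start();
            join(child);
            return seen[0] == value;
        } finally {
            CACHE.remove(); // clean up the calling thread
        }
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("parent after child set: " + valueSeenByParentAfterChildSet()); // null
        System.out.println("child sees parent value: " + childSeesParentValue());          // true
    }
}
```

This matches the two test results reported in the issue: the value is visible in both threads only when it is created in the parent first.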

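Proposed solution

TransmittableThreadLocal could add a parameter that controls whether ThreadLocal values newly added in a child thread are passed back to the parent thread.

Sample code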

package com.example.practice.java.modern.tech.sandbox.library.hello;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.junit.Before;
import org.junit.Test;

public class EasySample {

    @Before
    public void setUp() {
        RxJavaPlugins.setScheduleHandler(TtlRunnable::get);
    }

    @Test
    public void getSomethingByCache() {
        BizService bizService = new BizService();

        final Consumer<Object> consumer = result -> System.out.println(Thread.currentThread().getName() + ":" + bizService.getCache());

        Flowable.just(bizService)
                .observeOn(Schedulers.io())
                .map(BizService::getSomethingByCache)
                .doOnNext(consumer)
                .blockingSubscribe(consumer);

        // The business logic needs this value again at some later point
        Object object = bizService.getSomethingByCache();

        // doSomething...
    }

    @Test
    public void getSomethingByCache2() {
        BizService bizService = new BizService();

        // If the value is created in the parent thread, the child thread can use it directly
        Object object = bizService.getSomethingByCache();

        // doSomething...

        final Consumer<Object> consumer = result -> System.out.println(Thread.currentThread().getName() + ":" + bizService.getCache());

        Flowable.just(bizService)
                .observeOn(Schedulers.io())
                .map(BizService::getSomethingByCache)
                .doOnNext(consumer)
                .blockingSubscribe(consumer);
    }

    private static class BizService {
        public final TransmittableThreadLocal<Object> cache = new TransmittableThreadLocal<>();

        public Object getSomething() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
            }

            return new Object();
        }

        /**
         * Fetches the business data. Normally Spring Cache would handle the caching; this is a simplified implementation.
         */
        public Object getSomethingByCache() {
            Object value = cache.get();
            if (value == null) {
                value = getSomething();
                cache.set(value);
            }
            return value;
        }

        public Object getCache() {
            return cache.get();
        }

        public void remove() {
            cache.remove();
        }
    }
}

The results:

com.alibaba.nageer.EasySample#getSomethingByCache

RxCachedThreadScheduler-1:java.lang.Object@29d7d137
main:null

com.alibaba.nageer.EasySample#getSomethingByCache2

RxCachedThreadScheduler-1:java.lang.Object@740a86a8
main:java.lang.Object@740a86a8

oldratlee commented Nov 29, 2018

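The "cache problem" and the "TTL transmission problem" are orthogonal.

Explained as follows, @olove:

Design

In terms of design, approach and concepts:

  • cache_context (the cache context)
    This is a TransmittableThreadLocal that holds the Cache object (the object to be transmitted).
    Because the Cache is transmitted through the TransmittableThreadLocal, it is session-level.
  • cache
    The cache being maintained.
    The Cache (which can be implemented as a Map) holds the cached values (Item in the demo code below).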
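The design above can be sketched with the JDK alone: the thread-local holds a shared, mutable map, so a write made by the child lands in the same map the parent reads. This is a minimal sketch, assuming `InheritableThreadLocal` as a stand-in for `TransmittableThreadLocal`; it only covers freshly created threads, whereas TTL is what extends the idea to pooled threads. The class name, key and value are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SharedCacheContextDemo {
    // Assumption: the context holds the cache map itself, not the cached value.
    static final InheritableThreadLocal<ConcurrentMap<String, String>> CACHE_CONTEXT =
            new InheritableThreadLocal<>();

    /** Returns what the parent reads after a child thread has populated the shared cache. */
    public static String parentReadsChildWrite() {
        // The map is created in the parent BEFORE the child thread is created.
        CACHE_CONTEXT.set(new ConcurrentHashMap<>());
        try {
            Thread child = new Thread(
                    () -> CACHE_CONTEXT.get().computeIfAbsent("key", k -> "loaded-in-child"));
            child.start();
            try {
                child.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            // Parent and child hold references to the same map instance.
            return CACHE_CONTEXT.get().get("key");
        } finally {
            CACHE_CONTEXT.remove(); // symmetric cleanup of the session cache
        }
    }

    public static void main(String[] args) {
        System.out.println(parentReadsChildWrite()); // loaded-in-child
    }
}
```

Only the reference to the map is copied between threads; the entries live in a single shared object, which is why the map should be thread-safe.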

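Implementation notes

  • The value held in a ThreadLocal (including InheritableThreadLocal and TransmittableThreadLocal) is lazily initialized, so the main thread must first call get explicitly to initialize it.
    • With lazy init, if get is never called the ThreadLocal value does not exist yet, so naturally it is not transmitted.
    • In the demo below, this "call get explicitly to initialize" step is wrapped, for illustration, in the BizService constructor.
  • Creating a SessionCache must be paired with a matching, symmetric cleanup of the SessionCache; otherwise there will be
    • memory leaks
    • context pollution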
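Both notes above can be observed in a small JDK-only sketch. It assumes `InheritableThreadLocal` with an `initialValue` override as a stand-in for the `TransmittableThreadLocal` used in the demo; the class and method names are hypothetical. If the parent never calls `get`, the child lazily creates its own map and the two threads no longer share a cache.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

final class LazyInitDemo {
    static final InheritableThreadLocal<ConcurrentMap<String, String>> CONTEXT =
            new InheritableThreadLocal<ConcurrentMap<String, String>>() {
                @Override
                protected ConcurrentMap<String, String> initialValue() {
                    return new ConcurrentHashMap<>(); // created lazily on first get()
                }
            };

    /** Reports whether parent and child end up with the same map instance. */
    public static boolean sharesMap(boolean initInParentFirst) {
        CONTEXT.remove(); // start from a clean state
        try {
            if (initInParentFirst) {
                CONTEXT.get(); // force initialization in the parent before the child exists
            }
            AtomicReference<ConcurrentMap<String, String>> childMap = new AtomicReference<>();
            Thread child = new Thread(() -> childMap.set(CONTEXT.get()));
            child.start();
            try {
                child.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return childMap.get() == CONTEXT.get();
        } finally {
            CONTEXT.remove(); // symmetric cleanup: avoids leaks and stale context on reused threads
        }
    }

    public static void main(String[] args) {
        System.out.println("init in parent first: " + sharesMap(true));  // true
        System.out.println("no init in parent:    " + sharesMap(false)); // false
    }
}
```

The `try`/`finally` around the session is one way to keep creation and cleanup symmetric; where the cleanup belongs in a real application depends on where the session starts and ends.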

Code

Adapted from your demo code, the same idea expressed in code:

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

public class SessionCacheDemo {
    @BeforeClass
    public static void beforeClass() {
        // Integrate TTL with RxJava
        RxJavaPlugins.setScheduleHandler(TtlRunnable::get);
    }

    @Test
    public void getSomethingByCache() throws Exception {
        BizService bizService = new BizService();

        final Consumer<Object> printer = result -> System.out.printf("[%30s]: %s%n", Thread.currentThread().getName(), bizService.getCache());

        Flowable.just(bizService)
                .observeOn(Schedulers.io())
                .map(BizService::getItemByCache)
                .doOnNext(printer)
                .blockingSubscribe(printer);

        // The business logic needs the value again at a later point
        Object object = bizService.getItemByCache();
        printer.accept(object);
    }

    /**
     * Mock Service
     */
    private static class BizService {
        private static final String ONLY_KEY = "ONLY_KEY";

        private final TransmittableThreadLocal<ConcurrentMap<String, Item>> cache_context = new TransmittableThreadLocal<ConcurrentMap<String, Item>>() {
            @Override
            protected ConcurrentMap<String, Item> initialValue() {
                return new ConcurrentHashMap<>(); // init cache
            }
        };

        public BizService() {
            // NOTE: AVOID cache object lazy init
            cache_context.get();
        }

        public Item getItem() {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
            }
            return new Item(ThreadLocalRandom.current().nextInt(0, 10_000));
        }

        /**
         * Fetches the business data. Normally Spring Cache would handle the caching; this is a simplified implementation.
         */
        public Item getItemByCache() {
            final ConcurrentMap<String, Item> cache = cache_context.get();
            return cache.computeIfAbsent(ONLY_KEY, key -> getItem());
        }

        public Item getCache() {
            return cache_context.get().get(ONLY_KEY);
        }

        public void clearCache() {
            cache_context.get().clear();
        }
    }

    /**
     * Mock Cache Data
     */
    public static class Item {
        private int id;

        public Item(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return "Item{id=" + id + '}';
        }
    }
}

Output

[     RxCachedThreadScheduler-1]: Item{id=248}
[                          main]: Item{id=248}
[                          main]: Item{id=248}

oldratlee added the ❓question label Nov 29, 2018
oldratlee self-assigned this Nov 29, 2018
olove closed this as completed Nov 29, 2018
oldratlee added a commit that referenced this issue Nov 29, 2018

oldratlee commented Nov 29, 2018

I converted this into demo code (SessionCacheDemo) and added it to the repository:

@Test
fun invokeInThreadOfRxJava() {
    val bizService = BizService()
    val printer: (Item) -> Unit = { System.out.printf("[%30s] cache: %s%n", Thread.currentThread().name, bizService.getCacheItem()) }

    Flowable.just(bizService)
        .observeOn(Schedulers.io())
        .map<Item>(BizService::getItemByCache)
        .doOnNext(printer)
        .blockingSubscribe(printer)

    // here service invocation will use item cache
    bizService.getItemByCache()
        .let(printer)
}

Output:

[     pool-1-thread-2] cache: Item(id=9335)
[                main] cache: Item(id=9335)
[                main] cache: Item(id=9335)
[     RxCachedThreadScheduler-1] cache: Item(id=8624)
[                          main] cache: Item(id=8624)
[                          main] cache: Item(id=8624)

Thanks @olove for describing the scenario and providing the sample code! ❤️

olove commented Nov 29, 2018

Thanks @oldratlee for such a detailed explanation and demo.

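oldratlee changed the title from "TransmittableThreadLocal does not support passing newly added ThreadLocal values from a child thread up to the parent thread" to "Problem using TransmittableThreadLocal in a session-level cache scenario" on Nov 30, 2018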
oldratlee added a commit that referenced this issue Apr 30, 2019
oldratlee changed the title from "Problem using TransmittableThreadLocal in a session-level cache scenario" to "Discussion: Using TransmittableThreadLocal in a session-level cache scenario" on Aug 26, 2019