ETJava Beta | Java    注册   登录
  • 搜索:
  • ThreadLocal 源码浅析

    发表于      阅读(1)     博客类别:Crawler     转自:https://www.cnblogs.com/fuxing/p/18264815
    如有侵权 请联系我们删除  (页面底部联系我们)  

    前言

    多线程在访问同一个共享变量时很可能会出现并发问题,特别是在多线程对共享变量进行写入时,那么除了加锁还有其他方法避免并发问题吗?本文将详细讲解 ThreadLocal 的使用及其源码。


    一、什么是 ThreadLocal?

    ThreadLocal 是 JDK 包提供的,它提供了线程本地变量,也就是说,如果你创建了一个 ThreadLocal 变量,那么访问这个变量的每一个线程,都创建这个变量的一个本地副本。

    这样可以解决什么问题呢?当多个线程操作这个变量时,实际操作的是自己线程本地内存里的数据,从而避免线程安全问题

    如下图,线程表中的每个线程,都有自己 ThreadLocal 变量,线程操作这个变量只是在自己的本地内存在,跟其他线程是隔离的。

    image.png

    二、如何使用 ThreadLocal

    ThreadLocal 就是一个简单的容器,使用起来也没有难度,初始化后仅需通过 get/set 方法进行操作即可。

    如下代码,开辟两个线程对 ThreadLocal 变量进行操作,获取的值是不同的。

    public class FuXing {
    
        /**
         * 初始化ThreadLocal
         */
        private static final ThreadLocal<String> myThreadLocal = new ThreadLocal<>();
    
        public static void main (String[] args) {
            // 线程1中操作 myThreadLocal
            new Thread(()->{
                myThreadLocal.set("thread 1");		//set方法设置值
                System.out.println(myThreadLocal.get());	//get方法获取值"thread 1"
            },"thread 1").start();
    
            // 线程2中操作 myThreadLocal
            new Thread(()->{
                myThreadLocal.set("thread 2");		//set方法设置值
                System.out.println(myThreadLocal.get());	//get方法获取值"thread 2"
            },"thread 2").start();
        }
    }
    

    三、ThreadLocal 实现原理

    ThreadLocal 是如何保证操作的对象只被当前线程进行访问呢,我们通过源码一起进行分析学习。

    一般分析源码我们都先看它的构造方法是如何初始化的,接着通过对 ThreadLocal 的简单使用,我们知道了关键的两个方法 set/get,所以源码分析也按照这个顺序。

    1. 构造方法

    泛型类的空参构造,没有什么特别的

    2. set 方法源码

    源码如下,ThreadLocalMap 是什么呢?由于比较复杂,这里先不做解释,你暂时可以理解为是一个 HashMap,其中 key 为 ThreadLocal 当前对象,value 就是我们设置的值,后面会单独解释源码。

    public void set(T value) {
        //获取本地线程
        Thread t = Thread.currentThread();
    
        //获取当前线程下的threadLocals对象,对象类型是ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null)
            //获取到则添加值
            map.set(this, value);
        else
            //否则初始化ThreadLocalMap --第一次设置值
            createMap(t, value);
    }
    
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    

    3. get 方法源码

    public T get() {
        //获取本地线程
        Thread t = Thread.currentThread();
    
        //获取当前线程下的threadLocals对象,对象类型是ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null) {
    
            //通过当前的ThreadLocal作为key去获取对应value
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                //@SuppressWarnings忽略告警的注解
                //"unchecked"表示未经检查的转换相关的警告,通常出现在泛型编程中
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        //threadLocals为空或它的Entry为空时,需要对其进行初始化操作。
        return setInitialValue();
    }
    
    private T setInitialValue() {
        //初始化为null
        T value = initialValue();
        
        //获取当前线程
        Thread t = Thread.currentThread();
        
        //获取当前线程下的threadLocals对象,对象类型是ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        
        //返回的其实就是个null
        return value;
    }
    
    protected T initialValue() {
        return null;
    }
    

    4. remove 方法源码

    核心也是 ThreadLocalMap 中的 remove 方法,会删除 key 对应的 Entry,具体源码后面统一在 ThreadLocalMap 源码中分析。

    public void remove() {
        //获取当前线程下的threadLocals对象,对象类型是ThreadLocalMap
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            //通过当前的ThreadLocal作为key调用remove
            m.remove(this);
    }
    

    5. ThreadLocalMap 源码

    ThreadLocalMap 是 ThreadLocal 的一个静态内部类,看了上面的几个源码解释,可以了解到 ThreadLocalMap 其实才是核心。

    简单的说,ThreadLocalMap 与 HashMap 类似,如,初始容量 16,一定范围内扩容,Entry 数组存储等,那它与 HashMap 有什么不同呢,下面将对源码进行详解。

    ThreadLocalMap 的底层数据结构:

    image.png

    5.1 常量

    //初始容量,一定是2的幂等数。
    private static final int INITIAL_CAPACITY = 16;
    
    // Entry 数组
    private Entry[] table;
    
    //table的长度
    private int size = 0;
    
    //扩容阈值
    private int threshold; 
    
    //设置扩容阈值,长度的 2 / 3
    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }
    
    //计算下一个存储位置
    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }
    
    // 计算前一个存储位置
    private static int prevIndex(int i, int len) {
        return ((i - 1 >= 0) ? i - 1 : len - 1);
    }
    

    5.2 Entry 相关源码

    由于 Entry 是底层核心源码,所有的操作几乎都是围绕着它来进行的,所以关于 Entry 的源码会比较多,我一一拆分进行分析讲解。

    静态内部类 Entry

    这个是 ThreadLocalMap 的底层数据结构,Entry 数组,每个 Entry 对象,这里的 Entry 继承了 WeakReference,关于弱引用不懂得,可以看我的另一篇文章《Java 引用》

    然后将 Entry 的 key 设置承了 弱引用,这有什么作用呢?作用是当 ThreadLocal 失去强引用后,在系统GC时,只要发现弱引用,不管系统堆空间使用是否充足,都会回收掉 key,进而 Entry 被内部清理。

    //静态内部类Entry
    static class Entry extends WeakReference<ThreadLocal<?>> {
        Object value;
        Entry(ThreadLocal<?> k, Object v) {
            // key为弱引用
            super(k);
            value = v;
        }
    }
    

    获取 Entry

    拿到当前线程中对应的 ThreadLocal 所在的 Entry,找不到的话会重新寻找,因为当前的 Entry 可能已经扩容,扩容后会重新计算索引位置,详情见扩容机制源码。

    源码中的计算索引位置的算法我没有解释,这个我会放在后面解释,涉及到了如何解决 Hash 冲突的问题,这个和我们熟知的 HashMap 是不同的。

    //获取Entry
    private Entry getEntry(ThreadLocal<?> key) {
        //计算索引位置
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
    
        //找到了就返回Entry
        if (e != null && e.get() == key)
                return e;
        else
            //没找到则重新寻找,因为可能发生扩容导致索引重新计算
            return getEntryAfterMiss(key, i, e);
    }
    
    //重新获取Entry --从当前索引i的位置向后搜索
    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
        Entry[] tab = table;
        int len = tab.length;
    
        //循环遍历,获取对应的 ThreadLocal 所在的 Entry
        while (e != null) {
            //获取Entry对象的弱引用,WeakReference的方法
            ThreadLocal<?> k = e.get();
            if (k == key)
                return e;
            if (k == null)
                //清除无效 Entry,详解见下方
                expungeStaleEntry(i);
            else
                //计算下一个索引位置
                i = nextIndex(i, len);
            
            //可以理解为指针后移
            e = tab[i];
        }
        return null;
    }
    

    清除无效 Entry

    expunge 删除,抹去,stale 陈旧的,没有用的

    第 1 个方法:
    根据索引删除对应的桶位,并从给定索引开始,遍历清除无效的 Entry,何为无效?就是当 Entry 的 key 为 null 时,代表 key 已经被 GC 掉了,对应的 Entry 就无效了。

    第 2 个方法:
    删除Entry数组中所有无效的Entry,方法中的e.get() == null,代表key被回收了。

    第 3 个方法:
    清除一些失效桶位,它执行对数数量的扫描,向后遍历logn个位置,如8,4,2,1。

    方法 2、3 最后都通过方法 1 进行桶位的删除。

    //根据索引删除对应的桶位
    private int expungeStaleEntry(int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
    
        //删除该桶位的元素,并将数组长度减1
        tab[staleSlot].value = null;
        tab[staleSlot] = null;
        size--;
    
        Entry e;
        int i;
        //从当前索引开始,直到当前 Entry为null才会停止遍历
        for (i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            //获取Entry对象的弱引用,WeakReference的方法
            ThreadLocal<?> k = e.get();
            if (k == null) {//说明key已失效
                //删除该桶位的元素,并将数组长度减1
                e.value = null;
                tab[i] = null;
                size--;
            } else {//说明key有效,需要将其Rehash
                //计算rehash后索引位置
                int h = k.threadLocalHashCode & (len - 1);
                if (h != i) {
                    tab[i] = null;
                    //移动元素位置,若rehash后索引位置有其他元素,则继续向后移动,直至为空
                    while (tab[h] != null)
                        h = nextIndex(h, len);
                    tab[h] = e;
                }
            }
        }
        //直到当前 Entry为null才会停止遍历,i为其索引
        return i;
    }
    
    //删除Entry数组中所有无效的Entry,用于rehash时
    private void expungeStaleEntries() {
        Entry[] tab = table;
        int len = tab.length;
        for (int j = 0; j < len; j++) {
            Entry e = tab[j];
            //获取Entry对象的弱引用,Entry不为空而弱引用为空,代表被GC了
            if (e != null && e.get() == null)
                //根据索引删除对应的桶位
                expungeStaleEntry(j);
        }
    }
    
    //清楚一些清除桶位,它执行对数数量的扫描
    private boolean cleanSomeSlots(int i, int n) {
        boolean removed = false;
        Entry[] tab = table;
        int len = tab.length;
        //向后遍历logn个位置,如8,4,2,1
        do {
            i = nextIndex(i, len);
            Entry e = tab[i];
            //获取Entry对象的弱引用,Entry不为空而弱引用为空,代表被GC了
            if (e != null && e.get() == null) {
                n = len;
                removed = true;
                //根据索引删除对应的桶位
                i = expungeStaleEntry(i);
            }
        } while ( (n >>>= 1) != 0);//对数递减
        return removed;
    }
    

    替换无效 Entry

    替换失效元素,用在对 Entry 进行 set 操作时,如果 set 的 key 是失效的,则需要用新的替换它。

    这里不仅仅处理了当前的失效元素,还会将其他失效的元素进行清理,因为这里是当 key 为 null 时才进行的替换操作。

    那什么时候 key 为 null 呢?这个除了主动的 remove 之外,就只有 ThreadLocal 的弱引用被 GC 掉了。

    这里是在 set 操作时出现的,还出现了 key 为 null 的无效元素,代表已经之前发生过 GC 了,很可能Entry 数组中还可能出现其他无效元素,所以源码中会出现向前遍历和向后遍历的情况。

    向前遍历好理解,就是通过遍历找第一个失效元素的索引。向后遍历比较难理解,这里我先简单说一下 ThreadLocal 用的开放地址的方式来解决 hash 冲突的,具体原理我后面会在讲 hash 冲突时单独讲。

    这种情况下,很可能当前的失效元素对应的并不是 hascode 在 staleSlot 的Entry。因为 hash 冲突后,Entry 会后移,那么此元素的 hascode 对应的桶位很有可能往后移了,所以我们要向后找到它,并且和当前的 staleSlot 进行替换。

    如果不进行此操作的话,很有可能在 set 操作时,在 ThreadLocalMap 中会出现两个桶位,都被某个ThreadLocal 指向。

    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                   int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;
    
        //记录失效元素的索引
        int slotToExpunge = staleSlot;
        //从失效元素位置向前遍历,直到当前 Entry为null才会停止遍历
        for (int i = prevIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = prevIndex(i, len))
            if (e.get() == null)
                //更新失效元素的索引,目的是找第一个失效的元素
                slotToExpunge = i;
    
        //从失效元素向后遍历
        for (int i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            //找到了对应key
            if (k == key) {
                //更新该位置的value
                e.value = value;
                //把失效元素换到当前位置
                tab[i] = tab[staleSlot];
                //把当前Entry移动到失效元素位置
                tab[staleSlot] = e;
                
                //slotToExpunge是第一个失效元素的索引,若条件成立,向前没有失效元素
                if (slotToExpunge == staleSlot)
                    //从当前索引开始,清理失效元素
                    slotToExpunge = i;
                
                // 清理失效元素,详情见清除无效Entry相关源码
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }
            
            //代表向前遍历没有找到第一个失效元素的位置
            if (k == null && slotToExpunge == staleSlot)
                //所以条件成立的i是向后遍历的的第一个失效元素的位置
                slotToExpunge = i;
        }
        
        //没找到key,则在失效元素索引的位置,新建Entry
        tab[staleSlot].value = null;
        tab[staleSlot] = new Entry(key, value);
        
        // 条件成立说明在找到了staleSlot前面找到了其他的失效元素
        if (slotToExpunge != staleSlot)
            
            // 清理失效元素,详情见清除无效Entry相关源码
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }
    

    5.3 构造方法

    还有一个基于 parentMap 的构造方法,由于目前仅在创建 InheritableThreadLocal 时调用,关于它这里不详细展开,后续会针对该类进行详解。

    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        // 初始化数组
        table = new Entry[INITIAL_CAPACITY];
    
        //计算存储位置
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    
        //存储元素,并将size设置为1
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
    
        //设置扩容阈值
        setThreshold(INITIAL_CAPACITY);
    }
    

    5.4 set 方法源码

    设置 key,vlaue,key 就是 ThreadLocal 对象。

    private void set(ThreadLocal<?> key, Object value) {
        Entry[] tab = table;
        int len = tab.length;
        //计算索引位置
        int i = key.threadLocalHashCode & (len-1);
    
        //从当前索引开始,直到当前Entry为null才会停止遍历
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();
    
            //如果key存在且等于当前key,代表之前存在的,直接覆盖
            if (k == key) {
                e.value = value;
                return;
            }
            //如果key不存在,说明已失效,需要替换,详情见替换无效Entry源码
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
        }
    
        //没有key则新建一个Entry即可
        tab[i] = new Entry(key, value);
        int sz = ++size;
    
        //清理一些失效元素,若清理失败且达到常量中的扩容阈值,则进行rehash操作
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    
    //删除Entry数组中所有无效的Entry并扩容
    private void rehash() {
        //删除Entry数组中所有无效的Entry
        expungeStaleEntries();
        if (size >= threshold - threshold / 4)
            //扩容,详情见下面的扩容机制源码
            resize();
    }
    

    5.5 remove 方法源码

    删除key对应的entry

    private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
        //计算存储位置
        int i = key.threadLocalHashCode & (len-1);
        
        //从当前索引开始,直到当前Entry为null才会停止遍历
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                //清除该对象的强引用,下次在通过get方法获取引用则返回null
                e.clear();
    
                //清除无效元素
                expungeStaleEntry(i);
                return;
            }
        }
    }
    

    5.6 扩容机制源码

    将元素转移到新的Entry 数组,长度是原来的两倍。

    private void resize() {
        //创建原数组长度两倍的新数组
        Entry[] oldTab = table;
        int oldLen = oldTab.length;
        int newLen = oldLen * 2;
        Entry[] newTab = new Entry[newLen];
        int count = 0;	//计算当前元素数量
        for (int j = 0; j < oldLen; ++j) {
            Entry e = oldTab[j];
            if (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == null) {	//key失效则值也顺便设为null
                    e.value = null; 	// Help the GC
                } else {
                    //重新计算索引位置
                    int h = k.threadLocalHashCode & (newLen - 1);
    
                    //移动元素位置,若rehash后索引位置有其他元素,则继续向后移动,直至为空
                    while (newTab[h] != null)
                        h = nextIndex(h, newLen);
                    newTab[h] = e;
                    count++;
                }
            }
        }
        setThreshold(newLen);
        size = count;
        table = newTab;
    }
    

    四、ThreadLocalMap 的 Hash 冲突

    Java 中大部分都是使用拉链法法解决 Hash 冲突的,而 ThreadLocalMap 是通过开放地址法来解决 Hash 冲突,这两者有什么不同,下面我讲介绍一下。

    1. 拉链法

    拉链法也叫链地址法,经典的就是 HashMap 解决 Hash 冲突的方法,如下图。将所有的 hash 值相同的元素组成一个链表,除此外 HashMap 还进行了链表转红黑树的优化。

    image.png

    2. 开放地址法

    原理是当发生hash冲突时,不引入额外的数据结构,会以当前地址为基准,通过“多次探测”来处理哈希冲突,探测方式主要包括线性探测、平方探测和多次哈希等,ThreadLocalMap 使用的是线性探测法。

    image.png

    简单说,就是一旦发生了冲突,就去探测寻找下一个空的散列地址,根据上面的源码也能大致了解该处理方式。
    源码中的公式是key.threadLocalHashCode & (length - 1)

    公式类似 HashMap 的寻址算法,详情见HashMap源码,由于数组长度是 2 的 n 次幂,所以这里的与运算就是取模,得到索引 i,这样做是为了分布更均匀,减少冲突产生。

    threadLocalHashCode 源码如下:

    private final int threadLocalHashCode = nextHashCode();
    
    //初始化线程安全的Integer
    private static AtomicInteger nextHashCode =
        new AtomicInteger();
    
    //斐波那契散列乘数 --结果分布更均匀
    private static final int HASH_INCREMENT = 0x61c88647;
    
    //自增返回下一个hash code
    private static int nextHashCode() {
        
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
    

    线性探测法的缺点:

    1. 不适用于存储大量数据,容易产生“聚集现象”;
    2. 删除元素需要清除无效元素;

    五、注意事项

    1. 关于内存泄漏

    在了解了 ThreadLocal 的内部实现以后,我们知道了数据其实存储在 ThreadLocalMap 中。这就意味着,线程只要不退出,则引用一直存在。

    当线程退出时,Thread 类会对一些资源进行清理,其中就有threadLocals,源码如下:

    private void exit() {
        if (group != null) {
            group.threadTerminated(this);
            group = null;
        }
        target = null;
        //加速一些资源的清理
        threadLocals = null;
        inheritableThreadLocals = null;
        inheritedAccessControlContext = null;
        blocker = null;
        uncaughtExceptionHandler = null;
    }
    

    因此,当使用的线程一直没有退出(如使用线程池),这时如果将一些大对象放入 ThreadLocal 中,且没有及时清理,就可能会出现内存泄漏的风险

    所以我们要养成习惯每次使用完 ThreadLocal 都要调用 remove 方法进行清理。

    2. 关于数据混乱

    通过对内存泄漏的解释,我们了解了当使用的线程一直没有退出,而又没有即使清理 ThreadLocal,则其中的数据会一直存在。

    这除了内存泄漏还有什么问题呢?我们在开发过程中,请求一般都是通过 Tomcat 处理,而其在处理请求时采用的就是线程池。

    这就意味着请求线程被 Tomcat 回收后,不一定会立即销毁,如果不在请求结束后主动 remove 线程中的 ThreadLocal 信息,可能会影响后续逻辑,拿到脏数据。

    我在开发过程中就遇到了这个问题,详情见ThreadLocal中的用户信息混乱问题。所以无论如何,在每次使用完 ThreadLocal 都要调用 remove 方法进行清理。

    3. 关于继承性

    同一个 ThreadLocal 变量,在父线程中被设置值后,在子线程其实是获取不到的。通过源码我们也知道,我们操作的都是当前线程下的 ThreadLocalMap ,所以这其实是正常的。

    测试代码如下:

    public class FuXing {
    
        /**
         * 初始化ThreadLocal
         */
        private static final ThreadLocal<String> myThreadLocal = new ThreadLocal<>();
    
        public static void main (String[] args) {
            myThreadLocal.set("father thread");
            System.out.println(myThreadLocal.get()); 	//father thread
    
            new Thread(()->{
                System.out.println(myThreadLocal.get());	//null
            },"thread 1").start();
        }
    }
    

    那么这可能会导致什么问题呢?比如我们在本服务调用外部服务,或者本服务开启新线程去进行异步操作,其中都无法获取 ThreadLocal 中的值。

    虽然都有其他解决方法,但是有没有让子线程也能直接获取到父线程的 ThreadLocal 中的值呢?这就用到了 InheritableThreadLocal。

    public class FuXing {
    
        /**
         * 初始化ThreadLocal
         */
        private static final InheritableThreadLocal<String> myThreadLocal 
                = new InheritableThreadLocal<>();
    
        public static void main (String[] args) {
            myThreadLocal.set("father thread");
            System.out.println(myThreadLocal.get()); 	//father thread
    
            new Thread(()->{
                System.out.println(myThreadLocal.get());	//father thread
            },"thread 1").start();
        }
    }
    

    InheritableThreadLocal 就是继承了 ThreadLocal,在创建和获取变量实例 inheritableThreadLocals 而不再是threadLocals,源码如下。

    public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    
        protected T childValue(T parentValue) {
            return parentValue;
        }
    
        ThreadLocalMap getMap(Thread t) {
           return t.inheritableThreadLocals;
        }
    
        void createMap(Thread t, T firstValue) {
            t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
        }
    }
    

    总结

    本文主要讲述了 ThreadLocal 的使用以及对其源码进行了详解,了解了 ThreadLocal 可以线程隔离的原因。通过对 ThreadLocalMap 的分析,知道了其底层数据结构和如何解决 Hash 冲突的。

    最后通过对 ThreadLocal 特点的分析,了解到有哪些需要注意的点,避免以后开发过程中遇到类似问题,若发现其他问题欢迎指正交流。


    参考:

    [1] 翟陆续/薛宾田. Java并发编程之美.

    [2] 葛一鸣/郭超. 实战Java高并发程序设计.

    [3] 靳宇栋. Hello 算法.