Java多线程 - 线程变量传递ThreadLocal
# 特点
- 线程并发:在多线程并发的场景下
- 传递数据:可以通过 ThreadLocal 在同一线程,不同组件中传递公共变量
- 线程隔离:每个线程的变量都是独立的,不会互相影响
# 常用方法
方法声明 | 描述 |
---|---|
ThreadLocal() | 创建 ThreadLocal 对象 |
public void set( T value) | 设置当前线程绑定的局部变量 |
public T get() | 获取当前线程绑定的局部变量 |
public void remove() | 移除当前线程绑定的局部变量 |
# 案例:数据隔离
# 背景
public class MyDemo {
private String content;
private String getContent() {
return content;
}
private void setContent(String content) {
this.content = content;
}
public static void main(String[] args) {
MyDemo demo = new MyDemo();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
demo.setContent(Thread.currentThread().getName() + "的数据");
System.out.println("-----------------------");
System.out.println(Thread.currentThread().getName() + "--->" + demo.getContent());
}
});
thread.setName("线程" + i);
thread.start();
}
}
}
控制台输出:
-----------------------
-----------------------
-----------------------
-----------------------
-----------------------
线程2--->线程2的数据
线程4--->线程2的数据
线程1--->线程2的数据
线程3--->线程0的数据
线程0--->线程2的数据
多次运行上述代码,可发现多个线程在访问同一个变量的时候出现的异常,线程间的数据没有隔离。
# 解决方案
# ThreadLocal 类
点击查看
public class MyDemo1 {
private static ThreadLocal<String> tl = new ThreadLocal<>();
private String content;
private String getContent() {
return tl.get();
}
private void setContent(String content) {
tl.set(content);
}
public static void main(String[] args) {
MyDemo demo = new MyDemo();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
demo.setContent(Thread.currentThread().getName() + "的数据");
System.out.println("-----------------------");
System.out.println(Thread.currentThread().getName() + "--->" + demo.getContent());
}
});
thread.setName("线程" + i);
thread.start();
}
}
}
# synchronized 关键字
点击查看
public class Demo02 {
private String content;
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public static void main(String[] args) {
Demo02 demo02 = new Demo02();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(){
@Override
public void run() {
synchronized (Demo02.class){
demo02.setContent(Thread.currentThread().getName() + "的数据");
System.out.println("-------------------------------------");
String content = demo02.getContent();
System.out.println(Thread.currentThread().getName() + "--->" + content);
}
}
};
t.setName("线程" + i);
t.start();
}
}
}
# ThreadLocal 与 synchronized 的区别
synchronized | ThreadLocal | |
---|---|---|
原理 | 同步机制采用’以时间换空间’的方式,只提供了一份变量,让不同的线程排队访问 | ThreadLocal 采用’以空间换时间’的方式,为每一个线程都提供了一份变量的副本,从而实现同时访问而相不干扰 |
侧重点 | 多个线程之间访问资源的同步 | 多线程中让每个线程之间的数据相互隔离 |
# 小结
虽然使用 ThreadLocal 和 synchronized 都能解决问题,但是使用 ThreadLocal 更为合适,因为这样可以使程序拥有更高的并发性。
# 案例:应用事务
# 背景
Service
public class AccountService {
public boolean transfer(String outUser, String inUser, int money) {
AccountDao ad = new AccountDao();
try {
// 转出
ad.out(outUser, money);
// 模拟转账过程中的异常
int i = 1/0;
// 转入
ad.in(inUser, money);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
Dao
public class AccountDao {
public void out(String outUser, int money) throws SQLException {
String sql = "update account set money = money - ? where name = ?";
Connection conn = JdbcUtils.getConnection();
PreparedStatement pstm = conn.prepareStatement(sql);
pstm.setInt(1,money);
pstm.setString(2,outUser);
pstm.executeUpdate();
JdbcUtils.release(pstm,conn);
}
public void in(String inUser, int money) throws SQLException {
String sql = "update account set money = money + ? where name = ?";
Connection conn = JdbcUtils.getConnection();
PreparedStatement pstm = conn.prepareStatement(sql);
pstm.setInt(1,money);
pstm.setString(2,inUser);
pstm.executeUpdate();
JdbcUtils.release(pstm,conn);
}
}
JdbcUtils
public class JdbcUtils {
public static void commitAndClose(Connection conn) {
try {
if(conn != null){
//提交事务
conn.commit();
//释放连接
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public static void rollbackAndClose(Connection conn) {
try {
if(conn != null){
//回滚事务
conn.rollback();
//释放连接
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
存在问题:
- service 操作需要进行两次 DDL 操作,而代码不具备原子性。
- 两个 DDL 获取的
connection
不是同一个,即便在 servicegetConnection
手动提交事务也不在同一个事务内。
# 解决方案
开启事务
为了保证所有的操作在一个事务中,案例中使用的连接必须是同一个:service 层开启事务的 connection 需要跟 dao 层访问数据库的 connection 保持一致
线程并发情况下,每个线程只能操作各自的 connection
# ThreadLocal 类
改造 JdbcUtils
public class JdbcUtils {
//ThreadLocal对象 : 将connection绑定在当前线程中
private static final ThreadLocal<Connection> tl = new ThreadLocal();
// c3p0 数据库连接池对象属性
private static final ComboPooledDataSource ds = new ComboPooledDataSource();
// 获取连接
public static Connection getConnection() throws SQLException {
//取出当前线程绑定的connection对象
Connection conn = tl.get();
if (conn == null) {
//如果没有,则从连接池中取出
conn = ds.getConnection();
//再将connection对象绑定到当前线程中
tl.set(conn);
}
return conn;
}
//释放资源
public static void release(AutoCloseable... ios) {
for (AutoCloseable io : ios) {
if (io != null) {
try {
io.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public static void commitAndClose() {
try {
Connection conn = getConnection();
//提交事务
conn.commit();
//解除绑定
tl.remove();
//释放连接
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
public static void rollbackAndClose() {
try {
Connection conn = getConnection();
//回滚事务
conn.rollback();
//解除绑定
tl.remove();
//释放连接
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
改造 Service
public class AccountService {
public boolean transfer(String outUser, String inUser, int money) {
AccountDao ad = new AccountDao();
try {
Connection conn = JdbcUtils.getConnection();
//开启事务
conn.setAutoCommit(false);
// 转出 : 这里不需要传参了 !
ad.out(outUser, money);
// 模拟转账过程中的异常
// int i = 1 / 0;
// 转入
ad.in(inUser, money);
//事务提交
JdbcUtils.commitAndClose();
} catch (Exception e) {
e.printStackTrace();
//事务回滚
JdbcUtils.rollbackAndClose();
return false;
}
return true;
}
}
# 源码分析
# set 方法
/**
* 设置当前线程对应的ThreadLocal的值
*
* @param value 将要保存在当前线程对应的ThreadLocal的值
*/
public void set(T value) {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 判断map是否存在
if (map != null)
// 存在则调用map.set设置此实体entry
map.set(this, value);
else
// 1)当前线程Thread 不存在ThreadLocalMap对象
// 2)则调用createMap进行ThreadLocalMap对象的初始化
// 3)并将 t(当前线程)和value(t对应的值)作为第一个entry存放至ThreadLocalMap中
createMap(t, value);
}
/**
* 获取当前线程Thread对应维护的ThreadLocalMap
*
* @param t the current thread 当前线程
* @return the map 对应维护的ThreadLocalMap
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
/**
*创建当前线程Thread对应维护的ThreadLocalMap
*
* @param t 当前线程
* @param firstValue 存放到map中第一个entry的值
*/
void createMap(Thread t, T firstValue) {
//这里的this是调用此方法的threadLocal
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
# get 方法
/**
* 返回当前线程中保存ThreadLocal的值
* 如果当前线程没有此ThreadLocal变量,
* 则它会通过调用{@link #initialValue} 方法进行初始化值
*
* @return 返回当前线程对应此ThreadLocal的值
*/
public T get() {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果此map存在
if (map != null) {
// 以当前的ThreadLocal 为 key,调用getEntry获取对应的存储实体e
ThreadLocalMap.Entry e = map.getEntry(this);
// 对e进行判空
if (e != null) {
@SuppressWarnings("unchecked")
// 获取存储实体 e 对应的 value值
// 即为我们想要的当前线程对应此ThreadLocal的值
T result = (T)e.value;
return result;
}
}
/*
初始化 : 有两种情况有执行当前代码
第一种情况: map不存在,表示此线程没有维护的ThreadLocalMap对象
第二种情况: map存在, 但是没有与当前ThreadLocal关联的entry
*/
return setInitialValue();
}
/**
* 初始化
*
* @return the initial value 初始化后的值
*/
private T setInitialValue() {
// 调用initialValue获取初始化的值
// 此方法可以被子类重写, 如果不重写默认返回null
T value = initialValue();
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 判断map是否存在
if (map != null)
// 存在则调用map.set设置此实体entry
map.set(this, value);
else
// 1)当前线程Thread 不存在ThreadLocalMap对象
// 2)则调用createMap进行ThreadLocalMap对象的初始化
// 3)并将 t(当前线程)和value(t对应的值)作为第一个entry存放至ThreadLocalMap中
createMap(t, value);
// 返回设置的值value
return value;
}
总结: 先获取当前线程的 ThreadLocalMap 变量,如果存在则返回值,不存在则创建并返回初始值。
# remove 方法
/**
* 删除当前线程中保存的ThreadLocal对应的实体entry
*/
public void remove() {
// 获取当前线程对象中维护的ThreadLocalMap对象
ThreadLocalMap m = getMap(Thread.currentThread());
// 如果此map存在
if (m != null)
// 存在则调用map.remove
// 以当前ThreadLocal为key删除对应的实体entry
m.remove(this);
}
# initialValue 方法
/**
* 返回当前线程对应的ThreadLocal的初始值
* 此方法的第一次调用发生在,当线程通过get方法访问此线程的ThreadLocal值时
* 除非线程先调用了set方法,在这种情况下,initialValue 才不会被这个线程调用。
* 通常情况下,每个线程最多调用一次这个方法。
*
* <p>这个方法仅仅简单的返回null {@code null};
* 如果程序员想ThreadLocal线程局部变量有一个除null以外的初始值,
* 必须通过子类继承{@code ThreadLocal} 的方式去重写此方法
* 通常, 可以通过匿名内部类的方式实现
*
* @return 当前ThreadLocal的初始值
*/
protected T initialValue() {
return null;
}
此方法的作用是 返回该线程局部变量的初始值。
(1) 这个方法是一个延迟调用方法,从上面的代码我们得知,在 set 方法还未调用而先调用了 get 方法时才执行,并且仅执行 1 次。
(2)这个方法缺省实现直接返回一个 null。
(3)如果想要一个除 null 之外的初始值,可以重写此方法。(备注: 该方法是一个 protected 的方法,显然是为了让子类覆盖而设计的)
# ThreadLocalMap 源码分析
ThreadLocalMap 是 ThreadLocal 的内部类,没有实现 Map 接口,用独立的方式实现了 Map 的功能,其内部的 Entry 也是独立实现。
# 成员变量
/**
* 初始容量 —— 必须是2的整次幂
*/
private static final int INITIAL_CAPACITY = 16;
/**
* 存放数据的table,Entry类的定义在下面分析
* 同样,数组长度必须是2的整次幂。
*/
private Entry[] table;
/**
* 数组里面entrys的个数,可以用于判断table当前使用量是否超过阈值。
*/
private int size = 0;
/**
* 进行扩容的阈值,表使用量大于它的时候进行扩容。
*/
private int threshold; // Default to 0
跟 HashMap 类似
INITIAL_CAPACITY
代表这个 Map 的初始容量;table
是一个 Entry 类型的数组,用于存储数据;size
代表表中的存储数目;threshold
代表需要扩容时对应 size 的阈值。
# 存储结构 - Entry
/*
* Entry继承WeakReference,并且用ThreadLocal作为key.
* 如果key为null(entry.get() == null),意味着key不再被引用,
* 因此这时候entry也可以从table中清除。
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
在 ThreadLocalMap 中,也是用 Entry 来保存 K-V 结构数据的。不过 Entry 中的 key 只能是 ThreadLocal 对象,这点在构造方法中已经限定死了。
另外,Entry 继承 WeakReference,也就是 key(ThreadLocal)是弱引用,其目的是将 ThreadLocal 对象的生命周期和线程生命周期解绑。