多线程系列一:线程基础

2018/01/06 12:16
阅读数 0

概念

什么是线程:运行程序会创建一个进程。进程里面包含多个线程,OS调度的最小单元是线程(轻量级进程)。

运行一个普通的java程序包含的线程:

 1 package com.lgstudy;
 2 
 3 import java.lang.management.ManagementFactory;
 4 import java.lang.management.ThreadInfo;
 5 import java.lang.management.ThreadMXBean;
 6 
 7 /**
 8  * lgs
 9  * 
10  * 
11  * 一个java程序包含的线程
12  */
13 public class ShowMainThread {
14 
15     public static void main(String[] args) {
16         //java虚拟机的线程管理接口
17         ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
18         //获取线程信息的方法
19         ThreadInfo[] threadInfos =
20                 threadMXBean.dumpAllThreads(false,false);
21         for(ThreadInfo threadInfo:threadInfos){
22             System.out.println(threadInfo.getThreadId()+":"+threadInfo.getThreadName());
23         }
24     }
25 
26 }

输出:

5:Attach Listener  //获取内存dump,线程dump

4:Signal Dispatcher  //将信号分jvm的线程

3:Finalizer  //调用对象的finalizer 方法 进行垃圾回收

2:Reference Handler  //清除Reference

1:main //程序的主入口

 为什么要用线程?

1、 充分利用CPU多处理核心;

2、 更快的响应时间(用户订单的场景,发送邮件等部分与订单业务无关和没有要求数据一致性的功能可由其他线程执行)

启动线程和退出线程 

创建线程的方法

extends Thread

implements Runnable

启动线程threadl类的start()

线程完成:1run()方法执行完成;2、抛出一个未处理的异常导致线程的提前结束

 1 package com.lgstudy;
 2 
 3 import jdk.management.resource.internal.inst.ThreadRMHooks;
 4 
 5 /**
 6  * lgs
 7  * 
 8  * 如何创建一个线程
 9  */
10 public class HowStartThread {
11 
12     private static class TestThread extends Thread{
13         @Override
14         public void run() {
15             System.out.println("TestThread is runing");
16 
17         }
18     }
19 
20     private static class TestRunable implements Runnable{
21 
22         @Override
23         public void run() {
24             System.out.println("TestRunable is runing");
25         }
26     }
27 
28     public static void main(String[] args) {
29         Thread t1 = new TestThread();
30         Thread t2 = new Thread(new TestRunable());
31         t1.start();
32         t2.start();
33 
34     }
35 
36 }

取消和中断

不安全的取消:

 单独使用一个取消标志位.

 1 package com.lgstudy.interrupt;
 2 
 3 /**
 4  * lgs
 5  * 
 6  * 使用自定义的取消标志位中断线程(不安全)
 7  */
 8 public class FlagCancel {
 9 
10     private static class TestRunable implements Runnable{
11 
12         private volatile boolean on = true;
13         private long i =0;
14 
15         @Override
16         public void run() {
17             while(on){
18                 i++;
19                 //阻塞方法,on不起作用
20                 //wait,sleep,blockingqueue(put,take)
21                 try {
22                     Thread.sleep(20000);
23                 } catch (InterruptedException e) {
24                     e.printStackTrace();
25                 }
26             }
27             System.out.println("TestRunable is runing :"+i);
28         }
29 
30         public void cancel(){
31             on = false;
32         }
33     }
34 
35 }

Stop(),suspend(),resume()是过期的api,很大的副作用,容易导致死锁(suspend():将线程挂起不会释放锁)或者数据不一致(Stop():线程在未处理完数据时就停止)

如何安全的终止线程

使用线程的中断

interrupt() 中断线程,本质是将线程的中断标志位设为true,其他线程向需要中断的线程打个招呼。是否真正进行中断由线程自己决定。

isInterrupted() 线程检查自己的中断标志位

静态方法Thread.interrupted() 中断标志位复位为false

由上面的中断机制可知Java里是没有抢占式任务,只有协作式任务。

为何要用中断,线程处于阻塞(如调用了javasleep,wait等等方法时)的时候,是不会理会我们自己设置的取消标志位的,但是这些阻塞方法都会检查线程的中断标志位。

 1 package com.lgstudy.interrupt;
 2 
 3 /**
 4  * lgs
 5  * 
 6  * 安全的中断线程
 7  */
 8 public class SafeInterrupt implements Runnable {
 9 
10     private volatile boolean on = true;
11     private long i =0;
12 
13     @Override
14     public void run() {
15         //阻塞方法wait,sleep,blockingqueue(put,take),on不起作用
16         //要加上线程的中断才能安全的终止线程
17         while(on&&Thread.currentThread().isInterrupted()){
18             i++;
19         }
20         System.out.println("TestRunable is runing :"+i);
21     }
22 
23     public void cancel(){
24         on = false;
25         //在java线程很忙的时候可能不会理会中断,所以定义一个标志位on更好
26         Thread.currentThread().interrupt();
27     }
28 }

 处理不可中断的阻塞

IO通信 inputstream read/write等阻塞方法,不会理会中断,而关闭底层的套接字socket.close()会抛出socketException

NIOselector.select()会阻塞,调用selectorwakeupclose方法会抛出ClosedSelectorException

死锁状态不响应中断的请求,这个必须重启程序,检查程序找到死锁的位置修改错误。

 1 package com.lgstudy.interrupt;
 2 
 3 /**
 4  * lgs
 5  * 
 6  * 调用阻塞方法时,如何中断线程
 7  */
 8 public class BlockInterrupt {
 9 
10     private static volatile boolean on = true;
11 
12     private static class WhenBlock implements Runnable {
13 
14         @Override
15         public void run() {
16             while (on && !Thread.currentThread().isInterrupted()) {
17                 try {
18                     //抛出中断异常的阻塞方法(wait,sleep,blockingqueue(put,take)),抛出异常后,中断标志位改成false
19                     Thread.sleep(100);
20                 } catch (InterruptedException e) {
21                     Thread.currentThread().interrupt();//重新设置一下
22                     //do my work
23                 }
24                 //清理工作结束线程
25             }
26         }
27 
28         //外部线程调用方法阻塞
29         public void cancel() {
30             on = false;
31             Thread.currentThread().interrupt();
32         }
33 
34     }
35 }

如何让我们的代码既可以响应普通的中断,又可以关闭底层的套接字呢?

覆盖线程的interrupt方法,在处理套接字异常时,再用super.interrupt()自行中断线程

 1 package com.lgstudy.interrupt;
 2 
 3 import java.io.IOException;
 4 import java.io.InputStream;
 5 import java.net.Socket;
 6 
 7 /**
 8  * lgs
 9  * 
10  * 如何覆盖线程的interrupt() 方法
11  */
12 public class OverrideInterrupt extends Thread {
13     private final Socket socket;
14     private final InputStream in;
15 
16     public OverrideInterrupt(Socket socket, InputStream in) {
17         this.socket = socket;
18         this.in = in;
19     }
20 
21     private void t(){
22     }
23 
24     @Override
25     public void interrupt() {
26         try {
27             //关闭底层的套接字
28             socket.close();
29         } catch (IOException e) {
30             e.printStackTrace();
31             //.....
32         }finally {
33             //同时中断线程
34             super.interrupt();
35         }
36 
37     }
38 }

线程的状态

新创建   线程被创建,但是没有调用start方法

可运行(RUNNABLE)  运行状态,由cpu决定是不是正在运行

被阻塞(BLOCKING)  阻塞,线程被阻塞于锁

等待/计时等待(WAITING) 等待某些条件成熟

被终止  线程执行完毕

线程的优先级

成员变量priority控制优先级,范围1-10之间,数字越高优先级越高,缺省为5,创建线程时setPriotity()可以设置优先级,不要指望他发挥作用因为线程优先级是由操作系统决定的,有的操作系统甚至会忽略jvm的线程优先级

Daemon线程

守护型线程(如GC线程),程序里没有非Daemon线程时,java程序就会退出。一般用不上,也不建议我们平时开发时使用,因为Try/Finally里的代码不一定执行的。

 1 package com.lgstudy;
 2 
 3 import com.lgstudy.threadstate.SleepUtils;
 4 
 5 /**
 6  * lgs
 7  * 
 8  * 守护线程
 9  */
10 public class Daemon {
11     public static void main(String[] args) {
12         Thread thread = new Thread(new DaemonRunner());
13         //将线程置为守护线程
14         thread.setDaemon(true);
15         thread.start();
16     }
17 
18     static class DaemonRunner implements Runnable {
19         @Override
20         public void run() {
21             try {
22                 SleepUtils.second(100);
23             } finally {
24                 System.out.println("DaemonThread finally run.");
25             }
26         }
27     }
28 }

常用方法深入理解

run()start() 

run就是一个普通的方法,跟其他类的实例方法没有任何区别他之所以能在线程里面运行时因为调用了start()方法。

Sleep

不会释放锁,当前线程变成了休眠状态所以我们在用sleep时,要把sleep放在同步代码块的外面。

yield()

不会释放锁,当前线程出让cpu占有权,当前线程变成了可运行状态,下一时刻仍然可能被cpu选中。

wait()notify()/notiyfAll()

调用以前,当前线程必须要持有锁,调用了wait() notify()/notiyfAll()会释放锁。

等待通知机制:

线程 A调用了对象Owait方法进入等待状态,线程 B调用了对象Onotify方法进行唤醒,唤醒的是在对象Owait的线程(比如线程A

notify() 唤醒一个线程,唤醒哪一个完全看cpu的心情(谨慎使用)

notiyfAll() 所有在对象Owait的线程全部唤醒(应该用notiyfAll())

 

线程间协作和通信

 

每个线程有自己栈空间孤立运行对我们没有价值。如果多个线程能够相互配合完成工作,这将会带来巨大的价值。

volatilesynchronized

多个线程同时访问一个共享的变量的时候每个线程的工作内存有这个变量的一个拷贝变量本身还是保存在共享内存中

volatile修饰字段,对这个变量的访问必须要从共享内存刷新一次。最新的修改写回共享内存。可以保证字段的可见性。绝对不是线程安全的,没有操作的原子性。

适用场景:1、一个线程写,多个线程读;2volatile变量的变化很固定即变化以后都是一个固定的值

 1 package com.lgstudy.volatiletest;
 2 
 3 /**
 4  * 
 5  * 测试Volatile型变量的操作原子性
 6  */
 7 public class VolatileThread implements Runnable {
 8 
 9     private volatile  int a= 0;
10 
11     @Override
12     public void run() {
13         synchronized (this){
14             a=a+1;
15             System.out.println(Thread.currentThread().getName()+"----"+a);
16             try {
17                 Thread.sleep(100);
18             } catch (InterruptedException e) {
19                 e.printStackTrace();
20             }
21             a=a+1;
22             System.out.println(Thread.currentThread().getName()+"----"+a);
23 
24         }
25     }
26 }
 1 package com.lgstudy.volatiletest;
 2 
 3 public class VolatileTest {
 4     public static void main(String[] args) {
 5         VolatileThread volatileThread = new VolatileThread();
 6 
 7         Thread t1 = new Thread(volatileThread);
 8         Thread t2 = new Thread(volatileThread);
 9         Thread t3 = new Thread(volatileThread);
10         Thread t4 = new Thread(volatileThread);
11         t1.start();
12         t2.start();
13         t3.start();
14         t4.start();
15     }
16 }

 

输出:

Thread-0----1
Thread-0----2
Thread-3----3
Thread-3----4
Thread-2----5
Thread-2----6
Thread-1----7
Thread-1----8

关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。

Synchronized的类锁和对象锁,本质上是两把锁,类锁实际锁的是每一个类的class对象。对象锁锁的是当前对象实例。

 1 package com.lgstudy.syn;
 2 
 3 import com.lgstudy.threadstate.SleepUtils;
 4 
 5 /**
 6  * 类锁和实例锁
 7  */
 8 public class InstanceAndClass {
 9 
10     //测试类锁
11     private static class TestClassSyn extends Thread{
12         @Override
13         public void run() {
14             System.out.println("TestClass is going...");
15             synClass();
16         }
17     }
18 
19     //测试对象锁
20     private static class TestInstanceSyn extends Thread{
21         private InstanceAndClass instanceAndClass;
22 
23         public TestInstanceSyn(InstanceAndClass instanceAndClass) {
24             this.instanceAndClass = instanceAndClass;
25         }
26 
27         @Override
28         public void run() {
29             System.out.println("TestInstance is going..."+instanceAndClass);
30             instanceAndClass.synInstance();
31         }
32 
33     }
34 
35     //测试对象锁
36     private static class TestInstance2Syn implements Runnable{
37         private InstanceAndClass instanceAndClass;
38 
39         public TestInstance2Syn(InstanceAndClass instanceAndClass) {
40             this.instanceAndClass = instanceAndClass;
41         }
42         @Override
43         public void run() {
44             System.out.println("TestInstance2 is going..."+instanceAndClass);
45             instanceAndClass.synInstance2();
46         }
47     }
48 
49     //锁对象的方法
50     private synchronized void synInstance(){
51         SleepUtils.second(3);
52         System.out.println("synInstance is going...");
53         SleepUtils.second(3);
54         System.out.println("synInstance ended");
55     }
56 
57     //锁对象的方法
58     private synchronized void synInstance2(){
59         SleepUtils.second(3);
60         System.out.println("synInstance2 going...");
61         SleepUtils.second(3);
62         System.out.println("synInstance2 ended");
63     }
64 
65     //锁类的方法
66     private static synchronized void synClass(){
67         SleepUtils.second(1);
68         System.out.println("synClass going...");
69         SleepUtils.second(1);
70     }
71 
72     public static void main(String[] args) {
73         InstanceAndClass instanceAndClass = new InstanceAndClass();
74         Thread t1 = new TestClassSyn();
75         Thread t2 = new Thread(new TestInstanceSyn(instanceAndClass));
76         Thread t3 = new Thread(new TestInstance2Syn(instanceAndClass));
77         t2.start();
78         t3.start();
79         SleepUtils.second(1);
80         t1.start();
81     }
82 
83 }

等待和通知机制

等待方原则:

1、获取对象锁

2、如果条件不满足,调用对象的wait方法,被通知后依然要检查条件是否满足

3、条件满足以后,才能执行相关的业务逻辑

Synchronized(对象){

While(条件不满足){

对象.wait()

}

业务逻辑处理

}

通知方原则

1、 获得对象的锁;

2、 改变条件;

3、 通知所有等待在对象的线程

Synchronized(对象){

业务逻辑处理,改变条件

对象.notify/notifyAll

}

 1 package com.lgstudy.bq;
 2 
 3 import java.util.LinkedList;
 4 import java.util.List;
 5 
 6 /**
 7  * 
 8  * 有界阻塞队列/有界缓存队列
 9  */
10 public class BlockingQueueWN<T> {
11 
12     //当前队列
13     private List queue = new LinkedList<>();
14     //队列支持的最大容量
15     private final int limit;
16 
17     //外部修改队列的容量
18     public BlockingQueueWN(int limit) {
19         this.limit = limit;
20     }
21 
22     //入队
23     public synchronized void enqueue(T item) throws InterruptedException {
24         //如果当前队列的容量已经满了的话就要等待
25         while(this.queue.size()==this.limit){
26             wait();
27         }
28         //如果当前队列的容量为0的话,可以肯定有出队的线程正在等待,需要他可以准备出队了
29         if (this.queue.size()==0){
30             notifyAll();
31         }
32         //开始入队
33         this.queue.add(item);
34     }
35 
36     //出队
37     public synchronized T dequeue() throws InterruptedException {
38         //如果当前队列的容量为0的话就等待暂不出队
39         while(this.queue.size()==0){
40             wait();
41         }
42         //如果当前队列的容量已经满了的话,可以肯定有入队线程正在等待,需要唤醒他可以准备入队了
43         if (this.queue.size()==this.limit){
44             notifyAll();
45         }
46         //开始出队
47         return (T)this.queue.remove(0);
48     }
49 }

 

 1 package com.lgstudy.bq;
 2 
 3 public class BqTest {
 4     public static void main(String[] args) {
 5         BlockingQueueWN bq = new BlockingQueueWN(10);
 6         Thread threadA = new ThreadPush(bq);
 7         threadA.setName("Push");
 8         Thread threadB = new ThreadPop(bq);
 9         threadB.setName("Pop");
10         threadB.start();
11         threadA.start();
12     }
13 
14     //推数据入队列
15     private static class ThreadPush extends Thread{
16         BlockingQueueWN<Integer> bq;
17 
18         public ThreadPush(BlockingQueueWN<Integer> bq) {
19             this.bq = bq;
20         }
21 
22         @Override
23         public void run() {
24             String threadName = Thread.currentThread().getName();
25             int i = 5;
26             while(i>0){
27                 try {
28                     Thread.sleep(1000);
29                     System.out.println(" i="+i+" will push");
30                     bq.enqueue(i--);
31                 } catch (InterruptedException e) {
32                     //e.printStackTrace();
33                 }
34 
35             }
36         }
37     }
38 
39     //取数据出队列
40     private static class ThreadPop extends Thread{
41         BlockingQueueWN<Integer> bq;
42 
43         public ThreadPop(BlockingQueueWN<Integer> bq) {
44             this.bq = bq;
45         }
46         @Override
47         public void run() {
48             while(true){
49                 try {
50                     System.out.println(Thread.currentThread().getName()
51                             +" will pop.....");
52                     Integer i = bq.dequeue();
53                     System.out.println(" i="+i.intValue()+" alread pop");
54                 } catch (InterruptedException e) {
55                     //e.printStackTrace();
56                 }
57             }
58 
59         }
60     }
61 }

 

输出;

Pop will pop.....
i=5 will push
i=5 alread pop
Pop will pop.....
i=4 will push
i=4 alread pop
Pop will pop.....
i=3 will push
i=3 alread pop
Pop will pop.....
i=2 will push
i=2 alread pop
Pop will pop.....
i=1 will push
i=1 alread pop
Pop will pop.....

管道输入输出流 使用较少

管道输入输出流用于线程中间的数据传递,传输媒介是内存

PpedOutputStream/PpedInputStream 面向的字节

PipedReader/PipedWriter 面向的是字符

只适合线程间一对一的通信,适用范围较狭窄。

join方法

线程A,执行了thread.join(),线程A等待thread线程终止了以后,Ajoin后面的语句才会继续执行

 1 package com.lgstudy;
 2 
 3 /**
 4  * join的使用
 5  */
 6 public class JoinTest {
 7 
 8     static class CutInLine implements Runnable{
 9 
10         private Thread thread;
11 
12         public CutInLine(Thread thread) {
13             this.thread = thread;
14         }
15 
16         @Override
17         public void run() {
18             try {
19                 //在被插队的线程里,调用一下插队线程的join方法
20                 thread.join();
21             } catch (InterruptedException e) {
22                 e.printStackTrace();
23             }
24             System.out.println(Thread.currentThread().getName()+" will work");
25         }
26     }
27 
28     public static void main(String[] args) {
29         Thread previous = Thread.currentThread();
30         for(int i=0;i<10;i++){
31             Thread thread =
32                     new Thread(new CutInLine(previous),String.valueOf(i));
33             System.out.println(previous.getId()+" cut in the thread:"+thread.getName());
34             thread.start();
35             previous = thread;
36         }
37 
38     }
39 
40 }

ThreadLocal

本质是个map,map的键就是每个线程对象就是每个线程所拥有的值

常用方法:

initialValue()

get()

set()

remove():将当前线程局部变量的值删除,这个方法是JDK 5.0新增的方法。当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。

ThreadLocal拥有的这个变量在线程之间很独立的相互之间没有联系内存占用相对来说比较大

性能问题

串行化无锁化、异步化编程是趋势之一,比如node.jsVert.x

黄金原则:编码时候不要考虑性能优化的事情,先正确实现业务,发现性能不行,这个时候再来考虑性能优化。

等待超时模式

调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。

假设等待时间段是T,那么可以推断出在当前时间now+T之后就会超时

等待持续时间:REMAINING=T。

·超时时间:FUTURE=now+T。

 1 public synchronized Object get(long mills) throws InterruptedException {
 2 long future = System.currentTimeMillis() + mills;
 3 long remaining = mills;
 4 // 当超时大于0并且result返回值不满足要求
 5 while ((result == null) && remaining > 0) {
 6 wait(remaining);
 7 remaining = future - System.currentTimeMillis();
 8 }
 9 return result;
10 }

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部