Java Multithreading
Creating a thread
Thread j = new Thread(new Runnable() {
    @Override
    public void run() {
        // ...
    }
});
j.start(); // the thread does not run until start() is called
Executor
// thread pool of size 3
ExecutorService service = Executors.newFixedThreadPool(3);
// submit any Runnable object, here written as a lambda
service.submit(() -> System.out.println("running in the pool"));
Timing with CountDownLatch
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SomeWork implements Runnable {
    CountDownLatch latch;

    public SomeWork(CountDownLatch m) {
        this.latch = m;
    }

    public synchronized void myPrint(String msg) {
        System.out.println(msg);
    }

    @Override
    public void run() {
        myPrint("start" + System.currentTimeMillis());
        try {
            // wait until the latch reaches zero, or at most 10 ms
            latch.await(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        myPrint("end" + System.currentTimeMillis());
    }
}

In the main function:

CountDownLatch latch = new CountDownLatch(10);

Then create a thread pool and submit SomeWork instances to it.
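The SomeWork snippet never calls countDown(), so each await simply times out after 10 ms. A more typical use, sketched here under the assumption that the goal is to wait for a group of workers to finish, has every worker count down once while the caller awaits the latch. The class name LatchDemo, the pool size, and the 5-second timeout are illustrative choices, not part of the original notes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Runs `workers` tasks on a small pool and returns how many had finished
    // when the latch opened, or -1 if the wait timed out or was interrupted.
    static int runWorkers(int workers) {
        CountDownLatch latch = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    finished.incrementAndGet(); // stand-in for real work
                } finally {
                    latch.countDown(); // always count down, even on failure
                }
            });
        }
        try {
            // Blocks until the count reaches zero or 5 seconds pass.
            boolean opened = latch.await(5, TimeUnit.SECONDS);
            return opened ? finished.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int done = runWorkers(10);
        System.out.println(done + " workers finished in "
                + (System.currentTimeMillis() - start) + " ms");
    }
}
```

Putting countDown() in a finally block keeps the waiting thread from hanging until the timeout when a worker throws.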
ReadWriteLock
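ReadWriteLock comes from java.util.concurrent.locks.
It is a lock that controls reads and writes separately: the read lock can be held by several readers at once, while the write lock is exclusive.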
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ThreadSafeMap {
    static int count = 0;
    static Object t1 = new Object();
    static Map<String, String> map = new HashMap<>();

    static class Task implements Runnable {
        String k, v;
        ReadWriteLock lock;

        Task(String a, String b, ReadWriteLock l) {
            k = a;
            v = b;
            lock = l; // the Runnable now holds a reference to the shared ReadWriteLock
        }

        @Override
        public void run() {
            lock.writeLock().lock(); // take the write lock for this scope
            try {
                map.put(k, v);
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.writeLock().unlock(); // release the write lock
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        ReadWriteLock lock = new ReentrantReadWriteLock(); // the lock shared by all tasks
        List<String> arr = Arrays.asList("jekr", "asdfasdf", "asdf");

        // for each element in arr, submit a task that writes the element to the map
        arr.stream().forEach(s -> executor.submit(() -> {
            lock.writeLock().lock(); // write lock
            try {
                map.put(s, "ok");
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // release the lock after execution, even if an exception was thrown
                lock.writeLock().unlock();
            }
        }));

        // for each element in arr, submit a task that reads the element from the map
        arr.stream().forEach(s -> executor.submit(() -> {
            lock.readLock().lock(); // read lock
            try {
                System.out.println(map.get(s));
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // remember to release the lock
                lock.readLock().unlock();
            }
        }));

        executor.submit(new Task("a", "b", lock)); // a write task written as a class

        executor.submit(() -> {
            lock.readLock().lock();
            try {
                System.out.println(map.get("a"));
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.readLock().unlock();
            }
        });

        executor.shutdown(); // stop accepting new tasks; queued tasks still run
    }
}
ReentrantLock()
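The basic lock. It is used in much the same way as ReadWriteLock.
It has lock and unlock methods and guards reads and writes together. isLocked reports whether the lock is currently held, and isHeldByCurrentThread reports whether the current thread is the holder. tryLock acquires the lock immediately if no other thread holds it and returns true; otherwise it returns false without waiting.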
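The methods mentioned here can be seen together in a small sketch. It assumes a scenario of my own choosing: one thread holds the lock while a second thread calls tryLock and is turned away. The class name ReentrantLockDemo and the result array are illustrative only.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockDemo {
    // Returns four observations:
    // [0] isLocked() while held, [1] isHeldByCurrentThread() while held,
    // [2] tryLock() result in another thread, [3] isLocked() after unlock.
    static boolean[] demo() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[4];
        lock.lock();
        try {
            result[0] = lock.isLocked();
            result[1] = lock.isHeldByCurrentThread();
            Thread other = new Thread(() -> {
                // The lock is held by the first thread, so this returns
                // false immediately instead of blocking.
                boolean acquired = lock.tryLock();
                result[2] = acquired;
                if (acquired) {
                    lock.unlock();
                }
            });
            other.start();
            other.join(); // join makes the other thread's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // always release in finally
        }
        result[3] = lock.isLocked();
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("locked=" + r[0] + " heldByMe=" + r[1]
                + " otherTryLock=" + r[2] + " lockedAfter=" + r[3]);
    }
}
```

Because the lock is reentrant, tryLock called by the thread that already holds it would succeed, which is why the rejected attempt has to come from a different thread.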
Future
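// a FutureTask must be run (by a thread or an executor) before get() can return
Future<Integer> myFuture = new FutureTask<>(() -> 1);

or

Future<String> future = executor.submit(() -> "jerk");

Get the result from the future:

String result = null;
try {
    result = future.get(); // blocks until the task completes
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}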
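Both ways of obtaining a Future can be put into one runnable sketch. The class name FutureDemo, the helper getOrDefault, the fallback values, and the 5-second timeout are assumptions made for the example; the timed get is used so that a stuck task cannot block the caller forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDemo {
    // A FutureTask is also a Runnable, so a plain thread can execute it.
    static int viaFutureTask() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1);
        new Thread(task).start();
        return getOrDefault(task, -1);
    }

    // submit() wraps the Callable and returns its Future directly.
    static String viaExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> "done");
            return getOrDefault(future, "failed");
        } finally {
            executor.shutdown();
        }
    }

    // Waits up to 5 seconds for the result, returning `fallback` on any failure.
    static <T> T getOrDefault(Future<T> future, T fallback) {
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException | TimeoutException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(viaFutureTask());
        System.out.println(viaExecutor());
    }
}
```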
Synchronized object
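Example
// declared as fields, because a lambda cannot modify local variables
static Object t1 = new Object(); // the lock object; it must not be null
static int count = 0;

// a stream of integers from 0 to 99
IntStream.range(0, 100).forEach(i -> executor.submit(() -> {
    synchronized (t1) { // only one thread at a time may increment
        count++;
    }
}));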
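A complete version of this counter might look like the sketch below. It assumes instance fields instead of statics, a pool of four threads, and a wait for the pool to drain before reading the result; the names SyncCounter and countTo are made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

class SyncCounter {
    private final Object lock = new Object(); // dedicated monitor object
    private int count = 0;

    // Submits n increment tasks and returns the final count,
    // or -1 if the pool did not finish in time or the wait was interrupted.
    int countTo(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        IntStream.range(0, n).forEach(i -> executor.submit(() -> {
            synchronized (lock) { // count++ is not atomic without this
                count++;
            }
        }));
        executor.shutdown(); // no new tasks; queued ones still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                return -1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        synchronized (lock) { // read under the same lock for visibility
            return count;
        }
    }

    public static void main(String[] args) {
        System.out.println(new SyncCounter().countTo(100));
    }
}
```

Without the synchronized block, two threads could read the same value of count and both write back the same increment, losing an update.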
notify and wait
Image from Baeldung tutorial
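Calling wait() suspends the current thread until another thread calls notify() or notifyAll() (which wakes every waiting thread) on the same object. wait(long timeout) puts an upper bound on the waiting time. All of these must be called while holding the object's monitor, that is, inside a synchronized method or block.
notify() wakes an arbitrary one of the waiting threads, so it is suited to waking threads that all do similar work.
notifyAll() wakes every waiting thread, but each woken thread must re-check the condition it was waiting for, because a thread can also wake up for other reasons (a spurious wakeup).
For example, when a queue-writer thread wakes the queue-reader threads (as in the earlier C++ multithreading example), a reader must check on waking whether the queue is empty, and keep waiting if it is.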
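The queue case can be sketched as follows. This is a minimal illustration, assuming an unbounded queue of integers with one writer and one reader; the class name WaitNotifyQueue and the demo method are not from the original notes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class WaitNotifyQueue {
    private final Queue<Integer> queue = new ArrayDeque<>();

    // Writer side: add a value and wake any waiting readers.
    public synchronized void put(int value) {
        queue.add(value);
        notifyAll();
    }

    // Reader side: wait in a loop so the emptiness check is repeated
    // after every wakeup, including spurious ones.
    public synchronized int take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // releases the monitor while waiting
        }
        return queue.remove();
    }

    // Starts a reader that takes n values while this thread writes 0..n-1,
    // then returns the sum the reader accumulated.
    static int demo(int n) {
        WaitNotifyQueue q = new WaitNotifyQueue();
        int[] sum = new int[1];
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    sum[0] += q.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        for (int i = 0; i < n; i++) {
            q.put(i);
        }
        try {
            reader.join(); // join makes the reader's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(5)); // 0 + 1 + 2 + 3 + 4 = 10
    }
}
```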
An example (again from the Baeldung tutorial)
public class Data {
    private String packet;           // the packet to be sent
    private boolean transfer = true; // true: the sender may send; false: a packet is waiting for the receiver

    public synchronized void send(String packet) {
        while (!transfer) {
            // the previous packet has not been received yet, so wait for notification
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Thread interrupted");
            }
        }
        // The previous packet was received (like an ACK in TCP :)),
        // so store the new packet and notify the receiver
        transfer = false;
        this.packet = packet;
        notifyAll();
    }

    public synchronized String receive() {
        // While no new packet is available, wait for the sender
        while (transfer) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Thread interrupted");
            }
        }
        // Notify the sender that it may send a new packet
        transfer = true;
        notifyAll();
        return packet;
    }
}
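Why wrap wait() in a while loop?
It is the condition check. notifyAll wakes every waiting thread, so each thread has to verify that its own condition actually holds before it proceeds, and go back to waiting if it does not.
Next, implement the Sender and Receiver classes.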
import java.util.concurrent.ThreadLocalRandom;

public class Sender implements Runnable {
    private Data data; // the Data class implemented before

    public Sender(Data d) {
        this.data = d;
    }

    public void run() {
        String packets[] = { ..., "EOF" }; // "EOF" marks the end of the transmission
        for (String packet : packets) {
            data.send(packet); // send each packet with the send method implemented before
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Thread interrupted");
            }
        }
    }
}

public class Receiver implements Runnable {
    private Data load;

    public Receiver(Data d) {
        this.load = d;
    }

    public void run() {
        // keep receiving messages until "EOF" arrives
        for (String msg = load.receive(); !msg.equals("EOF"); msg = load.receive()) {
            System.out.println(msg);
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Thread interrupted");
            }
        }
    }
}
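To see the hand-off run end to end, the sketch below wires a sender thread and a receiver thread around a compact copy of the Data class. The class name NetworkDemo, the packet strings, the use of lambdas in place of the Sender and Receiver classes, and the omission of the random sleeps are all assumptions made to keep the example short and self-contained.

```java
import java.util.ArrayList;
import java.util.List;

class NetworkDemo {
    // Compact copy of the single-slot hand-off: send and receive alternate.
    static class Data {
        private String packet;
        private boolean transfer = true; // true: slot is free for the sender

        synchronized void send(String p) throws InterruptedException {
            while (!transfer) {
                wait(); // previous packet not yet received
            }
            transfer = false;
            packet = p;
            notifyAll();
        }

        synchronized String receive() throws InterruptedException {
            while (transfer) {
                wait(); // nothing to receive yet
            }
            transfer = true;
            notifyAll();
            return packet;
        }
    }

    // Sends the given packets followed by "EOF" and returns what was received.
    static List<String> run(String... packets) {
        Data data = new Data();
        List<String> received = new ArrayList<>();
        Thread sender = new Thread(() -> {
            try {
                for (String p : packets) {
                    data.send(p);
                }
                data.send("EOF");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread receiver = new Thread(() -> {
            try {
                for (String m = data.receive(); !m.equals("EOF"); m = data.receive()) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        receiver.start();
        try {
            sender.join();
            receiver.join(); // after join, the receiver's list is safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("First packet", "Second packet"));
    }
}
```

Because the slot holds one packet at a time, the sender cannot overwrite a packet before the receiver has taken it, so the packets arrive in the order they were sent.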