Java Multithreading


  1. Create a thread

    Thread j = new Thread(new Runnable() { @Override public void run() {...} });
    j.start();   // the thread does not run until start() is called
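As a minimal sketch of the full lifecycle, the example below starts a thread and waits for it with join(). The class name ThreadStartDemo, the method runOnNewThread and the value 42 are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ThreadStartDemo {
    // Runs one task on a new thread and returns the value that task stored.
    static int runOnNewThread() {
        AtomicInteger result = new AtomicInteger();
        Thread worker = new Thread(() -> result.set(42), "worker");
        worker.start();              // run() now executes on the new thread
        try {
            worker.join();           // block until the worker has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnNewThread());
    }
}
```

A lambda works here because Runnable has a single abstract method; the anonymous class form above is equivalent.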
    
  2. Executor

    // thread pool of size 3
    ExecutorService service = Executors.newFixedThreadPool(3);
    service.submit(() -> System.out.println("running in the pool"));   // any Runnable object
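A pool also needs to be shut down, otherwise its worker threads keep the JVM alive. The sketch below submits a few tasks, shuts the pool down and waits for it to drain. FixedPoolDemo and runTasks are hypothetical names, and the 5 second timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolDemo {
    // Submits `tasks` Runnables to a pool of 3 threads; returns how many ran.
    static int runTasks(int tasks) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            service.submit(() -> { done.incrementAndGet(); });
        }
        service.shutdown();          // accept no new tasks; queued ones still run
        try {
            service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10));
    }
}
```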
    
  3. Timing with CountDownLatch

    class SomeWork implements Runnable {
        CountDownLatch latch;
        public SomeWork(CountDownLatch m) {
            this.latch = m;
        }
        public synchronized void myPrint(String msg) {
            System.out.println(msg);
        }
        @Override
        public void run() {
            myPrint("start" + System.currentTimeMillis());
            try {
                latch.await(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
            } 
            myPrint("end" + System.currentTimeMillis());
        }
    }
       
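    // In the main function:
    CountDownLatch latch = new CountDownLatch(10);
    ExecutorService service = Executors.newFixedThreadPool(3);
    service.submit(new SomeWork(latch));   // nothing calls countDown(), so await() returns after the 10 ms timeout
    service.shutdown();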
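The snippet above only uses the latch as a timed wait, since nothing ever calls countDown(). A more common pattern, sketched here as an assumption about typical usage rather than as part of the original notes, has each worker call countDown() while the main thread awaits the latch. LatchDemo and runAndWait are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchDemo {
    // Starts `workers` tasks; returns true if all of them counted down in time.
    static boolean runAndWait(int workers) {
        CountDownLatch latch = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                // ... the real work would go here ...
                latch.countDown();   // signal that this worker is done
            });
        }
        pool.shutdown();
        try {
            // await returns true once the count reaches zero, false on timeout
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(10));
    }
}
```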
        
    
    1. ReadWriteLock

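      ReadWriteLock is the interface java.util.concurrent.locks.ReadWriteLock.

      It is a lock that separates reads from writes: a thread can take either the read lock or the write lock. Several threads may hold the read lock at the same time, while the write lock is exclusive.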

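      import java.util.Arrays;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.locks.ReadWriteLock;
      import java.util.concurrent.locks.ReentrantReadWriteLock;

      public class ThreadSafeMap {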
          static int count = 0;
          static Object t1;
          static Map<String, String> map = new HashMap<>();
            
          static class Task implements Runnable {
              String k, v;
              ReadWriteLock lock;
              Task(String a, String b, ReadWriteLock l) {
                  k = a;
                  v = b;
                  lock = l;   // the shared ReadWriteLock is passed in, so every task uses the same lock
              }
              @Override
              public void run() {
                  lock.writeLock().lock();   // acquire the exclusive write lock
                  try {
                      map.put(k, v);
                      Thread.sleep(10);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } finally {
                      lock.writeLock().unlock();   // always release the write lock
                  }
              }
          }
            
          public static void main(String[] args) {
              ExecutorService executor = Executors.newFixedThreadPool(2);
              ReadWriteLock lock = new ReentrantReadWriteLock();	// The lock to be passed
              List<String> arr = Arrays.asList("jekr", "asdfasdf", "asdf");
              // for each element of arr, submit a task to the thread pool that writes the element to the map
              arr.stream().forEach(s -> executor.submit(() -> {
                  lock.writeLock().lock();	// writing lock
                  try {
                      map.put(s, "ok");
                      Thread.sleep(10);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } finally {
                      // after execution or any exception, release the lock
                      lock.writeLock().unlock();
                  }
              }));
            		
              // for each element of arr, submit a task to the thread pool that reads the element from the map
              arr.stream().forEach(s -> executor.submit(()-> {
                  lock.readLock().lock();		// reading lock
                  try {
                      System.out.println(map.get(s));
                      Thread.sleep(10);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } finally {
                      // remember to release the lock
                      lock.readLock().unlock();
                  }
              }));
            
              executor.submit(new Task("a", "b", lock));	// writing Task as a class
              executor.submit(()-> {
                  lock.readLock().lock();
                  try {
                      System.out.println(map.get("a"));
                      Thread.sleep(10);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } finally {
                      lock.readLock().unlock();
                  }
              });
            
              executor.shutdown();	// shut down the executor
          }
      }
            
      
      1. ReentrantLock()

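        A basic lock, used in much the same way as ReadWriteLock.

        It has lock and unlock methods and guards reads and writes alike. isLocked reports whether the lock is currently held, and isHeldByCurrentThread reports whether the current thread holds it. tryLock acquires the lock immediately if no other thread holds it and returns true; otherwise it returns false without blocking.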
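The behaviour of tryLock can be sketched as follows: one thread holds the lock while a second thread tries to take it. TryLockDemo and otherThreadCanLock are hypothetical names introduced for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {
    private static final ReentrantLock lock = new ReentrantLock();

    // Holds the lock on the calling thread and reports whether a second
    // thread's tryLock() succeeds in the meantime.
    static boolean otherThreadCanLock() {
        boolean[] acquired = new boolean[1];
        lock.lock();
        try {
            Thread other = new Thread(() -> {
                acquired[0] = lock.tryLock();   // returns immediately, never blocks
                if (acquired[0]) {
                    lock.unlock();
                }
            });
            other.start();
            other.join();                       // wait for the attempt to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();                      // release in finally, as with ReadWriteLock
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println("other thread acquired: " + otherThreadCanLock());
        System.out.println("locked afterwards: " + lock.isLocked());
    }
}
```

Because the lock is reentrant, the same check made from the owning thread would succeed; only a different thread is turned away.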

      2. Future

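        FutureTask<Integer> myFuture = new FutureTask<>(() -> 1);   // also a Runnable; it must be run before get() can return
        // or
        Future<String> future = executor.submit(() -> "jerk");

        // get the result from the future (blocks until it is ready)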
                 
        String result = null;
        try {
            result = future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
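Both ways of obtaining a Future can be put together in a small sketch. A FutureTask created by hand does nothing until something runs it, whereas submit() schedules the task right away. FutureDemo, viaFutureTask, viaExecutor and getOrThrow are hypothetical names, and the 1 second timeout is arbitrary.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDemo {
    // A hand-made FutureTask: run it ourselves, then read the result.
    static int viaFutureTask() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1);
        task.run();                  // executes the Callable on the calling thread
        return getOrThrow(task);
    }

    // The executor runs the Callable and hands back a Future for its result.
    static String viaExecutor() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> "done");
            return getOrThrow(future);
        } finally {
            executor.shutdown();
        }
    }

    // Waits at most one second so that a forgotten run() cannot hang forever.
    static <T> T getOrThrow(Future<T> future) {
        try {
            return future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaFutureTask());
        System.out.println(viaExecutor());
    }
}
```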
        
      3. Synchronized object

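        Example:

        static Object t1 = new Object();   // lock object; synchronizing on null would throw NullPointerException
        static int count = 0;

        // submit 100 tasks (i from 0 to 99); each increments count while holding t1's monitor
        IntStream.range(0, 100).forEach(
            i -> executor.submit(() -> {
                synchronized (t1) {
                    count++;
                }
            })
        );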
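A self-contained version of the same idea might look like this. SyncCounterDemo and incrementInParallel are hypothetical names, and the pool size of 4 is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

class SyncCounterDemo {
    private static final Object lock = new Object();
    private static int count = 0;

    // Runs `tasks` increments on a thread pool and returns the final count.
    static int incrementInParallel(int tasks) {
        synchronized (lock) {
            count = 0;
        }
        ExecutorService executor = Executors.newFixedThreadPool(4);
        IntStream.range(0, tasks).forEach(i -> executor.submit(() -> {
            synchronized (lock) {    // only one thread at a time may enter
                count++;
            }
        }));
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (lock) {        // read under the same lock for visibility
            return count;
        }
    }

    public static void main(String[] args) {
        System.out.println(incrementInParallel(100));
    }
}
```

Without the synchronized block, count++ is a read-modify-write on shared state, so concurrent increments could be lost.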
                 
        
      4. notify and wait

        [Image from the Baeldung tutorial]

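        Calling wait suspends the current thread until another thread calls notify or notifyAll (which wakes every waiting thread) on the same object. wait(long timeout) puts an upper bound on the waiting time. All three methods must be called while holding the object's monitor, that is, inside a synchronized method or block.

        notify wakes an arbitrary one of the waiting threads, so it suits cases where the waiting threads all do similar work.

        notifyAll wakes all waiting threads. A woken thread should re-check the condition it was waiting for, because a thread can also wake up for other reasons (a spurious wakeup).

        For example, when a queue writer thread wakes a queue reader thread (as in the earlier C++ multithreading example), the reader should check on wakeup whether the queue is empty and, if it is, keep waiting.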
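The queue case can be sketched as below. WaitNotifyQueue, put, take and demo are hypothetical names, and the queue is unbounded for simplicity.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class WaitNotifyQueue {
    private final Queue<String> items = new ArrayDeque<>();

    // Writer side: add an item and wake any readers waiting on this object.
    public synchronized void put(String item) {
        items.add(item);
        notifyAll();
    }

    // Reader side: re-check the condition after every wakeup.
    public synchronized String take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();                  // releases the monitor while waiting
        }
        return items.remove();
    }

    // Starts one reader thread, hands it one item and returns what it read.
    static String demo() {
        WaitNotifyQueue queue = new WaitNotifyQueue();
        String[] received = new String[1];
        Thread reader = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        queue.put("hello");
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The sketch works whichever thread gets there first: if put runs before take, the reader finds the queue non-empty and never waits.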

        An example (again from the Baeldung tutorial):

        public class Data {
            private String packet;				// the packet to be sent
            private boolean transfer = true;	// transfer status
                   
            public synchronized void send(String packet) {
                while (!transfer) {    // wait until the receiver has taken the previous packet
                    try { 
                        wait();
                    } catch (InterruptedException e)  {
                        Thread.currentThread().interrupt(); 
                        Log.error("Thread interrupted", e); 
                    }
                }
                // Transfer a new packet (received notification, like ACK in TCP :)), notify the receiver to receive the packet
                transfer = false;	
                          
                this.packet = packet;
                notifyAll();
            }
                   
            public synchronized String receive() {
                // Wait until the sender has handed over a new packet
                while (transfer) {
                    try {
                        wait();
                    } catch (InterruptedException e)  {
                        Thread.currentThread().interrupt(); 
                        Log.error("Thread interrupted", e); 
                    }
                }
                // Notify the sender to send a new packet
                transfer = true;
                notifyAll();
                return packet;
            }
        }
                 
        

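        Why wrap wait in a while loop?

        To re-check the condition. notifyAll wakes every waiting thread, so each thread must verify after waking that its own condition actually holds, and go back to waiting if it does not.

        Next, implement the Sender and Receiver classes: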

        public class Sender implements Runnable {
            private Data data;	// Data class implemented before
            public Sender(Data d){ this.data = d; }
            public void run() {
                String packets[] = {
                  ..., "EOF"
                };
                for (String packet : packets) {
                    data.send(packet);	// send the packets using the send function implemented before
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                    } catch (InterruptedException e)  {
                        Thread.currentThread().interrupt(); 
                        Log.error("Thread interrupted", e); 
                    }
                }
            }
        }
                 
        public class Receiver implements Runnable {
            private Data load;
            public Receiver(Data d){ this.load = d; }
            public void run() {
                for (String msg = load.receive(); !msg.equals("EOF");
                  msg = load.receive()) {   // keep receiving until the "EOF" marker arrives
                    System.out.println(msg);
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); 
                        Log.error("Thread interrupted", e); 
                    }
                }
            }
        }
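To see the handoff run end to end, the sender and the receiver each need their own thread. The sketch below is a condensed, self-contained variant rather than the tutorial's code: HandoffDemo and run are hypothetical names, the nested Data class is a shortened copy of the one above that propagates InterruptedException instead of logging it, and the random sleeps are left out.

```java
import java.util.ArrayList;
import java.util.List;

class HandoffDemo {
    // Shortened copy of the Data class, so that the sketch compiles on its own.
    static class Data {
        private String packet;
        private boolean transfer = true;   // true: the sender may hand over a packet

        synchronized void send(String p) throws InterruptedException {
            while (!transfer) wait();
            transfer = false;
            packet = p;
            notifyAll();
        }

        synchronized String receive() throws InterruptedException {
            while (transfer) wait();
            transfer = true;
            notifyAll();
            return packet;
        }
    }

    // Sends the packets followed by "EOF" and returns what the receiver collected.
    static List<String> run(String... packets) {
        Data data = new Data();
        List<String> received = new ArrayList<>();
        Thread sender = new Thread(() -> {
            try {
                for (String p : packets) data.send(p);
                data.send("EOF");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread receiver = new Thread(() -> {
            try {
                for (String m = data.receive(); !m.equals("EOF"); m = data.receive()) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();
        receiver.start();
        try {
            sender.join();
            receiver.join();   // after join, reading `received` here is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("first", "second"));
    }
}
```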