1 Pipe
PipedReader是Reader类的扩展，用于读取字符流。它的read()方法读取与之连接的PipedWriter所写入的流。同样，PipedWriter是Writer类的扩展，它完成Writer类所约定的所有工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/**
 * Runnable that drains a {@link PipedReader} and echoes every character it
 * receives to standard output.
 *
 * <p>The loop ends when the pipe reports end-of-stream, or when reading fails,
 * in which case the exception is printed.
 */
public class PipeReaderThread implements Runnable {

    /** Read end of the pipe; supplied by the caller and not closed here. */
    PipedReader pr;

    /** Label given to this reader by the caller. */
    String name = null;

    /**
     * @param name label for this reader
     * @param pr   read end of the pipe to consume
     */
    public PipeReaderThread(String name, PipedReader pr) {
        this.name = name;
        this.pr = pr;
    }

    /**
     * Copies characters from the pipe to {@code System.out} until end-of-stream.
     */
    @Override
    public void run() {
        try {
            // Keep the result as an int: read() signals end-of-stream with -1,
            // which can never be observed once the value is narrowed to char.
            int next;
            while ((next = pr.read()) != -1) {
                System.out.print((char) next);
            }
        } catch (Exception e) {
            // Top-level boundary of the thread: report the failure and stop.
            System.out.println(" PipeThread Exception: " + e);
        }
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/**
 * Runnable that writes a fixed test line into a {@link PipedWriter} every two
 * seconds.
 *
 * <p>The loop runs until the thread is interrupted or a write fails.
 */
public class PipeWriterThread implements Runnable {

    /** Write end of the pipe; supplied by the caller and not closed here. */
    PipedWriter pw;

    /** Label given to this writer by the caller. */
    String name = null;

    /**
     * @param name label for this writer
     * @param pw   write end of the pipe to feed
     */
    public PipeWriterThread(String name, PipedWriter pw) {
        this.name = name;
        this.pw = pw;
    }

    /**
     * Writes and flushes one line, sleeps for two seconds, and repeats.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // Terminate each message with a real newline so the reader's
                // output is split into lines.
                pw.write("Testing data written...\n");
                pw.flush();
                Thread.sleep(2000);
            }
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can still see the interrupt.
            Thread.currentThread().interrupt();
            System.out.println(" PipeThread Exception: " + e);
        } catch (Exception e) {
            // Top-level boundary of the thread: report the failure and stop.
            System.out.println(" PipeThread Exception: " + e);
        }
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package multiThread; import java.io.*; public class PipedCommunicationTest { public static void main(String[] args) { new PipedCommunicationTest(); } public PipedCommunicationTest() { try { PipedReader pr = new PipedReader(); PipedWriter pw = new PipedWriter(); pw.connect(pr); Thread thread1 = new Thread(new PipeReaderThread("ReaderThread", pr)); Thread thread2 = new Thread(new PipeWriterThread("WriterThread", pw)); thread1.start(); thread2.start(); } catch (Exception e) { System.out.println("PipeThread Exception: " + e); } } }
|
2 BlockingQueue(新的最佳实践)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package corejava.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DemoExecutor { public static void main(String[] args) { Integer threadCounter = 0; BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50); CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS, blockingQueue); executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("DemoTask Rejected : " + ((DemoTask) r).getName()); System.out.println("Waiting for a second !!"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Lets add another time : " + ((DemoTask) r).getName()); executor.execute(r); } }); executor.prestartAllCoreThreads(); while (true) { threadCounter++; System.out.println("Adding DemoTask : " + threadCounter); executor.execute(new DemoTask(threadCounter.toString())); if (threadCounter == 100) break; } } }
|
3 共享数据(最基本的通信方式)