Clover Coverage Report - Backport Util Concurrent v3.0
Coverage timestamp: Fri May 9 2008 11:05:23 EST
../../../../../../../img/srcFileCovDistChart10.png 0% of files have more coverage
43   254   16   10.75
22   75   0.37   4
4     4  
1    
 
  Exchanger       Line # 72 43 16 92.8% 0.92753625
 
  (6)
 
1    /*
2    * Written by Doug Lea with assistance from members of JCP JSR-166
3    * Expert Group and released to the public domain, as explained at
4    * http://creativecommons.org/licenses/publicdomain
5    */
6   
7    package edu.emory.mathcs.backport.java.util.concurrent;
8    import edu.emory.mathcs.backport.java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
9    import edu.emory.mathcs.backport.java.util.concurrent.locks.*;
10    import edu.emory.mathcs.backport.java.util.concurrent.helpers.*;
11   
12    /**
13    * A synchronization point at which threads can pair and swap elements
14    * within pairs. Each thread presents some object on entry to the
15    * {@link #exchange exchange} method, matches with a partner thread,
16    * and receives its partner's object on return.
17    *
18    * <p><b>Sample Usage:</b>
19    * Here are the highlights of a class that uses an {@code Exchanger}
20    * to swap buffers between threads so that the thread filling the
21    * buffer gets a freshly emptied one when it needs it, handing off the
22    * filled one to the thread emptying the buffer.
23    * <pre>{@code
24    * class FillAndEmpty {
25    * Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
26    * DataBuffer initialEmptyBuffer = ... a made-up type
27    * DataBuffer initialFullBuffer = ...
28    *
29    * class FillingLoop implements Runnable {
30    * public void run() {
31    * DataBuffer currentBuffer = initialEmptyBuffer;
32    * try {
33    * while (currentBuffer != null) {
34    * addToBuffer(currentBuffer);
35    * if (currentBuffer.isFull())
36    * currentBuffer = exchanger.exchange(currentBuffer);
37    * }
38    * } catch (InterruptedException ex) { ... handle ... }
39    * }
40    * }
41    *
42    * class EmptyingLoop implements Runnable {
43    * public void run() {
44    * DataBuffer currentBuffer = initialFullBuffer;
45    * try {
46    * while (currentBuffer != null) {
47    * takeFromBuffer(currentBuffer);
48    * if (currentBuffer.isEmpty())
49    * currentBuffer = exchanger.exchange(currentBuffer);
50    * }
51    * } catch (InterruptedException ex) { ... handle ...}
52    * }
53    * }
54    *
55    * void start() {
56    * new Thread(new FillingLoop()).start();
57    * new Thread(new EmptyingLoop()).start();
58    * }
59    * }
60    * }</pre>
61    *
62    * <p>Memory consistency effects: For each pair of threads that
63    * successfully exchange objects via an {@code Exchanger}, actions
64    * prior to the {@code exchange()} in each thread
65    * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
66    * those subsequent to a return from the corresponding {@code exchange()}
67    * in the other thread.
68    *
69    * @since 1.5
70    * @author Doug Lea and Bill Scherer and Michael Scott
71    */
 
72    public class Exchanger {
73    private final Object lock = new Object();
74   
75    /** Holder for the item being exchanged */
76    private Object item;
77   
78    /**
79    * Arrival count transitions from 0 to 1 to 2 then back to 0
80    * during an exchange.
81    */
82    private int arrivalCount;
83   
84    /**
85    * Main exchange function, handling the different policy variants.
86    */
 
87  16 toggle private Object doExchange(Object x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
88  16 synchronized (lock) {
89  16 Object other;
90  16 long deadline = timed ? Utils.nanoTime() + nanos : 0;
91    // If arrival count already at two, we must wait for
92    // a previous pair to finish and reset the count;
93  18 while (arrivalCount == 2) {
94  2 if (!timed)
95  1 lock.wait();
96  1 else if (nanos > 0) {
97  1 TimeUnit.NANOSECONDS.timedWait(lock, nanos);
98  1 nanos = deadline - Utils.nanoTime();
99    }
100    else
101  0 throw new TimeoutException();
102    }
103   
104  16 int count = ++arrivalCount;
105   
106    // If item is already waiting, replace it and signal other thread
107  16 if (count == 2) {
108  6 other = item;
109  6 item = x;
110  6 lock.notifyAll();
111  6 return other;
112    }
113   
114    // Otherwise, set item and wait for another thread to
115    // replace it and signal us.
116   
117  10 item = x;
118  10 InterruptedException interrupted = null;
119  10 try {
120  17 while (arrivalCount != 2) {
121  11 if (!timed)
122  6 lock.wait();
123  5 else if (nanos > 0) {
124  4 TimeUnit.NANOSECONDS.timedWait(lock, nanos);
125  3 nanos = deadline - Utils.nanoTime();
126    }
127    else
128  1 break; // timed out
129    }
130    } catch (InterruptedException ie) {
131  3 interrupted = ie;
132    }
133   
134    // Get and reset item and count after the wait.
135    // (We need to do this even if wait was aborted.)
136  10 other = item;
137  10 item = null;
138  10 count = arrivalCount;
139  10 arrivalCount = 0;
140  10 lock.notifyAll();
141   
142    // If the other thread replaced item, then we must
143    // continue even if cancelled.
144  10 if (count == 2) {
145  6 if (interrupted != null)
146  0 Thread.currentThread().interrupt();
147  6 return other;
148    }
149   
150    // If no one is waiting for us, we can back out
151  4 if (interrupted != null)
152  3 throw interrupted;
153    else // must be timeout
154  1 throw new TimeoutException();
155    }
156    }
157   
158    /**
159    * Creates a new Exchanger.
160    **/
 
161  6 toggle public Exchanger() {
162    }
163   
164    /**
165    * Waits for another thread to arrive at this exchange point (unless
166    * it is {@link Thread#interrupt interrupted}),
167    * and then transfers the given object to it, receiving its object
168    * in return.
169    *
170    * <p>If another thread is already waiting at the exchange point then
171    * it is resumed for thread scheduling purposes and receives the object
172    * passed in by the current thread. The current thread returns immediately,
173    * receiving the object passed to the exchange by that other thread.
174    *
175    * <p>If no other thread is already waiting at the exchange then the
176    * current thread is disabled for thread scheduling purposes and lies
177    * dormant until one of two things happens:
178    * <ul>
179    * <li>Some other thread enters the exchange; or
180    * <li>Some other thread {@link Thread#interrupt interrupts} the current
181    * thread.
182    * </ul>
183    * <p>If the current thread:
184    * <ul>
185    * <li>has its interrupted status set on entry to this method; or
186    * <li>is {@link Thread#interrupt interrupted} while waiting
187    * for the exchange,
188    * </ul>
189    * then {@link InterruptedException} is thrown and the current thread's
190    * interrupted status is cleared.
191    *
192    * @param x the object to exchange
193    * @return the object provided by the other thread
194    * @throws InterruptedException if the current thread was
195    * interrupted while waiting
196    */
 
197  10 toggle public Object exchange(Object x) throws InterruptedException {
198  10 try {
199  10 return doExchange(x, false, 0);
200    } catch (TimeoutException cannotHappen) {
201  0 throw new Error(cannotHappen);
202    }
203    }
204   
205    /**
206    * Waits for another thread to arrive at this exchange point (unless
207    * the current thread is {@link Thread#interrupt interrupted} or
208    * the specified waiting time elapses), and then transfers the given
209