A
download WaitFreeQueue.java
Language: Java
LOC: 163
Project Info
Lyophilizer
Server: SourceForge
Type: cvs
...wego\cs\dl\util\concurrent\
   Barrier.java
   BoundedBuffer.java
   BoundedChannel.java
   BoundedLinkedQueue.java
   BoundedPriorityQueue.java
   ...enBarrierException.java
   Callable.java
   Channel.java
   ClockDaemon.java
   ConcurrentHashMap.java
   ...rrentReaderHashMap.java
   CondVar.java
   CopyOnWriteArrayList.java
   CopyOnWriteArraySet.java
   CountDown.java
   CyclicBarrier.java
   ...ultChannelCapacity.java
   DirectExecutor.java
   Executor.java
   FIFOReadWriteLock.java
   FIFOSemaphore.java
   FJTask.java
   FJTaskRunner.java
   FJTaskRunnerGroup.java
   FutureResult.java
   Heap.java
   Latch.java
   LayeredSync.java
   LinkedNode.java
   LinkedQueue.java
   LockedExecutor.java
   Mutex.java
   NullSync.java
   ObservableSync.java
   PooledExecutor.java
   PrioritySemaphore.java
   ...yChangeMulticaster.java
   Puttable.java
   QueuedExecutor.java
   QueuedSemaphore.java
   ...renceReadWriteLock.java
   ReadWriteLock.java
   ReentrantLock.java
   ...renceReadWriteLock.java
   Rendezvous.java
   Semaphore.java
   ...eControlledChannel.java
   Slot.java
   Sync.java
   SyncCollection.java
   SynchronizedBoolean.java
   SynchronizedByte.java
   SynchronizedChar.java
   SynchronizedDouble.java
   SynchronizedFloat.java
   SynchronizedInt.java
   SynchronizedLong.java
   SynchronizedRef.java
   SynchronizedShort.java
   SynchronizedVariable.java
   SynchronousChannel.java
   SyncList.java
   SyncMap.java
   SyncSet.java
   SyncSortedMap.java
   SyncSortedSet.java
   Takable.java
   ThreadedExecutor.java
   ThreadFactory.java
   ThreadFactoryUser.java
   TimeDaemon.java
   TimedCallable.java
   TimeoutException.java
   TimeoutSync.java
   ...eChangeMulticaster.java
   WaitableBoolean.java
   WaitableByte.java
   WaitableChar.java
   WaitableDouble.java
   WaitableFloat.java
   WaitableInt.java
   WaitableLong.java
   WaitableRef.java
   WaitableShort.java
   ...referenceSemaphore.java
   WaitFreeQueue.java
   ...renceReadWriteLock.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
/*
  File: WaitFreeQueue.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who                What
  16Jun1998  dl               Create public version
   5Aug1998  dl               replaced int counters with longs
*/

package EDU.oswego.cs.dl.util.concurrent;

/**
 * A wait-free linked list based queue implementation,
 * adapted from the algorithm described in
 * <a
 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 * This implementation is not strictly wait-free since it
 * relies on locking for basic atomicity and visibility requirements.
 * Locks can impose unbounded waits, although this should not 
 * be a major practical concern here since each lock is held
 * for the duration of only a few statements. (However, the
 * overhead of using so many locks can make it less attractive
 * than other Channel implementations on JVMs where locking
 * operations are very slow.)
 * <p>
 * The main advantage of this implementation over 
 * LinkedQueue
 * is that it does not strictly prohibit multiple concurrent
 * puts and/or multiple concurrent takes, but instead retries
 * these actions upon detection of interference.
 * Performance depends in part on the locking and scheduling
 * policies of the Java VM.
 * On at least some VMs, this implementation tends to perform well in
 * producer/consumer applications in which the queue is
 * hardly ever empty for long periods, normally because both the producers
 * and consumers are constantly active, and especially so on
 * multiple-CPU machines. However, it is a poor choice for
 * applications in which there is so much activity that
 * internal contention-based retries dominate the computation, or
 * in which take() may be expected to have to wait
 * for items to appear. The blocking take() operation performs a busy-wait
 * spin loop, which can needlessly eat up CPU time, especially
 * on uniprocessors. It would be a better idea in this case to
 * use an otherwise similar (and usually at least as efficient) 
 * LinkedQueue or BoundedLinkedQueue.
 * @see BoundedLinkedQueue
 * @see LinkedQueue
 * 
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]

 **/

public class WaitFreeQueue implements Channel {

  /**
   * Node class for the linked list. A node also serves as a
   * versioned ("counted") pointer cell: <code>next</code> is the pointer
   * and <code>count</code> is its version number.
   * <p>
   * This follows the Michael/Scott algorithm, with two
   * accommodations to Java:
   * <ul>
   *   <li> The atomic double-word read, compare, and
   *    compare-and-swap steps of the algorithm are all
   *    simulated by synchronized blocks and methods.
   *
   *   <li> The (ptr, count) `pointer' fields of the algorithm are
   *    expanded out into the Node class, since operations are
   *    synchronized on each Node anyway.
   * </ul>
   **/

  protected final static class Node { // list nodes
    /** The queued item; null in dummy nodes, and reset to null by extract(). **/
    protected Object  value;
    /** Successor node, or null if there is none. **/
    protected Node    next = null;
    /** Version number of the next pointer. **/
    protected long     count = 0;

    /** Make a new node with indicated item, and null link **/
    protected Node(Object x) { value = x; }

    /**
     * Atomic equality test of the versioned pointer.
     * @param assumedNext the link this node is expected to hold
     * @param assumedCount the version number this node is expected to hold
     * @return true if both the link and the version match
     **/
    protected final synchronized boolean pointerEquals(Node assumedNext,
                                                       long  assumedCount) {
      return assumedNext == next && assumedCount == count;
    }

    /**
     * Simulated double-compare-and-swap: install the new link and
     * version only if the current ones match the assumed ones.
     * @param assumedNext the link this node is expected to hold
     * @param assumedCount the version number this node is expected to hold
     * @param newNext the link to install on success
     * @param newCount the version number to install on success
     * @return true if the update was performed
     **/
    protected final synchronized boolean commit(Node assumedNext,
                                                long assumedCount,
                                                Node newNext,
                                                long newCount) {
      if (next != assumedNext || count != assumedCount) {
        return false;
      }
      next = newNext;
      count = newCount;
      return true;
    }

  }

  /**
   * head_ and tail_ are used only as counted pointers,
   * not as nodes. They initially both point to a dummy empty node.
   **/
  protected final Node head_;
  protected final Node tail_;

  /** Create an empty queue: head_ and tail_ share a single dummy node. **/
  public WaitFreeQueue() {
    Node dummy = new Node(null);
    head_ = new Node(null);
    tail_ = new Node(null);
    head_.next = dummy;
    tail_.next = dummy;
  }

  /**
   * Link a new node holding x after the current last node,
   * retrying whenever interference from another thread is detected.
   * @param x the item to append
   * @exception InterruptedException if the calling thread's interrupt
   * status is found set at the start of an attempt (the check clears it)
   **/
  protected void insert(Object x) throws InterruptedException {
    Node node = new Node(x);

    for (;;) {

      if (Thread.interrupted()) {
        throw new InterruptedException();
      }

      // Atomically read tail
      Node tailDotNext;
      long tailDotCount;
      synchronized(tail_) {
        tailDotNext = tail_.next;
        tailDotCount = tail_.count;
      }

      // Atomically read last (tail_.next)
      Node lastDotNext;
      long lastDotCount;
      synchronized(tailDotNext) {
        lastDotNext = tailDotNext.next;
        lastDotCount = tailDotNext.count;
      }

      // only proceed if tail unchanged since read last
      if (tail_.pointerEquals(tailDotNext, tailDotCount)) {

        if (lastDotNext == null) { // a spot is available to insert node
          if (tailDotNext.commit(lastDotNext, lastDotCount,
                                 node, lastDotCount+1)) {

            // Try to swing tail to the new node. The result is ignored:
            // if this fails, tail has already been moved by another thread.
            tail_.commit(tailDotNext, tailDotCount,
                         node, tailDotCount+1);
            return;
          }

        }
        else { // tail is lagging: help advance it, then retry
          tail_.commit(tailDotNext, tailDotCount,
                       lastDotNext, tailDotCount+1);
        }
      }
    }
  }

  /**
   * Remove and return the first item, retrying whenever interference
   * from another thread is detected.
   * @return the first item, or null if the queue was observed to be empty
   * @exception InterruptedException if the calling thread's interrupt
   * status is found set at the start of an attempt (the check clears it)
   **/
  protected Object extract() throws InterruptedException {
    for (;;) {

      if (Thread.interrupted()) {
        throw new InterruptedException();
      }

      // atomically read head, tail
      Node headDotNext;
      long headDotCount;
      synchronized(head_) {
        headDotNext = head_.next;
        headDotCount = head_.count;
      }

      Node tailDotNext;
      long tailDotCount;
      synchronized(tail_) {
        tailDotNext = tail_.next;
        tailDotCount = tail_.count;
      }

      // node after the dummy; it holds the first item, if any
      Node first = headDotNext.next;

      // only proceed if head still same after reading tail
      if (head_.pointerEquals(headDotNext, headDotCount)) {

        if (headDotNext == tailDotNext) {
          if (first == null) { // empty
            return null;
          }
          else {              // being updated: help advance tail, then retry
            tail_.commit(tailDotNext, tailDotCount,
                         first, tailDotCount+1);
          }
        }
        else {                // valid
          // Read the item before moving head; the commit below doubles
          // as the check that no other thread took this node first.
          Object x = first.value;
          if (head_.commit(headDotNext, headDotCount,
                           first, headDotCount+1)) {
            first.value = null; // first is now the dummy; drop the reference
            return x;
          }
        }
      }
    }
  }

  /**
   * Spin until extract returns a non-null value.
   * A Thread.sleep(0) is performed on each iteration
   * as a heuristic to reduce contention. If you would
   * rather use, for example, an exponential backoff,
   * you could manually set this up using poll.
   * @return the item removed from the front of the queue
   * @exception InterruptedException if interrupted while spinning
   **/
  public Object take() throws InterruptedException {

    for (;;) {
      Object x = extract();
      if (x != null) {
        return x;
      }
      Thread.sleep(0);
    }
  }

  /**
   * Spin until extract returns a non-null value or time elapses.
   * If msecs is positive, a Thread.sleep(0) is performed on each iteration
   * as a heuristic to reduce contention. If msecs is zero or negative,
   * a single attempt is made.
   * @param msecs the maximum time to keep trying, in milliseconds
   * @return the item removed from the front of the queue, or null if
   * none could be obtained within the given time
   * @exception InterruptedException if interrupted while spinning
   **/
  public Object poll(long msecs) throws InterruptedException {
    Object x = extract();
    if (x != null || msecs <= 0) {
      return x;
    }

    long startTime = System.currentTimeMillis();
    Thread.sleep(0);
    for (;;) {
      x = extract();
      if (x != null) {
        return x;
      }
      if (System.currentTimeMillis() - startTime >= msecs) {
        return null;
      }
      Thread.sleep(0);
    }
  }

  /**
   * Append an item to the queue.
   * @param x the item; must not be null, since null is what
   * extract, poll and peek return to signal an empty queue
   * @exception IllegalArgumentException if x is null
   * @exception InterruptedException if interrupted before the item is inserted
   **/
  public void put(Object x)  throws InterruptedException {
    if (x == null) {
      throw new IllegalArgumentException();
    }
    insert(x);
  }

  /**
   * Append an item to the queue. The msecs argument is not used:
   * the item is inserted exactly as by put.
   * @param x the item; must not be null
   * @param msecs ignored
   * @return true, always
   * @exception IllegalArgumentException if x is null
   * @exception InterruptedException if interrupted before the item is inserted
   **/
  public boolean offer(Object x, long msecs) throws InterruptedException {
    if (x == null) {
      throw new IllegalArgumentException();
    }
    insert(x);
    return true;
  }


  /**
   * Return, but do not remove, the first item.
   * <p>
   * This is a simplified version of extract; it still needs retries
   * in case of concurrent updates. If the calling thread is interrupted
   * this method gives up and returns null, leaving the thread's
   * interrupt status set so the caller can still observe it.
   * @return the first item, or null if the queue was observed to be
   * empty or the calling thread is interrupted
   **/
  public Object peek() {

    for (;;) {

      // Test without clearing: this method cannot throw
      // InterruptedException, so the status must survive for the caller.
      if (Thread.currentThread().isInterrupted()) {
        return null;
      }

      // atomically read head, tail
      Node headDotNext;
      long headDotCount;
      synchronized(head_) {
        headDotNext = head_.next;
        headDotCount = head_.count;
      }

      Node tailDotNext;
      long tailDotCount;
      synchronized(tail_) {
        tailDotNext = tail_.next;
        tailDotCount = tail_.count;
      }

      Node first = headDotNext.next;

      // Read the candidate item BEFORE validating head. extract() nulls
      // first.value only after it has moved head, so a value read here
      // and followed by a successful head check cannot be a cleared one.
      Object x = (first == null) ? null : first.value;

      // only proceed if head still same after reading tail and item
      if (head_.pointerEquals(headDotNext, headDotCount)) {

        if (headDotNext == tailDotNext) {
          if (first == null) { // empty
            return null;
          }
          // being updated: help advance tail (as extract does) rather
          // than spinning until some other thread gets around to it
          tail_.commit(tailDotNext, tailDotCount,
                       first, tailDotCount+1);
        }
        else {                // valid
          return x;
        }
      }
    }
  }

}


About Koders | Resources | Downloads | Support | Black Duck | Terms of Service | DMCA | Privacy Policy | Contact Us