|
1 """A multi-producer, multi-consumer queue.""" |
|
2 |
|
3 from time import time as _time |
|
4 from collections import deque |
|
5 import heapq |
|
6 |
|
7 __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] |
|
8 |
|
9 class Empty(Exception): |
|
10 "Exception raised by Queue.get(block=0)/get_nowait()." |
|
11 pass |
|
12 |
|
13 class Full(Exception): |
|
14 "Exception raised by Queue.put(block=0)/put_nowait()." |
|
15 pass |
|
16 |
|
17 class Queue: |
|
18 """Create a queue object with a given maximum size. |
|
19 |
|
20 If maxsize is <= 0, the queue size is infinite. |
|
21 """ |
|
22 def __init__(self, maxsize=0): |
|
23 try: |
|
24 import threading |
|
25 except ImportError: |
|
26 import dummy_threading as threading |
|
27 self.maxsize = maxsize |
|
28 self._init(maxsize) |
|
29 # mutex must be held whenever the queue is mutating. All methods |
|
30 # that acquire mutex must release it before returning. mutex |
|
31 # is shared between the three conditions, so acquiring and |
|
32 # releasing the conditions also acquires and releases mutex. |
|
33 self.mutex = threading.Lock() |
|
34 # Notify not_empty whenever an item is added to the queue; a |
|
35 # thread waiting to get is notified then. |
|
36 self.not_empty = threading.Condition(self.mutex) |
|
37 # Notify not_full whenever an item is removed from the queue; |
|
38 # a thread waiting to put is notified then. |
|
39 self.not_full = threading.Condition(self.mutex) |
|
40 # Notify all_tasks_done whenever the number of unfinished tasks |
|
41 # drops to zero; thread waiting to join() is notified to resume |
|
42 self.all_tasks_done = threading.Condition(self.mutex) |
|
43 self.unfinished_tasks = 0 |
|
44 |
|
45 def task_done(self): |
|
46 """Indicate that a formerly enqueued task is complete. |
|
47 |
|
48 Used by Queue consumer threads. For each get() used to fetch a task, |
|
49 a subsequent call to task_done() tells the queue that the processing |
|
50 on the task is complete. |
|
51 |
|
52 If a join() is currently blocking, it will resume when all items |
|
53 have been processed (meaning that a task_done() call was received |
|
54 for every item that had been put() into the queue). |
|
55 |
|
56 Raises a ValueError if called more times than there were items |
|
57 placed in the queue. |
|
58 """ |
|
59 self.all_tasks_done.acquire() |
|
60 try: |
|
61 unfinished = self.unfinished_tasks - 1 |
|
62 if unfinished <= 0: |
|
63 if unfinished < 0: |
|
64 raise ValueError('task_done() called too many times') |
|
65 self.all_tasks_done.notify_all() |
|
66 self.unfinished_tasks = unfinished |
|
67 finally: |
|
68 self.all_tasks_done.release() |
|
69 |
|
70 def join(self): |
|
71 """Blocks until all items in the Queue have been gotten and processed. |
|
72 |
|
73 The count of unfinished tasks goes up whenever an item is added to the |
|
74 queue. The count goes down whenever a consumer thread calls task_done() |
|
75 to indicate the item was retrieved and all work on it is complete. |
|
76 |
|
77 When the count of unfinished tasks drops to zero, join() unblocks. |
|
78 """ |
|
79 self.all_tasks_done.acquire() |
|
80 try: |
|
81 while self.unfinished_tasks: |
|
82 self.all_tasks_done.wait() |
|
83 finally: |
|
84 self.all_tasks_done.release() |
|
85 |
|
86 def qsize(self): |
|
87 """Return the approximate size of the queue (not reliable!).""" |
|
88 self.mutex.acquire() |
|
89 n = self._qsize() |
|
90 self.mutex.release() |
|
91 return n |
|
92 |
|
93 def empty(self): |
|
94 """Return True if the queue is empty, False otherwise (not reliable!).""" |
|
95 self.mutex.acquire() |
|
96 n = not self._qsize() |
|
97 self.mutex.release() |
|
98 return n |
|
99 |
|
100 def full(self): |
|
101 """Return True if the queue is full, False otherwise (not reliable!).""" |
|
102 self.mutex.acquire() |
|
103 n = 0 < self.maxsize == self._qsize() |
|
104 self.mutex.release() |
|
105 return n |
|
106 |
|
107 def put(self, item, block=True, timeout=None): |
|
108 """Put an item into the queue. |
|
109 |
|
110 If optional args 'block' is true and 'timeout' is None (the default), |
|
111 block if necessary until a free slot is available. If 'timeout' is |
|
112 a positive number, it blocks at most 'timeout' seconds and raises |
|
113 the Full exception if no free slot was available within that time. |
|
114 Otherwise ('block' is false), put an item on the queue if a free slot |
|
115 is immediately available, else raise the Full exception ('timeout' |
|
116 is ignored in that case). |
|
117 """ |
|
118 self.not_full.acquire() |
|
119 try: |
|
120 if self.maxsize > 0: |
|
121 if not block: |
|
122 if self._qsize() == self.maxsize: |
|
123 raise Full |
|
124 elif timeout is None: |
|
125 while self._qsize() == self.maxsize: |
|
126 self.not_full.wait() |
|
127 elif timeout < 0: |
|
128 raise ValueError("'timeout' must be a positive number") |
|
129 else: |
|
130 endtime = _time() + timeout |
|
131 while self._qsize() == self.maxsize: |
|
132 remaining = endtime - _time() |
|
133 if remaining <= 0.0: |
|
134 raise Full |
|
135 self.not_full.wait(remaining) |
|
136 self._put(item) |
|
137 self.unfinished_tasks += 1 |
|
138 self.not_empty.notify() |
|
139 finally: |
|
140 self.not_full.release() |
|
141 |
|
142 def put_nowait(self, item): |
|
143 """Put an item into the queue without blocking. |
|
144 |
|
145 Only enqueue the item if a free slot is immediately available. |
|
146 Otherwise raise the Full exception. |
|
147 """ |
|
148 return self.put(item, False) |
|
149 |
|
150 def get(self, block=True, timeout=None): |
|
151 """Remove and return an item from the queue. |
|
152 |
|
153 If optional args 'block' is true and 'timeout' is None (the default), |
|
154 block if necessary until an item is available. If 'timeout' is |
|
155 a positive number, it blocks at most 'timeout' seconds and raises |
|
156 the Empty exception if no item was available within that time. |
|
157 Otherwise ('block' is false), return an item if one is immediately |
|
158 available, else raise the Empty exception ('timeout' is ignored |
|
159 in that case). |
|
160 """ |
|
161 self.not_empty.acquire() |
|
162 try: |
|
163 if not block: |
|
164 if not self._qsize(): |
|
165 raise Empty |
|
166 elif timeout is None: |
|
167 while not self._qsize(): |
|
168 self.not_empty.wait() |
|
169 elif timeout < 0: |
|
170 raise ValueError("'timeout' must be a positive number") |
|
171 else: |
|
172 endtime = _time() + timeout |
|
173 while not self._qsize(): |
|
174 remaining = endtime - _time() |
|
175 if remaining <= 0.0: |
|
176 raise Empty |
|
177 self.not_empty.wait(remaining) |
|
178 item = self._get() |
|
179 self.not_full.notify() |
|
180 return item |
|
181 finally: |
|
182 self.not_empty.release() |
|
183 |
|
184 def get_nowait(self): |
|
185 """Remove and return an item from the queue without blocking. |
|
186 |
|
187 Only get an item if one is immediately available. Otherwise |
|
188 raise the Empty exception. |
|
189 """ |
|
190 return self.get(False) |
|
191 |
|
192 # Override these methods to implement other queue organizations |
|
193 # (e.g. stack or priority queue). |
|
194 # These will only be called with appropriate locks held |
|
195 |
|
196 # Initialize the queue representation |
|
197 def _init(self, maxsize): |
|
198 self.queue = deque() |
|
199 |
|
200 def _qsize(self, len=len): |
|
201 return len(self.queue) |
|
202 |
|
203 # Put a new item in the queue |
|
204 def _put(self, item): |
|
205 self.queue.append(item) |
|
206 |
|
207 # Get an item from the queue |
|
208 def _get(self): |
|
209 return self.queue.popleft() |
|
210 |
|
211 |
|
212 class PriorityQueue(Queue): |
|
213 '''Variant of Queue that retrieves open entries in priority order (lowest first). |
|
214 |
|
215 Entries are typically tuples of the form: (priority number, data). |
|
216 ''' |
|
217 |
|
218 def _init(self, maxsize): |
|
219 self.queue = [] |
|
220 |
|
221 def _qsize(self, len=len): |
|
222 return len(self.queue) |
|
223 |
|
224 def _put(self, item, heappush=heapq.heappush): |
|
225 heappush(self.queue, item) |
|
226 |
|
227 def _get(self, heappop=heapq.heappop): |
|
228 return heappop(self.queue) |
|
229 |
|
230 |
|
231 class LifoQueue(Queue): |
|
232 '''Variant of Queue that retrieves most recently added entries first.''' |
|
233 |
|
234 def _init(self, maxsize): |
|
235 self.queue = [] |
|
236 |
|
237 def _qsize(self, len=len): |
|
238 return len(self.queue) |
|
239 |
|
240 def _put(self, item): |
|
241 self.queue.append(item) |
|
242 |
|
243 def _get(self): |
|
244 return self.queue.pop() |