Я реализовал очередь с приоритетом потребителя/производителя, где приоритетом является отметка времени, указывающая, когда товар должен быть доставлен. Это работает очень хорошо, но я хотел бы знать, есть ли у кого-нибудь лучшая идея реализовать это или комментарии по поводу текущей реализации.
Код находится на Python. Один поток создается для своевременного пробуждения ожидающих потребителей. Я знаю, что это антишаблон для создания потока в библиотеке, но я не мог придумать другого метода.
Вот код:
import collections
import heapq
import threading
import time
class TimelyQueue(threading.Thread):
"""
Implements a similar but stripped down interface of Queue which
delivers items on time only.
"""
class Locker:
def __init__(self, lock):
self.l = lock
def __enter__(self):
self.l.acquire()
return self.l
def __exit__(self, type, value, traceback):
self.l.release()
# Optimization to avoid wasting CPU cycles when something
# is about to happen in less than 5 ms.
_RESOLUTION = 0.005
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.queue = []
self.triggered = collections.deque()
self.putcond = threading.Condition()
self.getcond = threading.Condition()
# Optimization to avoid waking the thread uselessly.
self.putwaketime = 0
def put(self, when, item):
with self.Locker(self.putcond):
heapq.heappush(self.queue, (when, item))
if when < self.putwaketime or self.putwaketime == 0:
self.putcond.notify()
def get(self, timeout=None):
with self.Locker(self.getcond):
if len(self.triggered) > 0:
when, item = self.triggered.popleft()
return item
self.getcond.wait(timeout)
try:
when, item = self.triggered.popleft()
except IndexError:
return None
return item
def qsize(self):
with self.Locker(self.putcond):
return len(self.queue)
def run(self):
with self.Locker(self.putcond):
maxwait = None
while True:
curtime = time.time()
try:
when, item = self.queue[0]
maxwait = when - curtime
self.putwaketime = when
except IndexError:
maxwait = None
self.putwaketime = 0
self.putcond.wait(maxwait)
curtime = time.time()
while True:
# Don't dequeue now, we are not sure to use it yet.
try:
when, item = self.queue[0]
except IndexError:
break
if when > curtime + self._RESOLUTION:
break
self.triggered.append(heapq.heappop(self.queue))
if len(self.triggered) > 0:
with self.Locker(self.getcond):
self.getcond.notify()
if __name__ == "__main__":
q = TimelyQueue()
q.start()
N = 50000
t0 = time.time()
for i in range(N):
q.put(time.time() + 2, i)
dt = time.time() - t0
print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
t0 = time.time()
i = 0
while i < N:
a = q.get(3)
if i == 0:
dt = time.time() - t0
print "start get after %.3fs" % dt
t0 = time.time()
i += 1
dt = time.time() - t0
print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
Queue
, а неThread
. Кроме того, почему вы строите менеджер контекстаLocker
вокругCondition
? См. документы. - person abarnert   schedule 06.12.2012Locker
, я думаю, что никогда не читал документацию так далеко :). Спасибо за подсказку! - person Jeremie Le Hen   schedule 06.12.2012notifyAll
, потому что может быть несколько записей, которые должны быть выполнены одновременно. Что еще более важно, вам, вероятно, нужны многопоточные модульные тесты. - person abarnert   schedule 06.12.2012Queue
(например, стандартногоPriorityQueue
), а не с нуля, чтобы вы могли получить некоторые дополнительные функции бесплатно (в основномmaxsize
, что очень сложно сделать правильно) — хотя, как всегда, помните о ЯГНИ; может оно того не стоит. - person abarnert   schedule 06.12.2012Queue
, переход на утиную типизацию был бы гораздо проще, чем наследование. - person Jeremie Le Hen   schedule 07.12.2012