Своевременная очередь потребителя/производителя

Я реализовал очередь с приоритетом потребителя/производителя, где приоритетом является отметка времени, указывающая, когда товар должен быть доставлен. Это работает очень хорошо, но я хотел бы знать, есть ли у кого-нибудь лучшая идея реализовать это или комментарии по поводу текущей реализации.

Код находится на Python. Один поток создается для своевременного пробуждения ожидающих потребителей. Я знаю, что это антишаблон для создания потока в библиотеке, но я не мог придумать другого метода.

Вот код:

import collections
import heapq
import threading
import time

class TimelyQueue(threading.Thread):
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    class Locker:
        def __init__(self, lock):
            self.l = lock
        def __enter__(self):
            self.l.acquire()
            return self.l
        def __exit__(self, type, value, traceback):
            self.l.release()

    # Optimization to avoid wasting CPU cycles when something
    # is about to happen in less than 5 ms.
    _RESOLUTION = 0.005

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0

    def put(self, when, item):
        with self.Locker(self.putcond):
            heapq.heappush(self.queue, (when, item))
            if when < self.putwaketime or self.putwaketime == 0:
                self.putcond.notify()

    def get(self, timeout=None):
        with self.Locker(self.getcond):
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
                self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        with self.Locker(self.putcond):
            return len(self.queue)

    def run(self):
        with self.Locker(self.putcond):
            maxwait = None
            while True:
                curtime = time.time()
                try:
                    when, item = self.queue[0]
                    maxwait = when - curtime
                    self.putwaketime = when
                except IndexError:
                    maxwait = None
                    self.putwaketime = 0
                self.putcond.wait(maxwait)

                curtime = time.time()
                while True:
                    # Don't dequeue now, we are not sure to use it yet.
                    try:
                        when, item = self.queue[0]
                    except IndexError:
                        break
                    if when > curtime + self._RESOLUTION:
                        break

                    self.triggered.append(heapq.heappop(self.queue))
                if len(self.triggered) > 0:
                    with self.Locker(self.getcond):
                        self.getcond.notify()


if __name__ == "__main__":
    q = TimelyQueue()
    q.start()

    N = 50000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)

person Jeremie Le Hen    schedule 05.12.2012    source источник
comment
Я мог бы сделать присутствие потока менее явным, чтобы объект выглядел как Queue, а не Thread. Кроме того, почему вы строите менеджер контекста Locker вокруг Condition? См. документы.   -  person abarnert    schedule 06.12.2012
comment
@abarnert Хорошая идея для темы, я создам ее в конструкторе. Что касается Locker, я думаю, что никогда не читал документацию так далеко :). Спасибо за подсказку!   -  person Jeremie Le Hen    schedule 06.12.2012
comment
Это должна быть реализация для одного потребителя? Если нет, вы можете рассмотреть notifyAll, потому что может быть несколько записей, которые должны быть выполнены одновременно. Что еще более важно, вам, вероятно, нужны многопоточные модульные тесты.   -  person abarnert    schedule 06.12.2012
comment
@abarnert Нет, несколько потребителей. Вы правы это баг. Я написал внешний многопоточный тестовый модуль, но он был слишком большим, чтобы его можно было встроить в файл, а код был таким уродливым, что мне было стыдно его публиковать.   -  person Jeremie Le Hen    schedule 06.12.2012
comment
Один последний комментарий не совсем по теме: вы можете просмотреть исходный код для Queue.py и модульные тесты для него в стандартной библиотеке, просто чтобы убедиться, что вы не пропустили ничего важного. И посмотрите, можете ли вы на самом деле построить это как оболочку вокруг Queue (например, стандартного PriorityQueue), а не с нуля, чтобы вы могли получить некоторые дополнительные функции бесплатно (в основном maxsize, что очень сложно сделать правильно) — хотя, как всегда, помните о ЯГНИ; может оно того не стоит.   -  person abarnert    schedule 06.12.2012
comment
Ну, для реализации, а не для модульных тестов, я уже посмотрел на это на самом деле. Сначала я хотел вывести Queue, но мне все равно пришлось бы переписывать методы put() и get(), поэтому я выбрал урезанный интерфейс. Я мог бы   -  person Jeremie Le Hen    schedule 07.12.2012
comment
В связи с этим, если бы нам нужен был совместимый интерфейс с Queue, переход на утиную типизацию был бы гораздо проще, чем наследование.   -  person Jeremie Le Hen    schedule 07.12.2012


Ответы (2)


Единственное, для чего вам действительно нужен фоновый поток, — это таймер, чтобы выгнать официантов, когда он закончится, верно?

Во-первых, вы можете реализовать это с помощью threading.Timer вместо явного фонового потока. Но, хотя это может быть проще, на самом деле это не решит проблему, связанную с тем, что вы создаете поток за спиной пользователя, хотят они того или нет. Кроме того, с threading.Timer вы фактически запускаете новый поток каждый раз, когда перезапускаете таймер, что может быть проблемой производительности. (У вас есть только один за раз, но тем не менее запуск и остановка потоков не бесплатны.)

Если вы посмотрите на модули PyPI, рецепты ActiveState и различные фреймворки, то увидите множество реализаций, позволяющих запускать несколько таймеров в одном фоновом потоке. Это решило бы вашу проблему.

Но это все еще не идеальное решение. Например, предположим, что моему приложению требуется 20 объектов TimelyQueue или TimelyQueue плюс еще 19 объектов, для которых нужны таймеры. Я бы все равно закончил с 20 потоками. Или, скажем, я создаю сервер сокетов или приложение с графическим интерфейсом (два наиболее очевидных варианта использования вашего TimelyQueue; я могу реализовать таймер поверх моего цикла событий (или, скорее всего, просто использовать таймер, который поставляется с фреймворк), так зачем мне вообще нужен поток?

Выход из этого состоит в том, чтобы предложить хук для питания любой фабрики таймеров:

def __init__(self, timerfactory = threading.Timer):
    self.timerfactory = timerfactory
    ...

Теперь, когда вам нужно настроить таймер:

if when < self.waketime:
    self.timer.cancel()
    self.timer = self.timerfactory(when - now(), self.timercallback)
    self.waketime = when

Для быстрых и грязных случаев использования этого было бы достаточно из коробки. Но если я, например, использую twisted, я могу просто используйте TimelyQueue(twisted.reactor.callLater), и теперь таймеры очереди проходят через цикл событий twisted. Или, если у меня есть реализация с несколькими таймерами и одним потоком, которую я использую в другом месте, TimelyQueue(multiTimer.add), и теперь таймеры очереди работают в том же потоке, что и все другие мои таймеры.

Если вы хотите, вы можете указать лучшее значение по умолчанию, чем threading.Timer, но на самом деле, я думаю, что большинство людей, которым нужно что-то лучше, чем threading.Timer, смогут предоставить что-то, что лучше для их конкретного приложения, чем то, что вы предоставляете.

Конечно, не каждая реализация таймера имеет тот же API, что и threading.Timer, хотя вы будете удивлены, узнав, сколько из них это делает. Но не так сложно написать адаптер, если у вас есть таймер, который вы хотите использовать с TimelyQueue, но у него неправильный интерфейс. Например, если я создаю приложение PyQt4/PySide, QTimer не имеет метода cancel и принимает мс вместо секунд, поэтому мне пришлось бы сделать что-то вроде этого:

class AdaptedQTimer(object):
    def __init__(self, timeout, callback):
        self.timer = QTimer.singleShot(timeout * 1000, callback)
    def cancel(self):
        self.timer.stop()

q = TimelyQueue(AdaptedQTimer)

Или, если бы я хотел более непосредственно интегрировать очередь в QObject, я мог бы обернуть QObject.startTimer() и сделать так, чтобы мой метод timerEvent(self) вызывал обратный вызов.

После того, как вы подумываете об адаптерах, последняя идея. Я не думаю, что это того стоит, но, возможно, стоит подумать. Если ваш таймер принимает метку времени, а не дельту времени, и имеет метод adjust, а не cancel, и содержит собственное waketime, ваша реализация TimelyQueue может быть проще и, возможно, более эффективной. В put у вас есть что-то вроде этого:

if self.timer is None:
    self.timer = self.timerfactory(when)
elif when < self.timer.waketime:
    self.timer.adjust(when)

Конечно, большинство таймеров не предоставляют этот интерфейс. Но если у кого-то он есть или он хочет его изготовить, он может получить преимущества. А для всех остальных вы можете предоставить простой адаптер, который превращает таймер в стиле threading.Timer в нужный вам вид, например:

def timerFactoryAdapter(threadingStyleTimerFactory):
    class TimerFactory(object):
        def __init__(self, timestamp, callback):
            self.timer = threadingStyleTimerFactory(timestamp - now(), callback)
            self.callback = callback
        def cancel(self):
            return self.timer.cancel()
        def adjust(self, timestamp):
            self.timer.cancel()
            self.timer = threadingStyleTimerFactory(timestamp - now(), self.callback)
person abarnert    schedule 05.12.2012
comment
Это решение звучит неплохо, но я беспокоюсь о стоимости создания потока каждый раз, когда мне нужно включить таймер. Не было бы более разумным предоставить мой собственный метод запроса на завершение, который должен был бы вызываться вызывающей стороной при необходимости? - person Jeremie Le Hen; 09.12.2012
comment
@JeremieLeHen: я не уверен, что понимаю ваш вопрос. Но позвольте мне переформулировать идею в ответе и посмотреть, будет ли это иметь больше смысла. - person abarnert; 10.12.2012
comment
На самом деле, теперь, когда я перечитал ваш вопрос, я думаю, что, возможно, то, о чем вы говорите, является фабрикой таймеров, и я просто не понял вашей терминологии (и наоборот). Если да, то прошу прощения. Если нет, и если то, что я говорю, все еще не делает того, что вы хотите, пожалуйста, уточните. - person abarnert; 10.12.2012

Для справки, я реализовал то, что вы предложили, используя фабрику таймеров. Я провел небольшой тест, используя версию выше и новую версию, используя класс threading.Timer:

  1. Первая реализация

    • При разрешении по умолчанию (5 мс, то есть все, что находится в окне 5 мс, запускается вместе), достигается около 88k put()/сек и 69k get()/сек.

    • При разрешении, установленном на 0 мс (без оптимизации), достигается около 88 тыс. put()/сек и 55 тыс. get()/сек.

  2. Вторая реализация

    • При разрешении по умолчанию (5 мс) достигается около 88k put()/сек и 65k get()/сек.

    • При разрешении, установленном на 0 мс, достигается около 88k put()/сек и 62k get()/сек.

Признаюсь, я удивлен, что вторая реализация работает быстрее без оптимизации разрешения. Сейчас слишком поздно для расследования.

import collections
import heapq
import threading
import time

class TimelyQueue:
    """
    Implements a similar but stripped down interface of Queue which
    delivers items on time only.
    """

    def __init__(self, resolution=5, timerfactory=threading.Timer):
        """
        `resolution' is an optimization to avoid wasting CPU cycles when
        something is about to happen in less than X ms.
        """
        self.resolution = float(resolution) / 1000
        self.timerfactory = timerfactory
        self.queue = []
        self.triggered = collections.deque()
        self.putcond = threading.Condition()
        self.getcond = threading.Condition()
        # Optimization to avoid waking the thread uselessly.
        self.putwaketime = 0
        self.timer = None
        self.terminating = False

    def __arm(self):
        """
        Arm the next timer; putcond must be acquired!
        """
        curtime = time.time()
        when, item = self.queue[0]
        interval = when - curtime
        self.putwaketime = when
        self.timer = self.timerfactory(interval, self.__fire)
        self.timer.start()

    def __fire(self):
        with self.putcond:
            curtime = time.time()
            debug = 0
            while True:
                # Don't dequeue now, we are not sure to use it yet.
                try:
                    when, item = self.queue[0]
                except IndexError:
                    break
                if when > curtime + self.resolution:
                    break

                debug += 1
                self.triggered.append(heapq.heappop(self.queue))
            if len(self.triggered) > 0:
                with self.getcond:
                    self.getcond.notify(len(self.triggered))
            if self.terminating:
                return
            if len(self.queue) > 0:
                self.__arm()

    def put(self, when, item):
        """
        `when' is a Unix time from Epoch.
        """
        with self.putcond:
            heapq.heappush(self.queue, (when, item))
            if when >= self.putwaketime and self.putwaketime != 0:
                return
            # Arm next timer.
            if self.timer is not None:
                self.timer.cancel()
            self.__arm()

    def get(self, timeout=None):
        """
        Timely return the next object on the queue.
        """
        with self.getcond:
            if len(self.triggered) > 0:
                when, item = self.triggered.popleft()
                return item
            self.getcond.wait(timeout)
            try:
                when, item = self.triggered.popleft()
            except IndexError:
                return None
            return item

    def qsize(self):
        """
        Self explanatory.
        """
        with self.putcond:
            return len(self.queue)

    def terminate(self):
        """
        Request the embedded thread to terminate.
        """
        with self.putcond:
            self.terminating = True
            if self.timer is not None:
                self.timer.cancel()
            self.putcond.notifyAll()


if __name__ == "__main__":
    q = TimelyQueue(0)
    N = 100000
    t0 = time.time()
    for i in range(N):
        q.put(time.time() + 2, i)
    dt = time.time() - t0
    print "put done in %.3fs (%.2f put/sec)" % (dt, N / dt)
    t0 = time.time()
    i = 0
    while i < N:
        a = q.get(3)
        if i == 0:
            dt = time.time() - t0
            print "start get after %.3fs" % dt
            t0 = time.time()
        i += 1
    dt = time.time() - t0
    print "get done in %.3fs (%.2f get/sec)" % (dt, N / dt)
    q.terminate()
    # Give change to the thread to exit properly, otherwise we may get
    # a stray interpreter exception.
    time.sleep(0.1)
person Jeremie Le Hen    schedule 13.12.2012