Как вы можете передать итерируемый объект нескольким потребителям в постоянном пространстве?

Как вы можете передать итерируемый объект нескольким потребителям в постоянном пространстве?

TLDR

Напишите реализацию, которая проходит следующий тест в CONSTANT SPACE, рассматривая min, max и sum как черные ящики.

def testit(implementation, N):
    assert implementation(range(N), min, max, sum) == (0, N-1, N*(N-1)//2)

Обсуждение

Мы любим итераторы, потому что они позволяют нам лениво обрабатывать потоки данных, позволяя обрабатывать огромные объемы данных в ПОСТОЯННОМ ПРОСТРАНСТВЕ.

def source_summary(source, summary):
    return summary(source)

N = 10 ** 8
print(source_summary(range(N), min))
print(source_summary(range(N), max))
print(source_summary(range(N), sum))

Каждая строка выполнялась несколько секунд, но использовала очень мало памяти. Однако для этого потребовалось 3 отдельных обхода источника. Таким образом, это не будет работать, если вашим источником является сетевое соединение, оборудование для сбора данных и т. д., если только вы не кэшируете все данные где-то, теряя требование ПОСТОЯННОГО ПРОСТРАНСТВА.

Вот версия, которая демонстрирует эту проблему

def source_summaries(source, *summaries):
    from itertools import tee
    return tuple(map(source_summary, tee(source, len(summaries)),
                                     summaries))

testit(source_summaries, N)
print('OK')

Тест пройден, но tee пришлось сохранить копию всех данных, поэтому использование пространства увеличилось с O(1) до O(N).

Как вы можете получить результаты за один проход с постоянной памятью?

Конечно, можно пройти тест, указанный выше, с использованием O(1) пространства, путем обмана: используя знание конкретных потребителей итераторов, которые использует тест. Но дело не в этом: source_summaries должен работать с любыми расходными материалами для итераторов, такими как set, collections.Counter, ''.join, включая все, что может быть написано в будущем. Реализация должна рассматривать их как черные ящики.

Для ясности: единственное знание, доступное о потребителях, состоит в том, что каждый потребляет одну итерацию и возвращает один результат. Использование любых других знаний о потребителе является обманом.

Идеи

[EDIT: я опубликовал реализацию этой идеи в качестве ответа]

Я могу представить решение (которое мне очень не нравится), которое использует

  • упреждающая потоковая передача

  • пользовательский итератор, связывающий потребителя с источником

Назовем пользовательский итератор link.

  • Для каждого потребителя запустите
result = consumer(<link instance for this thread>)
<link instance for this thread>.set_result(result)

на отдельной ветке.

  • В основной теме что-то вроде
for item in source:
    for l in links:
        l.push(item)

for l in links:
    l.stop()

for thread in threads:
    thread.join()

return tuple(link.get_result, links)
  • link.__next__ блокирует до тех пор, пока экземпляр link не получит

    • .push(item) in which case it returns the item
    • .stop() в этом случае повышается StopIteration
  • Гонки данных выглядят как кошмар. Вам понадобится очередь для push-уведомлений, и, вероятно, дозорный объект должен быть помещен в очередь link.stop() ... и множество других вещей, которые я упускаю из виду.

Я бы предпочел использовать кооперативную многопоточность, но consumer(link) кажется неизбежно некооперативной.

Есть ли у вас менее грязные предложения?


person jacg    schedule 08.04.2020    source источник
comment
Каким черным ящиком должны быть эти функции? Будет ли это вычисление промежуточных результатов, как в вызове reduce? Таким образом, вместо вычисления sum(some_list) вы можете инициализировать tmp = 0, а затем на каждой итерации делать tmp = sum(tmp, current_value). Вы можете сделать это для всех трех операций ( min, max, sum) одновременно и потребуется только один проход по элементам. Единственная проблема состоит в том, чтобы выбрать осмысленное начальное значение tmp для каждой из трех операций.   -  person Daniel Junglas    schedule 08.04.2020
comment
@DanielJunglas Полностью черный ящик. Использование reduce в эквивалентной бинарной функции требует специальных знаний потребителя. Таким образом, это подпадает под «обман», о котором я упоминал в вопросе. Я хочу предоставить (что-то вроде) это как библиотечную утилиту, которую пользователи могут вызывать с любыми потребителями, которых они хотят, включая те, которые не были изобретены сегодня, так что единственное, что я могу знать о потребитель заключается в том, что он потребляет итерацию для получения результата. Все, что помимо этого, является мошенничеством.   -  person jacg    schedule 08.04.2020
comment
Вы делаете это ради упражнения или для реальной библиотеки? В последнем случае, я думаю, имеет смысл расширить вашу функцию, чтобы она принимала инициализатор для tmp в качестве аргумента. Если вы посмотрите на встроенную функцию sum(), то это именно то, что делает эта функция. Вот как вы можете использовать эту функцию для суммирования чисел или объединения списков с одной и той же реализацией. Во всяком случае, это только мои два цента.   -  person Daniel Junglas    schedule 08.04.2020
comment
@DanielJunglas Интерфейс произвольного потребителя итерируемых объектов consumer(iterable): ничего другого. Не имеет значения, что sum, max или любой другой конкретный вариант, который вы имеете в виду, предлагает больше, библиотека может полагаться только на наименьший общий знаменатель. Это верно как в упражнениях, так и в реальном мире: это фундаментальное свойство того, что означает «интерфейс»!   -  person jacg    schedule 08.04.2020
comment
@DanielJunglas Думаю, я могу понять, откуда взялась часть вашего замешательства: возможно, вы думаете, что все потребители перегружены, например max и min: max((1,2)) == max(1,2) [примечание: во втором случае меньше скобок]. max в этом отношении необычно: даже sum (которое вы использовали в первом примере) нельзя использовать таким образом: sum((1,2)) == 3 но sum(1,2) — это ошибка. Так что ваш пример tmp = sum(tmp, current_value) тоже ошибка. Подавляющее большинство попадают в эту категорию: list, tuple, set, dict, collections.Counter, enumerate, partial(map, fn) и т.д. и т.п.   -  person jacg    schedule 09.04.2020
comment
Большое спасибо за объяснение. Мой пример для sum был написан неправильно. Он должен был читаться как sum([current_value], tmp) или sum([current_value, tmp]), что совпадает с sum([current_value, tmp]) или sum((current_value, tmp)). И вы правы, есть много потенциальных потребителей, которые не соответствуют шаблону использования начального значения.   -  person Daniel Junglas    schedule 09.04.2020
comment
Давайте продолжим обсуждение в чате.   -  person Daniel Junglas    schedule 09.04.2020


Ответы (2)


Вот альтернативная реализация вашей идеи. Он использует кооперативную многопоточность. Как вы предложили, ключевым моментом является использование многопоточности и наличие блока метода итераторов __next__ до тех пор, пока все потоки не используют текущую итерацию.

Кроме того, итератор содержит (необязательный) буфер постоянного размера. С помощью этого буфера мы можем считывать исходный код по частям и избегать блокировки/синхронизации.

Моя реализация также обрабатывает случай, когда некоторые потребители прекращают итерацию, не достигнув конца итератора.

import threading

class BufferedMultiIter:
    def __init__(self, source, n, bufsize = 1):
        '''`source` is an iterator or iterable,
        `n` is the number of threads that will interact with this iterator,
        `bufsize` is the size of the internal buffer. The iterator will read
        and buffer elements from `source` in chunks of `bufsize`. The bigger
        the buffer is, the better the performance but also the bigger the
        (constant) space requirement.
        '''
        self._source = iter(source)
        self._n = n
        # Condition variable for synchronization
        self._cond = threading.Condition()
        # Buffered values
        bufsize = max(bufsize, 1)
        self._buffer = [None] * bufsize
        self._buffered = 0
        self._next = threading.local()
        # State variables to implement the "wait for buffer to get refilled"
        # protocol
        self._serial = 0
        self._waiting = 0

        # True if we reached the end of the source
        self._stop = False
        # Was the thread killed (for error handling)?
        self._killed = False

    def _fill_buffer(self):
        '''Refill the internal buffer.'''
        self._buffered = 0
        while self._buffered < len(self._buffer):
            try:
                self._buffer[self._buffered] = next(self._source)
                self._buffered += 1
            except StopIteration:
                self._stop = True
                break
            # Explicitly clear the unused part of the buffer to release
            # references as early as possible
            for i in range(self._buffered, len(self._buffer)):
                self._buffer[i] = None
        self._waiting = 0
        self._serial += 1

    def register_thread(self):
        '''Register a thread.

        Each thread that wants to access this iterator must first register
        with the iterator. It is an error to register the same thread more
        than once. It is an error to access this iterator with a thread that
        was not registered (with the exception of calling `kill`). It is an
        error to register more threads than the number that was passed to the
        constructor.
        '''
        self._next.i = 0

    def unregister_thread(self):
        '''Unregister a thread from this iterator.

        This should be called when a thread is done using the iterator.
        It catches the case in which a consumer does not consume all the
        elements from the iterator but exits early.
        '''
        assert hasattr(self._next, 'i')
        delattr(self._next, 'i')
        with self._cond:
            assert self._n > 0
            self._n -= 1
            if self._waiting == self._n:
                self._fill_buffer()
            self._cond.notify_all()

    def kill(self):
        '''Forcibly kill this iterator.

        This will wake up all threads currently blocked in `__next__` and
        will have them raise a `StopIteration`.
        This function should be called in case of error to terminate all
        threads as fast as possible.
        '''
        self._cond.acquire()
        self._killed = True
        self._stop = True
        self._cond.notify_all()
        self._cond.release()
    def __iter__(self): return self
    def __next__(self):
        if self._next.i == self._buffered:
            # We read everything from the buffer.
            # Wait until all other threads have also consumed the buffer
            # completely and then refill it.
            with self._cond:
                old = self._serial
                self._waiting += 1
                if self._waiting == self._n:
                    self._fill_buffer()
                    self._cond.notify_all()
                else:
                    # Wait until the serial number changes. A change in
                    # serial number indicates that another thread has filled
                    # the buffer
                    while self._serial == old and not self._killed:
                        self._cond.wait()
            # Start at beginning of newly filled buffer
            self._next.i = 0

        if self._killed:
            raise StopIteration
        k = self._next.i
        if k == self._buffered and self._stop:
            raise StopIteration
        value = self._buffer[k]
        self._next.i = k + 1
        return value

class NotAll:
    '''A consumer that does not consume all the elements from the source.'''
    def __init__(self, limit):
        self._limit = limit
        self._consumed = 0
    def __call__(self, it):
        last = None
        for k in it:
            last = k
            self._consumed += 1
            if self._consumed >= self._limit:
                break
        return last

def multi_iter(iterable, *consumers, **kwargs):
    '''Iterate using multiple consumers.

    Each value in `iterable` is presented to each of the `consumers`.
    The function returns a tuple with the results of all `consumers`.

    There is an optional `bufsize` argument. This controls the internal
    buffer size. The bigger the buffer, the better the performance, but also
    the bigger the (constant) space requirement of the operation.

    NOTE: This will spawn a new thread for each consumer! The iteration is
    multi-threaded and happens in parallel for each element.
    '''
    n = len(consumers)
    it = BufferedMultiIter(iterable, n, kwargs.get('bufsize', 1))
    threads = list() # List with **running** threads
    result = [None] * n
    def thread_func(i, c):
        it.register_thread()
        result[i] = c(it)
        it.unregister_thread()
    try:
        for c in consumers:
            t = threading.Thread(target = thread_func, args = (len(threads), c))
            t.start()
            threads.append(t)
    except:
        # Here we should forcibly kill all the threads but there is not
        # t.kill() function or similar. So the best we can do is stop the
        # iterator
        it.kill()
    finally:
        while len(threads) > 0:
            t = threads.pop(-1)
            t.join()
    return tuple(result)

from time import time
N = 10 ** 7
notall1 = NotAll(1)
notall1000 = NotAll(1000)
start1 = time()
res1 = (min(range(N)), max(range(N)), sum(range(N)), NotAll(1)(range(N)),
        NotAll(1000)(range(N)))
stop1 = time()
print('5 iterators: %s %.2f' % (str(res1), stop1 - start1))

for p in range(5):
    start2 = time()
    res2 = multi_iter(range(N), min, max, sum, NotAll(1), NotAll(1000),
                      bufsize = 2**p)
    stop2 = time()
    print('multi_iter%d: %s %.2f' % (p, str(res2), stop2 - start2))

Тайминги снова ужасны, но вы можете видеть, как использование буфера постоянного размера значительно улучшает ситуацию:

5 iterators: (0, 9999999, 49999995000000, 0, 999) 0.71
multi_iter0: (0, 9999999, 49999995000000, 0, 999) 342.36
multi_iter1: (0, 9999999, 49999995000000, 0, 999) 264.71
multi_iter2: (0, 9999999, 49999995000000, 0, 999) 151.06
multi_iter3: (0, 9999999, 49999995000000, 0, 999) 95.79
multi_iter4: (0, 9999999, 49999995000000, 0, 999) 72.79

Возможно, это может послужить источником идей для хорошей реализации.

person Daniel Junglas    schedule 09.04.2020
comment
Спасибо! Вы меня очень взволновали, когда упомянули совместную поточность... но я могу найти только упреждающий :-( [Вы все еще используете модуль threading: это упреждающий; и я не могу найти выходы или asyncio, или что-то еще, что выглядит так, как будто оно может реализовать совместную многозадачность.] Мне нравится идея отмены регистрации потока для решения проблемы раннего завершения.Спасибо за тайминги буфера, хотя я удивлен тем, насколько он медленнее, чем мой... без профилирования, я думаю, в резюме больше блокировок, чем в очередях, но я оставлю это на другой день. - person jacg; 10.04.2020
comment
Некоторые измерения: с тривиальным источником и потребителями (range, min, max, sum), но реализация слегка замедляется из-за инструментовки, мои очереди достигают максимума примерно в 6500 элементов каждая, асимптотически приближаясь к этому значению по мере роста длины источника. Таким образом, при максимальном размере буфера 4, который вы рассчитали по времени, вероятно, будет много блокировок. Если я ввожу sleep(0.001) (1 миллисекунду) за элемент в исходный код, моя реализация будет на 1% медленнее, чем простая итерация, а длина очереди почти никогда не превышает 1; Задержка 0,1 мс -> замедление 20%, длина очереди ‹ 10. - person jacg; 10.04.2020
comment
Я думаю, это зависит от точного определения кооператива. В моем коде поток всегда выполняется до тех пор, пока не будет прочитан полный буфер. В этот момент он логически блокируется и добровольно отказывается от ЦП (кооперативный). Таким образом, на уровне Python он никогда не вытесняется. В любом случае, это не то, что вы искали. Но есть еще кое-что, что я заметил в вашем подходе к организации очередей: что, если у вас больше потоков, чем процессоров, и есть один поток с более низким приоритетом, чем все остальные? Этот поток будет запускаться только после завершения всех остальных, и очередь для этого потока будет буферизовать всю последовательность? - person Daniel Junglas; 14.04.2020
comment
Где точно поток добровольно отказывается от ЦП? Вы путаете выпуск резюме с отказом от ЦП? threading упреждающе вырвет ЦП из-под потока, который заблокировал CV, другие потоки, которым нужна эта блокировка, ничего не будут делать с отведенным им процессорным временем, и прогресс на некоторое время остановится. Я не вижу здесь совместной многозадачности, которая в Python требует yield или await (threading упреждающая!) Количество ЦП не имеет значения: threading использует только 1 ЦП (потому что GIL; ср. multiprocessing ). Существуют ли приоритеты потоков в threading? - person jacg; 14.04.2020
comment
Извините, я не совсем понял, что я хотел сказать, когда упомянул приоритеты потоков. Я хочу сказать: ваша реализация очереди не удовлетворяет требованиям к постоянному пространству. Очередь для потока может стать произвольно большой. В худшем случае он должен буферизовать все элементы входной последовательности. Спор с приоритетами потоков был лишь одним из примеров того, как заставить такое поведение работать. Но на самом деле вы можете сделать это проще: создайте потребителя, который выполняет time.sleep(60) перед обработкой первого элемента. Очередь для потока с этим потребителем будет буферизовать всю последовательность. - person Daniel Junglas; 15.04.2020
comment
Да, решение с очередями далеко от идеального. Ваш пример одного более медленного потребителя, вызывающего заполнение буфера, иллюстрирует преимущества совместной многозадачности: если бы каждый потребитель передал бы контроль после потребления одного элемента (в отличие от планировщика, захватившего его, когда ему захочется), и задачи были бы запущены в циклическом расписании размер буфера будет ровно 1, а ЦП будет загружен на 100%: каждая задача получит ровно столько, сколько ей нужно. threading будет продолжать «справедливо» делить ЦП между занятой задачей и заблокированными, что является пустой тратой времени ЦП. - person jacg; 15.04.2020
comment
Вы уверены, что планировщик назначает время заблокированным потокам в условной переменной? Я не эксперт в многопоточности Python, но я уверен, что это не так, как все реализовано на уровне ОС. Поток, заблокированный блокировкой или переменной условия, не находится в состоянии выполнения и не будет выполняться, если его явно не разбудить. Я понятия не имею, сколько всего этого переносится через Python. Обратите внимание, что вы можете эмулировать циклический перебор, имея одну условную переменную/семафор на поток. После запуска потока он сигнализирует условие для следующего. Таким образом, только один поток всегда активен. - person Daniel Junglas; 15.04.2020

Вот реализация решения с упреждающей потоковой передачей, описанного в исходном вопросе.

[EDIT: с этой реализацией есть серьезная проблема. [EDIT, теперь исправлено, используя решение, вдохновленное Daniel Junglas.]

Потребители, которые не выполняют итерацию всего итерируемого объекта, вызовут утечку пространства в очереди внутри Link. Например:


def exceeds_10(iterable):
    for item in iterable:
        if item > 10:
            return True
    return False

если вы используете это как одного из потребителей и используете источник range(10**6), он перестанет удалять элементы из очереди внутри Link после первых 11 элементов, оставляя примерно 10**6 элементов для накопления в очереди!

]


class Link:

    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is FINISHED:
            raise StopIteration
        return item

    def put(self, item):
        self.queue.put(item)

    def stop(self):
        self.queue.put(FINISHED)

    def consumer_not_listening_any_more(self):
        self.__class__ = ClosedLink


class ClosedLink:

    def put(self, _): pass
    def stop(self)  : pass


class FINISHED: pass


def make_thread(link, consumer, future):
    from threading import Thread
    return Thread(target = lambda: on_thread(link, consumer, future))

def on_thread(link, consumer, future):
    future.set_result(consumer(link))
    link.consumer_not_listening_any_more()

def source_summaries_PREEMPTIVE_THREAD(source, *consumers):
    from queue     import SimpleQueue as Queue
    from asyncio   import Future

    links   = tuple(Link(Queue()) for _ in consumers)
    futures = tuple(     Future() for _ in consumers)
    threads = tuple(map(make_thread, links, consumers, futures))

    for thread in threads:
        thread.start()

    for item in source:
        for link in links:
            link.put(item)

    for link in links:
        link.stop()

    for t in threads:
        t.join()

    return tuple(f.result() for f in futures)

Это работает, но (что неудивительно) с ужасным ухудшением производительности:

def time(thunk):
    from time import time
    start = time()
    thunk()
    stop  = time()
    return stop - start

N = 10 ** 7
t = time(lambda: testit(source_summaries, N))
print(f'old: {N} in {t:5.1f} s')

t = time(lambda: testit(source_summaries_PREEMPTIVE_THREAD, N))
print(f'new: {N} in {t:5.1f} s')

давать

old: 10000000 in   1.2 s
new: 10000000 in  30.1 s

Таким образом, хотя это и теоретическое решение, оно не является практическим[*].

Следовательно, я думаю, что этот подход является тупиковым, если нет способа убедить consumer уступить совместно (в отличие от принуждения его к уступке упреждающе) в

def on_thread(link, consumer, future):
    future.set_result(consumer(link))

... но это кажется принципиально невозможным. Хотелось бы оказаться неправым.

[*] Это на самом деле немного грубо: тест абсолютно ничего не делает с тривиальными данными; если бы это было частью более крупного вычисления, выполняющего тяжелые вычисления элементов, то этот подход мог бы быть действительно полезным.

person jacg    schedule 08.04.2020