Peewee, SQLite и многопоточность

Я работаю над многопоточным приложением, в котором один поток будет кормить Queue объектами, которые нужно изменить, а несколько других потоков затем будут читать из очереди, выполнять модификации и сохранять изменения.

Приложению не потребуется много параллелизма, поэтому я хотел бы придерживаться базы данных SQLite. Вот небольшой пример, иллюстрирующий приложение:

import queue
import threading
import peewee as pw

db = pw.SqliteDatabase('test.db', threadlocals=True)

class Container(pw.Model):
    contents = pw.CharField(default="spam")

    class Meta:
        database = db


class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        containers = Container.select()

        for container in containers:
            self.q.put(container)


class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context() as ctx:
                # Get a new connection to the container object:
                container = Container.get(id=item.id)
                container.contents = "eggs"
                container.save()

            self.q.task_done()


if __name__ == "__main__":

    db.connect()
    try:
        db.create_tables([Container,])
    except pw.OperationalError:
        pass
    else:
        [Container.create() for c in range(42)]
    db.close()

    q = queue.Queue(maxsize=10)


    feeder = FeederThread(q)
    feeder.setDaemon(True)
    feeder.start()

    for i in range(10):
        reader = ReaderThread(q)
        reader.setDaemon(True)
        reader.start()

    q.join()

Согласно документам peewee, для SQLite должна поддерживаться многопоточность. Тем не менее, я продолжаю получать печально известную ошибку peewee.OperationalError: database is locked с выводом ошибки, указывающим на строку container.save().

Как мне обойти это?


person digitaldingo    schedule 22.07.2015    source источник


Ответы (2)


Я был несколько удивлен, увидев этот сбой, поэтому я скопировал ваш код и поэкспериментировал с некоторыми другими идеями. Я думаю, проблема в том, что ExecutionContext() по умолчанию заставит обернутый блок запускаться в транзакции. Чтобы избежать этого, я передал False в ветке чтения.

Я также отредактировал фидер, чтобы использовать оператор SELECT перед помещением материала в очередь (list(Container.select())).

Следующие работы для меня локально:

class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super(FeederThread, self).__init__()

        self.q = input_queue

    def run(self):
        containers = list(Container.select())

        for container in containers:
            self.q.put(container.id)  # I don't like passing model instances around like this, personal preference though

class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super(ReaderThread, self).__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context(False):
                # Get a new connection to the container object:
                container = Container.get(id=item)
                container.contents = "nuggets"
                with db.atomic():
                    container.save()

            self.q.task_done()

if __name__ == "__main__":

    with db.execution_context():
        try:
            db.create_tables([Container,])
        except OperationalError:
            pass
        else:
            [Container.create() for c in range(42)]

    # ... same ...

Я не совсем доволен этим, но, надеюсь, это даст вам некоторые идеи.

Вот сообщение в блоге, которое я написал некоторое время назад, в котором есть несколько советов по повышению параллелизма с SQLite: http://charlesleifer.com/blog/sqlite-small-fast-reliable-choose-any-three-/

person coleifer    schedule 22.07.2015
comment
Спасибо за Ваш ответ! Потоки для меня все еще немного вуду, поэтому я рад, что это не было очевидной ошибкой с моей стороны. Интересно, что использование оператора SELECT кажется здесь ключевым — я не вижу никакой разницы в использовании db.execution_context(False) или with db.atomic(). На самом деле, используя оператор SELECT, я даже не нуждаюсь в ExecutionContext(). Итак, я полагаю, что оператор SELECT фактически блокировал базу данных? - person digitaldingo; 23.07.2015

Вы пробовали режим WAL?

Повысить производительность INSERT в секунду для SQLite?

Вы должны быть очень осторожны, если у вас есть одновременный доступ к SQLite, так как вся база данных блокируется, когда выполняется запись, и хотя возможно несколько считывателей, запись будет заблокирована. Это было несколько улучшено с добавлением WAL в более новых версиях SQLite.

а также

Если вы используете несколько потоков, вы можете попробовать использовать общий кэш страниц, который позволит совместно использовать загруженные страницы между потоками, что позволит избежать дорогостоящих вызовов ввода-вывода.

person Paras    schedule 22.07.2015
comment
Спасибо за ваши предложения! К сожалению, ни WAL, ни использование общего кеша, похоже, здесь не имели значения. - person digitaldingo; 23.07.2015