Попытка заставить QProcess работать с очередью

Я пытаюсь запустить несколько процессов с очередью и получить результат для всех процессов, используя QProcess, но у меня есть пара проблем. Я использую QSpinBox, чтобы настроить максимальное количество процессов для одновременного запуска, и я могу заставить все нормально работать в основном потоке, или если я запускаю цикл с процессами в QObject, но я не могу заставить его работать правильно в a QThread.
Я знаю, что нет необходимости использовать потоки с QProcess, но с циклом у меня почти нет выбора. При запуске в основном потоке он на мгновение зависает до тех пор, пока процессы не запустятся, и я бы предпочел, чтобы он работал более плавно.
Я получаю только ошибки, пытаясь запустить процессы в QThread, если только я не использую что-то вроде _process.waitForFinished(), но проблема с этим в том, процессы запускаются только по одному.
Есть ли у кого-нибудь какие-либо предложения, чтобы заставить это работать правильно? В настоящее время я использую Pyside2, но ответ для Pyside2 или PyQt5 будет в порядке. Спасибо.

import queue
import sys
from PySide2.QtCore import QProcess, QTextCodec, QThread, Qt
from PySide2.QtWidgets import QApplication, QWidget, QSpinBox, \
    QPushButton, QVBoxLayout

class Window(QWidget):
    def __init__(self):
        QWidget.__init__(self)
        self.setAttribute(Qt.WA_DeleteOnClose, True)
        self.queue = queue.Queue()
        layout = QVBoxLayout(self)
        self.startBtn = QPushButton('Start', clicked=self.addToQueue)
        self.spinBox = QSpinBox(value=3)
        layout.addWidget(self.spinBox)
        layout.addWidget(self.startBtn)
        self.taskList = ['my.exe -value','my.exe -value','my.exe -value','my.exe -value',
                         'my.exe -value','my.exe -value','my.exe -value','my.exe -value']

    def addToQueue(self):
        for i in self.taskList:
            self.queue.put(i)
        self.sendToThread()

    def sendToThread(self):
        vals = {'max': self.spinBox.value()}
        self.taskThread = TaskThread(self.queue, vals)
        self.taskThread.start()

    def closeEvent(self, event):
        event.accept()

class TaskThread(QThread):
    def __init__(self, queue=None, vals=None, parent=None):
        QThread.__init__(self, parent)
        self.queue = queue
        self.vals = vals
        self.maxProcs = self.vals.get('max')
        self.procCount = 0

    def run(self):
        self.start_procs()

    def start_procs(self):
        while not self.queue.empty() and self.procCount < self.maxProcs:
            cmd = self.queue.get()
            _process = QProcess(self)
            _process.setProcessChannelMode(QProcess.MergedChannels)
            self.codec = QTextCodec.codecForLocale()
            self._decoder_stdout = self.codec.makeDecoder()
            _process.readyReadStandardOutput.connect(lambda process=_process: self._ready_read_standard_output(process))
            _process.started.connect(self.procStarted)
            _process.finished.connect(self.procFinished)
            _process.finished.connect(self.decreaseCount)
            _process.finished.connect(self.start_procs)
            _process.start(cmd)
            self.procCount += 1

    def _ready_read_standard_output(self, process):
        self.out = process.readAllStandardOutput()
        self.text = self._decoder_stdout.toUnicode(self.out)
        print(self.text)

    def decreaseCount(self):
        if self.procCount <= 0:
            pass
        else:
            self.procCount -= 1

    def procStarted(self):
        print('started')

    def procFinished(self):
        print('finished')

if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = Window()
    window.resize(200, 100)
    window.show()
    sys.exit(app.exec_())

person Richard    schedule 24.11.2018    source источник
comment
Насколько я понимаю, у вас есть n задач, и вы хотите выполнить только мои задачи в момент времени, поэтому, если одна задача завершена, ее нужно заменить другой. Я не ошибаюсь?. Чтобы представить это в числовом примере, скажем, у вас есть 50 задач, и вы хотите выполнить максимум 6 задач, тогда будут выполнены первые 6 задач, и если одна из этих задач будет завершена, ее следует заменить другой из оставшихся задач. .   -  person eyllanesc    schedule 25.11.2018
comment
С другой стороны, предположим, что вы установили максимум 6 задач, а затем с помощью QSlider вы меняете на 4 задачи, тогда вы должны убить 2 задачи или просто не добавлять задачи, пока не достигнете нового максимума?   -  person eyllanesc    schedule 25.11.2018
comment
Извините, я только что увидел ваш ответ. Я пытаюсь запустить несколько задач асинхронно в очереди. Таким образом, если у меня есть 10 задач, и я установил счетчик на 4, тогда 4 из 10 задач будут запущены, а новые задачи запустятся по мере завершения других, но никогда не будет запущено больше установленного количества задач одновременно.   -  person Richard    schedule 25.11.2018


Ответы (1)


Когда вы запускаете процесс, это не означает, что он запущен, потому что у него могут быть проблемы с выполнением, поэтому при первом запуске лучше дождаться запуска процесса или не удалось запустить следующий процесс, выполнив требования, которые количество количество запущенных процессов меньше максимального или больше не выполняет задачи или не запущено.

С другой стороны, я также реализовал задачу остановки, что означает, что больше задач не будет добавлено, но задачи, которые выполнялись до остановки, будут продолжать выполняться.

Если вы измените максимальное значение на более низкое значение, то больше задачи не будут запускаться, пока не будет выполнено условие.

Учитывая вышеизложенное, нет необходимости использовать нити

import queue
from PySide2 import QtCore, QtGui, QtWidgets

class TaskManager(QtCore.QObject):
    messageChanged = QtCore.Signal(str)
    numbersTaskRunningChanged = QtCore.Signal(int)

    def __init__(self, parent=None):
        super(TaskManager, self).__init__(parent)
        self._max_task = 1
        self._queue = queue.Queue()
        self._numbers_task_running = 0
        self._running = False

    def setMaxTask(self, max_task):
        self._max_task = max_task
        if self._running:
            self.call_task()

    def maxTask(self):
        return self._max_task

    def appendTask(self, task):
        self._queue.put(task)
        self.call_task()

    def start(self):
        self._running = True
        self.call_task()

    def stop(self):
        self._running = False

    def call_task(self):
        if self._numbers_task_running < self.maxTask() and not self._queue.empty() and self._running:
            cmd = self._queue.get()
            process = QtCore.QProcess(self)
            process.setProcessChannelMode(QtCore.QProcess.MergedChannels)
            process.readyReadStandardOutput.connect(self.on_readyReadStandardOutput)
            process.finished.connect(self.on_finished)
            process.started.connect(self.on_started)
            process.errorOccurred.connect(self.on_errorOccurred)
            process.start(cmd)

    def on_readyReadStandardOutput(self):
        codec = QtCore.QTextCodec.codecForLocale()
        decoder_stdout = codec.makeDecoder()
        process = self.sender()
        text = decoder_stdout.toUnicode(process.readAllStandardOutput())
        self.messageChanged.emit(text)

    def on_errorOccurred(self, error):
        process = self.sender()
        print("error: ", error, "-", " ".join([process.program()] + process.arguments()))
        self.call_task()

    def on_finished(self):
        process = self.sender()
        self._numbers_task_running -= 1
        self.numbersTaskRunningChanged.emit(self._numbers_task_running)
        self.call_task()

    def on_started(self):
        process = self.sender()
        print("started: ", " ".join([process.program()] + process.arguments()))
        self._numbers_task_running += 1
        self.numbersTaskRunningChanged.emit(self._numbers_task_running)
        self.call_task()

class Widget(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super(Widget, self).__init__(parent)
        self.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
        manager = TaskManager(self)
        task_list = # ...
        for task in task_list:
            manager.appendTask(task)

        button_start = QtWidgets.QPushButton("Start", clicked=manager.start)
        button_stop = QtWidgets.QPushButton("Stop", clicked=manager.stop)
        label = QtWidgets.QLabel("0", alignment=QtCore.Qt.AlignCenter)
        manager.numbersTaskRunningChanged.connect(label.setNum)
        spinBox = QtWidgets.QSpinBox()
        spinBox.valueChanged.connect(manager.setMaxTask)
        spinBox.setValue(3)
        textEdit = QtWidgets.QTextEdit()
        manager.messageChanged.connect(textEdit.append)

        lay = QtWidgets.QVBoxLayout(self)
        lay.addWidget(spinBox)
        lay.addWidget(button_start)
        lay.addWidget(button_stop)
        lay.addWidget(label)
        lay.addWidget(textEdit)

if __name__ == '__main__':
    import sys
    app = QtWidgets.QApplication(sys.argv)
    w = Widget()
    w.show()
    sys.exit(app.exec_())
person eyllanesc    schedule 24.11.2018
comment
Большое спасибо. Это работает хорошо. Основная причина, по которой я пытался использовать QThread, заключалась в том, чтобы избежать начальной задержки запуска процессов, но на самом деле это не имеет большого значения. Я рад, что ваш метод также избавился от цикла while. У меня возникли проблемы с тем, чтобы заставить его работать должным образом без дополнительного цикла. Очень ценю помощь! - person Richard; 25.11.2018