Я работал над базовой двумерной симуляцией DLA в режиме многозадачности. Диффузионная ограниченная агрегация (DLA) - это когда у вас есть частицы, выполняющие случайное блуждание и агрегирующие, когда они касаются текущего агрегата.
В моделировании у меня есть 10.000 частиц, движущихся в случайном направлении на каждом шаге. Я использую пул рабочих и очередь, чтобы кормить их. Я скармливаю им список частиц, и рабочий выполняет метод .updatePositionAndggregate()
для каждой частицы.
Если у меня есть один рабочий, я кормлю его списком из 10.000 частиц, если у меня есть два рабочих, я кормлю их списком из 5.000 частиц каждый, если у меня 3 рабочих, я кормлю их списком из 3,333 частиц каждый, и т. д. и т. д.
Я покажу вам код для рабочего сейчас
class Worker(Thread):
"""
The worker class is here to process a list of particles and try to aggregate
them.
"""
def __init__(self, name, particles):
"""
Initialize the worker and its events.
"""
Thread.__init__(self, name = name)
self.daemon = True
self.particles = particles
self.start()
def run(self):
"""
The worker is started just after its creation and wait to be feed with a
list of particles in order to process them.
"""
while True:
particles = self.particles.get()
# print self.name + ': wake up with ' + str(len(self.particles)) + ' particles' + '\n'
# Processing the particles that has been feed.
for particle in particles:
particle.updatePositionAndAggregate()
self.particles.task_done()
# print self.name + ': is done' + '\n'
И в основном потоке:
# Create the workers.
workerQueue = Queue(num_threads)
for i in range(0, num_threads):
Worker("worker_" + str(i), workerQueue)
# We run the simulation until all the particle has been created
while some_condition():
# Feed all the workers.
startWorker = datetime.datetime.now()
for i in range(0, num_threads):
j = i * len(particles) / num_threads
k = (i + 1) * len(particles) / num_threads
# Feeding the worker thread.
# print "main: feeding " + worker.name + ' ' + str(len(worker.particles)) + ' particles\n'
workerQueue.put(particles[j:k])
# Wait for all the workers
workerQueue.join()
workerDurations.append((datetime.datetime.now() - startWorker).total_seconds())
print sum(workerDurations) / len(workerDurations)
Итак, я печатаю среднее время ожидания работниками завершения своих задач. Я поэкспериментировал с другим номером резьбы.
| num threads | average workers duration (s.) |
|-------------|-------------------------------|
| 1 | 0.147835636364 |
| 2 | 0.228585818182 |
| 3 | 0.258296454545 |
| 10 | 0.294294636364 |
Мне действительно интересно, почему добавление воркеров увеличивает время обработки, я думал, что, по крайней мере, наличие двух воркеров уменьшит время обработки, но оно резко увеличивается с 0,14 с. до 0,23 с. Вы можете мне объяснить почему?
РЕДАКТИРОВАТЬ: Итак, объяснение - это реализация потоковой передачи Python, есть ли способ, чтобы я мог иметь реальную многозадачность?