Примеры Spark + Deap не работают

Цель

Я пытаюсь распараллелить DEAP через искровой кластер. Я видел, как на это ссылались другие пользователи, так как это позволяет легко интегрироваться с существующей серверной архитектурой через пряжу. Я следил за несколькими онлайн-учебниками, указанными в ссылках. У меня есть рабочий код для deap, а затем код, который я попытался преобразовать для использования spark. Обычно возникает та же самая ошибка «Не удается получить атрибут Individual в модуле deap.creator».

Рабочий код

import numpy as np
import random

from deap import base
from deap import creator
from deap import tools
from deap import algorithms

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

def evalOneMax(individual):
    return sum(individual),

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual,
    toolbox.attr_bool, 100)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evalOneMax)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

# Define parallelism outside main
if __name__=="__main__":

    pop = toolbox.population(n=300)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                   stats=stats, halloffame=hof, verbose=True)

Ран:

python3 test-deap.py

Выход

gen nevals  avg     std     min max
0   300     50.5533 4.78893 38  63
1   184     54.2433 3.85627 44  65
2   191     57.2667 3.38756 47  65
3   182     60.0333 3.15154 51  69
4   179     62.35   2.95762 53  71
5   176     64.3267 2.77968 55  73
6   183     66.2467 2.80223 58  75
7   182     68.1733 2.66019 59  79
8   186     69.8367 2.83255 62  77
9   179     71.7233 2.70557 59  78
10  187     73.3667 2.63165 63  80
11  169     74.7933 2.43255 65  80
12  182     76.0967 2.42363 65  82
13  204     77.3    2.36995 67  83
14  203     78.6267 2.31818 70  84
15  182     79.8933 2.29535 72  84
16  183     81.02   2.29483 72  86
17  185     81.87   2.41242 73  87
18  190     83.0633 2.13057 74  87
19  182     84.06   2.16096 75  89
20  194     84.8167 2.41724 77  91
21  174     85.9633 2.22755 79  91
22  180     86.8033 2.23263 79  92
23  177     87.7533 2.3831  78  93
24  174     88.61   2.34334 79  93
25  171     89.6167 2.36144 78  95
26  195     90.57   2.4695  81  95
27  169     91.5233 2.23072 82  96
28  173     92.3733 2.16347 83  97
29  203     93.1    2.13151 85  97
30  179     93.6067 2.41356 84  98
31  169     94.3067 2.23293 86  99
32  184     94.8933 2.49706 85  99
33  175     95.8733 2.14413 88  99
34  168     96.2167 2.30428 88  99
35  173     96.88   2.22537 87  100
36  171     97.33   2.29951 87  100
37  184     97.89   2.05375 91  100
38  175     98.0333 2.52565 88  100
39  176     98.6667 2.07579 90  100
40  175     98.6867 2.32562 91  100

Не рабочий код

from pyspark import SparkContext

import numpy as np
import random

from deap import base
from deap import creator
from deap import tools
from deap import algorithms

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

def evalOneMax(individual):
    return sum(individual),

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual,
    toolbox.attr_bool, 100)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evalOneMax)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

# Define parallelism outside main
if __name__=="__main__":
    sc = SparkContext(appName="DEAP")

    def sparkMap(algorithm, population):
        return sc.parallelize(population).map(algorithm).collect()

    toolbox.register("map", sparkMap)

    pop = toolbox.population(n=300)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                   stats=stats, halloffame=hof, verbose=True)

Ран:

spark-submit --master local test-deap.py

Выход

Полный текст (pastebin)

Основное:

Traceback (most recent call last):
  File "/Users/ryapeach/Documents/Workspace/relay-death/test-deap.py", line 45, in <module>
    stats=stats, halloffame=hof, verbose=True)
  File "/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/algorithms.py", line 150, in eaSimple
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
  File "/Users/ryapeach/Documents/Workspace/relay-death/test-deap.py", line 32, in sparkMap
    return sc.parallelize(population).map(algorithm).collect()
AttributeError: Can't get attribute 'Individual' on <module 'deap.creator' from '/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/creator.py'>

Использованная литература:


person Ryan    schedule 09.03.2018    source источник