Надеюсь, ты поможешь мне с этими ребятами. Это не помощь в работе - это благотворительность для очень трудолюбивых добровольцев, которые действительно могли бы использовать менее запутанную / раздражающую систему расписания, чем то, что у них есть сейчас.
Если кто-нибудь знает хорошее стороннее приложение, которое (безусловно) автоматизирует это, это будет почти так же хорошо. Просто ... пожалуйста, не предлагайте случайное расписание, например, для бронирования классов, так как я не думаю, что они могут это сделать.
Заранее благодарим за прочтение; Я знаю, что это большой пост. Я стараюсь изо всех сил задокументировать это четко и показать, что я приложил все усилия самостоятельно.
Проблема
Мне нужен алгоритм планирования рабочего / временного интервала, который генерирует смены для рабочих, который соответствует следующим критериям:
Входные данные
import datetime.datetime as dt
class DateRange:
def __init__(self, start, end):
self.start = start
self.end = end
class Shift:
def __init__(self, range, min, max):
self.range = range
self.min_workers = min
self.max_workers = max
tue_9th_10pm = dt(2009, 1, 9, 22, 0)
wed_10th_4am = dt(2009, 1, 10, 4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)
shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shiftclass Solution:
def __init__(self, shifts_workers):
"""shifts_workers -- a dictionary of shift objects as keys, and a
a lists of workers filling the shift as values."""
assert isinstance(dict, shifts_workers)
self.shifts_workers = shifts_workers
times = Range(wed_10th_4am, wed_10th_10am)
shiftdef check_solution(solution):
assert isinstance(Solution, solution)
def shift_check(shift, workers, workers_allowed):
assert isinstance(Shift, shift):
assert isinstance(list, workers):
assert isinstance(list, workers_allowed)
num_workers = len(workers)
assert num_workers >= shift.min_workers
assert num_workers <= shift.max_workers
for w in workers_allowed:
assert w in workers
shifts_workers = solution.shifts_workers
# all shifts should be covered
assert len(shifts_workers.keys()) == 3
assert shift1 in shifts_workers.keys()
assert shift2 in shifts_workers.keys()
assert shift3 in shifts_workers.keys()
# shift_1 should be covered by 2 people - joe, and bob
shift_check(shift_1, shifts_workers[shift_1], (joe, bob))
# shift_2 should be covered by 2 people - sam and amy
shift_check(shift_2, shifts_workers[shift_2], (sam, amy))
# shift_3 should be covered by 3 people - ned, max, and jim
shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
times = Range(wed_10th_10am, wed_10th_2pm)
shift_1 = Shift(shift_1_times, 2,3) # allows 3, requires 2, but only 2 available
shift_2 = Shift(shiftclass Solution:
def __init__(self, shifts_workers):
"""shifts_workers -- a dictionary of shift objects as keys, and a
a lists of workers filling the shift as values."""
assert isinstance(dict, shifts_workers)
self.shifts_workers = shifts_workers
times, 2,2) # allows 2
shift_3 = Shift(shiftdef check_solution(solution):
assert isinstance(Solution, solution)
def shift_check(shift, workers, workers_allowed):
assert isinstance(Shift, shift):
assert isinstance(list, workers):
assert isinstance(list, workers_allowed)
num_workers = len(workers)
assert num_workers >= shift.min_workers
assert num_workers <= shift.max_workers
for w in workers_allowed:
assert w in workers
shifts_workers = solution.shifts_workers
# all shifts should be covered
assert len(shifts_workers.keys()) == 3
assert shift1 in shifts_workers.keys()
assert shift2 in shifts_workers.keys()
assert shift3 in shifts_workers.keys()
# shift_1 should be covered by 2 people - joe, and bob
shift_check(shift_1, shifts_workers[shift_1], (joe, bob))
# shift_2 should be covered by 2 people - sam and amy
shift_check(shift_2, shifts_workers[shift_2], (sam, amy))
# shift_3 should be covered by 3 people - ned, max, and jim
shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
times, 2,3) # allows 3, requires 2, 3 available
shifts = ( shift_1, shift_2, shift_3 )
joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]
joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)
workers = ( joe, bob, sam, ned, max, amy, jim )
Обработка
Сверху, смены и worker - две основные входные переменные для обработки
В каждой смене требуется минимальное и максимальное количество рабочих. Выполнение минимальных требований для смены имеет решающее значение для успеха, но если все остальное не помогает, ротация с пропусками, которые нужно заполнить вручную, лучше, чем «ошибка» :) Основная алгоритмическая проблема заключается в том, что не должно быть ненужных пропусков, когда их достаточно рабочие имеются.
В идеале максимальное количество рабочих за смену должно быть заполнено, но это самый низкий приоритет по сравнению с другими ограничениями, поэтому, если что-то должно дать, так оно и будет.
Гибкие ограничения
Они немного гибкие, и их границы можно немного расширить, если не удастся найти «идеальное» решение. Однако такая гибкость должна быть крайней мерой, а не использоваться случайным образом. В идеале гибкость можно настраивать с помощью переменной fudge_factor или аналогичной.
- Между двумя сменами существует минимальный промежуток времени. Так, например, работник не должен работать в две смены в один день.
- Существует максимальное количество смен, которое работник может выполнять за определенный период времени (скажем, в месяц).
- Существует максимальное количество определенных смен, которые можно выполнять в месяц (например, ночные смены).
Приятно иметь, но не обязательно
Если вы сможете придумать алгоритм, который выполняет все вышеперечисленное и включает в себя все / все из них, я буду очень впечатлен и благодарен. Даже дополнительный скрипт для выполнения этих битов по отдельности тоже был бы отличным вариантом.
Перекрытие смен. Например, было бы хорошо указать смену «стойки регистрации» и смены «бэк-офиса», которые происходят одновременно. Это можно сделать с помощью отдельных вызовов программы с разными данными о смене, за исключением того, что будут упущены ограничения на планирование людей на несколько смен в заданный период времени.
Минимальный период перепланирования для работников, определяемый для каждого работника (а не глобально). Например, если Джо чувствует себя перегруженным работой или имеет дело с личными проблемами, или если он новичок в изучении канатов, мы можем планировать его реже, чем других сотрудников.
Какой-то автоматический / случайный / справедливый способ подбора персонала для заполнения минимального количества смен, когда нет свободных рабочих.
Какой-то способ справиться с внезапными отменами и просто заполнить пробелы, не переставляя другие смены.
Выходной тест
Вероятно, алгоритм должен сгенерировать как можно больше подходящих Решений, каждое из которых выглядит так:
class Solution:
def __init__(self, shifts_workers):
"""shifts_workers -- a dictionary of shift objects as keys, and a
a lists of workers filling the shift as values."""
assert isinstance(dict, shifts_workers)
self.shifts_workers = shifts_workers
Вот тестовая функция для индивидуального решения с учетом приведенных выше данных. Я думаю, что это правильно, но я бы тоже был признателен за его экспертную оценку.
def check_solution(solution):
assert isinstance(Solution, solution)
def shift_check(shift, workers, workers_allowed):
assert isinstance(Shift, shift):
assert isinstance(list, workers):
assert isinstance(list, workers_allowed)
num_workers = len(workers)
assert num_workers >= shift.min_workers
assert num_workers <= shift.max_workers
for w in workers_allowed:
assert w in workers
shifts_workers = solution.shifts_workers
# all shifts should be covered
assert len(shifts_workers.keys()) == 3
assert shift1 in shifts_workers.keys()
assert shift2 in shifts_workers.keys()
assert shift3 in shifts_workers.keys()
# shift_1 should be covered by 2 people - joe, and bob
shift_check(shift_1, shifts_workers[shift_1], (joe, bob))
# shift_2 should be covered by 2 people - sam and amy
shift_check(shift_2, shifts_workers[shift_2], (sam, amy))
# shift_3 should be covered by 3 people - ned, max, and jim
shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
Попытки
Я пробовал реализовать это с помощью генетического алгоритма, но, похоже, не могу настроить его правильно, поэтому, хотя основной принцип, кажется, работает в одну смену, он не может решить даже простые случаи с несколькими сменами и несколькими рабочие.
Моя последняя попытка - создать все возможные перестановки в качестве решения, а затем сократить перестановки, которые не соответствуют ограничениям. Кажется, это работает намного быстрее и продвинуло меня дальше, но я использую itertools.product () python 2.6, чтобы помочь сгенерировать перестановки, и я не могу понять это правильно. Не удивлюсь, если багов будет много, так как, честно говоря, проблема в моей голове не укладывается :)
В настоящее время мой код для этого находится в двух файлах: models.py и rota.py. models.py выглядит так:
# -*- coding: utf-8 -*-
class Shift:
def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
self.start = start_datetime
self.end = end_datetime
self.duration = self.end - self.start
self.min_coverage = min_coverage
self.max_coverage = max_coverage
def __repr__(self):
return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)
class Duty:
def __init__(self, worker, shift, slot):
self.worker = worker
self.shift = shift
self.slot = slot
def __repr__(self):
return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
self.worker.dump(indent=indent, depth=depth+1)
print ind + ">"
class Avail:
def __init__(self, start_time, end_time):
self.start = start_time
self.end = end_time
def __repr__(self):
return "<%s to %s>" % (self.start, self.end)
class Worker:
def __init__(self, name, availabilities):
self.name = name
self.availabilities = availabilities
def __repr__(self):
return "<Worker %s Avail=%r>" % (self.name, self.availabilities)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Worker %s" % self.name
for avail in self.availabilities:
print ind + " " * indent + repr(avail)
print ind + ">"
def available_for_shift(self, shift):
for a in self.availabilities:
if shift.start >= a.start and shift.end <= a.end:
return True
print "Worker %s not available for %r (Availability: %r)" % (self.name, shift, self.availabilities)
return False
class Solution:
def __init__(self, shifts):
self._shifts = list(shifts)
def __repr__(self):
return "<Solution: shifts=%r>" % self._shifts
def duties(self):
d = []
for s in self._shifts:
for x in s:
yield x
def shifts(self):
return list(set([ d.shift for d in self.duties() ]))
def dump_shift(self, s, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<ShiftList"
for duty in s:
duty.dump(indent=indent, depth=depth+1)
print ind + ">"
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Solution"
for s in self._shifts:
self.dump_shift(s, indent=indent, depth=depth+1)
print ind + ">"
class Env:
def __init__(self, shifts, workers):
self.shifts = shifts
self.workers = workers
self.fittest = None
self.generation = 0
class DisplayContext:
def __init__(self, env):
self.env = env
def status(self, msg, *args):
raise NotImplementedError()
def cleanup(self):
pass
def update(self):
pass
и rota.py выглядит так:
#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
from datetime import datetime as dt
am2 = dt(2009, 10, 1, 2, 0)
am8 = dt(2009, 10, 1, 8, 0)
pm12 = dt(2009, 10, 1, 12, 0)
def duties_for_all_workers(shifts, workers):
from models import Duty
duties = []
# for all shifts
for shift in shifts:
# for all slots
for cov in range(shift.min_coverage, shift.max_coverage):
for slot in range(cov):
# for all workers
for worker in workers:
# generate a duty
duty = Duty(worker, shift, slot+1)
duties.append(duty)
return duties
def filter_duties_for_shift(duties, shift):
matching_duties = [ d for d in duties if d.shift == shift ]
for m in matching_duties:
yield m
def duty_permutations(shifts, duties):
from itertools import product
# build a list of shifts
shift_perms = []
for shift in shifts:
shift_duty_perms = []
for slot in range(shift.max_coverage):
slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
shift_duty_perms.append(slot_duties)
shift_perms.append(shift_duty_perms)
all_perms = ( shift_perms, shift_duty_perms )
# generate all possible duties for all shifts
perms = list(product(*shift_perms))
return perms
def solutions_for_duty_permutations(permutations):
from models import Solution
res = []
for duties in permutations:
sol = Solution(duties)
res.append(sol)
return res
def find_clashing_duties(duty, duties):
"""Find duties for the same worker that are too close together"""
from datetime import timedelta
one_day = timedelta(days=1)
one_day_before = duty.shift.start - one_day
one_day_after = duty.shift.end + one_day
for d in [ ds for ds in duties if ds.worker == duty.worker ]:
# skip the duty we're considering, as it can't clash with itself
if duty == d:
continue
clashes = False
# check if dates are too close to another shift
if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
clashes = True
# check if slots collide with another shift
if d.slot == duty.slot:
clashes = True
if clashes:
yield d
def filter_unwanted_shifts(solutions):
from models import Solution
print "possibly unwanted:", solutions
new_solutions = []
new_duties = []
for sol in solutions:
for duty in sol.duties():
duty_ok = True
if not duty.worker.available_for_shift(duty.shift):
duty_ok = False
if duty_ok:
print "duty OK:"
duty.dump(depth=1)
new_duties.append(duty)
else:
print "duty **NOT** OK:"
duty.dump(depth=1)
shifts = set([ d.shift for d in new_duties ])
shift_lists = []
for s in shifts:
shift_duties = [ d for d in new_duties if d.shift == s ]
shift_lists.append(shift_duties)
new_solutions.append(Solution(shift_lists))
return new_solutions
def filter_clashing_duties(solutions):
new_solutions = []
for sol in solutions:
solution_ok = True
for duty in sol.duties():
num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))
# check if many duties collide with this one (and thus we should delete this one
if num_clashing_duties > 0:
solution_ok = False
break
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_incomplete_shifts(solutions):
new_solutions = []
shift_duty_count = {}
for sol in solutions:
solution_ok = True
for shift in set([ duty.shift for duty in sol.duties() ]):
shift_duties = [ d for d in sol.duties() if d.shift == shift ]
num_workers = len(set([ d.worker for d in shift_duties ]))
if num_workers < shift.min_coverage:
solution_ok = False
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_solutions(solutions, workers):
# filter permutations ############################
# for each solution
solutions = filter_unwanted_shifts(solutions)
solutions = filter_clashing_duties(solutions)
solutions = filter_incomplete_shifts(solutions)
return solutions
def prioritise_solutions(solutions):
# TODO: not implemented!
return solutions
# prioritise solutions ############################
# for all solutions
# score according to number of staff on a duty
# score according to male/female staff
# score according to skill/background diversity
# score according to when staff last on shift
# sort all solutions by score
def solve_duties(shifts, duties, workers):
# ramify all possible duties #########################
perms = duty_permutations(shifts, duties)
solutions = solutions_for_duty_permutations(perms)
solutions = filter_solutions(solutions, workers)
solutions = prioritise_solutions(solutions)
return solutions
def load_shifts():
from models import Shift
shifts = [
Shift(am2, am8, 2, 3),
Shift(am8, pm12, 2, 3),
]
return shifts
def load_workers():
from models import Avail, Worker
joe_avail = ( Avail(am2, am8), )
sam_avail = ( Avail(am2, am8), )
ned_avail = ( Avail(am2, am8), )
bob_avail = ( Avail(am8, pm12), )
max_avail = ( Avail(am8, pm12), )
joe = Worker("joe", joe_avail)
sam = Worker("sam", sam_avail)
ned = Worker("ned", sam_avail)
bob = Worker("bob", bob_avail)
max = Worker("max", max_avail)
return (joe, sam, ned, bob, max)
def main():
import sys
shifts = load_shifts()
workers = load_workers()
duties = duties_for_all_workers(shifts, workers)
solutions = solve_duties(shifts, duties, workers)
if len(solutions) == 0:
print "Sorry, can't solve this. Perhaps you need more staff available, or"
print "simpler duty constraints?"
sys.exit(20)
else:
print "Solved. Solutions found:"
for sol in solutions:
sol.dump()
if __name__ == "__main__":
main()
Отсекая вывод отладки до результата, в настоящее время это дает:
Solved. Solutions found:
<Solution
<ShiftList
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker joe
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker sam
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker ned
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
>
<ShiftList
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker bob
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker max
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
>
>