У меня была та же проблема, что и в пункте 3 Элада, и в конце концов я решил ее, используя базовую структуру, как показано здесь, но с модифицированными версиями DeployerPartitionHandler и DeployerStepExecutionHandler.
Сначала я попробовал наивный подход создания двухуровневого разделения, в котором шаг, который выполняет каждый рабочий, сам разбивается на подразделы. Но структура, похоже, этого не поддерживает; он запутался в состоянии шага.
Поэтому я вернулся к плоскому набору разделов, но передав несколько идентификаторов шагов выполнения каждому исполнителю. Чтобы это сработало, я создал DeployerMultiPartitionHandler, который запускает настроенное количество рабочих и передает каждому список идентификаторов выполнения шагов. Обратите внимание, что теперь есть две степени свободы: количество воркеров и gridSize, который представляет собой общее количество разделов, которые распределяются между воркерами максимально равномерно. К сожалению, мне пришлось продублировать здесь много кода DeployerPartitionHandler.
@Slf4j
@Getter
@Setter
public class DeployerMultiPartitionHandler implements PartitionHandler, EnvironmentAware, InitializingBean {
public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_IDS =
"spring.cloud.task.step-execution-ids";
public static final String SPRING_CLOUD_TASK_JOB_EXECUTION_ID =
"spring.cloud.task.job-execution-id";
public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_ID =
"spring.cloud.task.step-execution-id";
public static final String SPRING_CLOUD_TASK_STEP_NAME =
"spring.cloud.task.step-name";
public static final String SPRING_CLOUD_TASK_PARENT_EXECUTION_ID =
"spring.cloud.task.parentExecutionId";
public static final String SPRING_CLOUD_TASK_NAME = "spring.cloud.task.name";
private int maxWorkers = -1;
private int gridSize = 1;
private int currentWorkers = 0;
private TaskLauncher taskLauncher;
private JobExplorer jobExplorer;
private TaskExecution taskExecution;
private Resource resource;
private String stepName;
private long pollInterval = 10000;
private long timeout = -1;
private Environment environment;
private Map<String, String> deploymentProperties;
private EnvironmentVariablesProvider environmentVariablesProvider;
private String applicationName;
private CommandLineArgsProvider commandLineArgsProvider;
private boolean defaultArgsAsEnvironmentVars = false;
public DeployerMultiPartitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer,
Resource resource,
String stepName) {
Assert.notNull(taskLauncher, "A taskLauncher is required");
Assert.notNull(jobExplorer, "A jobExplorer is required");
Assert.notNull(resource, "A resource is required");
Assert.hasText(stepName, "A step name is required");
this.taskLauncher = taskLauncher;
this.jobExplorer = jobExplorer;
this.resource = resource;
this.stepName = stepName;
}
@Override
public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
StepExecution stepExecution) throws Exception {
final Set<StepExecution> tempCandidates =
stepSplitter.split(stepExecution, this.gridSize);
// Following two lines due to https://jira.spring.io/browse/BATCH-2490
final List<StepExecution> candidates = new ArrayList<>(tempCandidates.size());
candidates.addAll(tempCandidates);
int partitions = candidates.size();
log.debug(String.format("%s partitions were returned", partitions));
final Set<StepExecution> executed = new HashSet<>(candidates.size());
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
launchWorkers(candidates, executed);
candidates.removeAll(executed);
return pollReplies(stepExecution, executed, partitions);
}
private void launchWorkers(List<StepExecution> candidates, Set<StepExecution> executed) {
int partitions = candidates.size();
int numWorkers = this.maxWorkers != -1 ? Math.min(this.maxWorkers, partitions) : partitions;
IntStream.range(0, numWorkers).boxed()
.map(i -> candidates.subList(partitionOffset(partitions, numWorkers, i), partitionOffset(partitions, numWorkers, i + 1)))
.filter(not(List::isEmpty))
.forEach(stepExecutions -> processStepExecutions(stepExecutions, executed));
}
private void processStepExecutions(List<StepExecution> stepExecutions, Set<StepExecution> executed) {
launchWorker(stepExecutions);
this.currentWorkers++;
executed.addAll(stepExecutions);
}
private void launchWorker(List<StepExecution> workerStepExecutions) {
List<String> arguments = new ArrayList<>();
StepExecution firstWorkerStepExecution = workerStepExecutions.get(0);
ExecutionContext copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());
arguments.addAll(
this.commandLineArgsProvider
.getCommandLineArgs(copyContext));
String jobExecutionId = String.valueOf(firstWorkerStepExecution.getJobExecution().getId());
String stepExecutionIds = workerStepExecutions.stream().map(workerStepExecution -> String.valueOf(workerStepExecution.getId())).collect(joining(","));
String taskName = String.format("%s_%s_%s",
taskExecution.getTaskName(),
firstWorkerStepExecution.getJobExecution().getJobInstance().getJobName(),
firstWorkerStepExecution.getStepName());
String parentExecutionId = String.valueOf(taskExecution.getExecutionId());
if(!this.defaultArgsAsEnvironmentVars) {
arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
jobExecutionId));
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS,
stepExecutionIds));
arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
arguments.add(formatArgument(SPRING_CLOUD_TASK_NAME, taskName));
arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
parentExecutionId));
}
copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());
log.info("launchWorker context={}", copyContext);
Map<String, String> environmentVariables = this.environmentVariablesProvider.getEnvironmentVariables(copyContext);
if(this.defaultArgsAsEnvironmentVars) {
environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
jobExecutionId);
environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
String.valueOf(firstWorkerStepExecution.getId()));
environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
environmentVariables.put(SPRING_CLOUD_TASK_NAME, taskName);
environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
parentExecutionId);
}
AppDefinition definition =
new AppDefinition(resolveApplicationName(),
environmentVariables);
AppDeploymentRequest request =
new AppDeploymentRequest(definition,
this.resource,
this.deploymentProperties,
arguments);
taskLauncher.launch(request);
}
private String resolveApplicationName() {
if(StringUtils.hasText(this.applicationName)) {
return this.applicationName;
}
else {
return this.taskExecution.getTaskName();
}
}
private String formatArgument(String key, String value) {
return String.format("--%s=%s", key, value);
}
private Collection<StepExecution> pollReplies(final StepExecution masterStepExecution,
final Set<StepExecution> executed,
final int size) throws Exception {
final Collection<StepExecution> result = new ArrayList<>(executed.size());
Callable<Collection<StepExecution>> callback = new Callable<Collection<StepExecution>>() {
@Override
public Collection<StepExecution> call() {
Set<StepExecution> newExecuted = new HashSet<>();
for (StepExecution curStepExecution : executed) {
if (!result.contains(curStepExecution)) {
StepExecution partitionStepExecution =
jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId());
if (isComplete(partitionStepExecution.getStatus())) {
result.add(partitionStepExecution);
currentWorkers--;
}
}
}
executed.addAll(newExecuted);
if (result.size() == size) {
return result;
}
else {
return null;
}
}
};
Poller<Collection<StepExecution>> poller = new DirectPoller<>(this.pollInterval);
Future<Collection<StepExecution>> resultsFuture = poller.poll(callback);
if (timeout >= 0) {
return resultsFuture.get(timeout, TimeUnit.MILLISECONDS);
}
else {
return resultsFuture.get();
}
}
private boolean isComplete(BatchStatus status) {
return status.equals(BatchStatus.COMPLETED) || status.isGreaterThan(BatchStatus.STARTED);
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Override
public void afterPropertiesSet() {
Assert.notNull(taskExecution, "A taskExecution is required");
if(this.environmentVariablesProvider == null) {
this.environmentVariablesProvider =
new CloudEnvironmentVariablesProvider(this.environment);
}
if(this.commandLineArgsProvider == null) {
SimpleCommandLineArgsProvider simpleCommandLineArgsProvider = new SimpleCommandLineArgsProvider();
simpleCommandLineArgsProvider.onTaskStartup(taskExecution);
this.commandLineArgsProvider = simpleCommandLineArgsProvider;
}
}
}
Разделы распределяются между рабочими с помощью статической функции partitionOffset, которая гарантирует, что количество разделов, которые получает каждый рабочий, отличается не более чем на единицу:
static int partitionOffset(int length, int numberOfPartitions, int partitionIndex) {
return partitionIndex * (length / numberOfPartitions) + Math.min(partitionIndex, length % numberOfPartitions);
}
На принимающей стороне я создал DeployerMultiStepExecutionHandler, который наследует параллельное выполнение разделов от TaskExecutorPartitionHandler и, кроме того, реализует интерфейс командной строки, соответствующий DeployerMultiPartitionHandler:
@Slf4j
public class DeployerMultiStepExecutionHandler extends TaskExecutorPartitionHandler implements CommandLineRunner {
private JobExplorer jobExplorer;
private JobRepository jobRepository;
private Log logger = LogFactory.getLog(org.springframework.cloud.task.batch.partition.DeployerStepExecutionHandler.class);
@Autowired
private Environment environment;
private StepLocator stepLocator;
public DeployerMultiStepExecutionHandler(BeanFactory beanFactory, JobExplorer jobExplorer, JobRepository jobRepository) {
Assert.notNull(beanFactory, "A beanFactory is required");
Assert.notNull(jobExplorer, "A jobExplorer is required");
Assert.notNull(jobRepository, "A jobRepository is required");
this.stepLocator = new BeanFactoryStepLocator();
((BeanFactoryStepLocator) this.stepLocator).setBeanFactory(beanFactory);
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
}
@Override
public void run(String... args) throws Exception {
validateRequest();
Long jobExecutionId = Long.parseLong(environment.getProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID));
Stream<Long> stepExecutionIds = Stream.of(environment.getProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS).split(",")).map(Long::parseLong);
Set<StepExecution> stepExecutions = stepExecutionIds.map(stepExecutionId -> jobExplorer.getStepExecution(jobExecutionId, stepExecutionId)).collect(Collectors.toSet());
log.info("found stepExecutions:\n{}", stepExecutions.stream().map(stepExecution -> stepExecution.getId() + ":" + stepExecution.getExecutionContext()).collect(joining("\n")));
if (stepExecutions.isEmpty()) {
throw new NoSuchStepException(String.format("No StepExecution could be located for step execution id %s within job execution %s", stepExecutionIds, jobExecutionId));
}
String stepName = environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME);
setStep(stepLocator.getStep(stepName));
doHandle(null, stepExecutions);
}
private void validateRequest() {
Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID), "A job execution id is required");
Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS), "A step execution id is required");
Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_NAME), "A step name is required");
Assert.isTrue(this.stepLocator.getStepNames().contains(environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME)), "The step requested cannot be found in the provided BeanFactory");
}
}
person
Stefan Reisner
schedule
18.03.2019