Spring Batch - 16. SimpleJob 와 SimpleJobLauncher

출처

목차

SimpleJob

Step 을 실행시킬 수 있는 Job 의 구현체로서 SimpleJobBuilder 에 의해 생성된다.

하나 이상의 Step 으로 구성되며 순차적으로 Step 을 실행시킨다. 모든 Step 의 실행이 성공적으로 완료 돼야 Job 이 최종적으로 성공 상태가 된다.

SimpleJob 메소드

  • start
    • 최초로 한번 설정 한 후 SimpleJobBuilder 가 생성되고 반환된다.
  • next
    • 다음 실행할 step 들을 순차적으로 연결하도록 설정
  • validator
    • Job 실행에 필요한 Parameter 를 검증
    • DefaultJobParametersValidator 구현체를 지원하며 JobParametersValidator 인터페이스를 직접 구현할 수 있다.
  • preventRestart
    • Job 의 재실행을 방지 하는 설정
  • incrementer
    • JobParameter 에서 필요한 값을 증가 시켜 다음에 사용할 JobParameter 오브젝트를 리턴
    • RunIdIncrementer 구현체를 지원하며 JobParametersIncrementer 인터페이스를 직접 구현할 수 있다.

SimpleJob 흐름도

public class SimpleJob extends AbstractJob {

private List<Step> steps = new ArrayList<>();

public SimpleJob() {
this(null);
}

public SimpleJob(String name) {
super(name);
}

public void setSteps(List<Step> steps) {
this.steps.clear();
this.steps.addAll(steps);
}

@Override
public Collection<String> getStepNames() {
List<String> names = new ArrayList<>();
for (Step step : steps) {
names.add(step.getName());

if(step instanceof StepLocator) {
names.addAll(((StepLocator)step).getStepNames());
}
}
return names;
}

public void addStep(Step step) {
this.steps.add(step);
}

@Override
public Step getStep(String stepName) {
for (Step step : this.steps) {
if (step.getName().equals(stepName)) {
return step;
} else if(step instanceof StepLocator) {
Step result = ((StepLocator)step).getStep(stepName);
if(result != null) {
return result;
}
}
}
return null;
}

@Override
protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException,
StartLimitExceededException {

StepExecution stepExecution = null;
for (Step step : steps) {
stepExecution = handleStep(step, execution);
if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
//
// Terminate the job if a step fails
//
break;
}
}

if (stepExecution != null) {
if (logger.isDebugEnabled()) {
logger.debug("Upgrading JobExecution status: " + stepExecution);
}
execution.upgradeStatus(stepExecution.getStatus());
execution.setExitStatus(stepExecution.getExitStatus());
}
}

}

SimpleJobLauncher

SimpleJobLauncher 는 JobLauncher 인터페이스의 구현체로 SimpleJob 을 실행합니다.

public class SimpleJobLauncher implements JobLauncher, InitializingBean {

protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);

private JobRepository jobRepository;

private TaskExecutor taskExecutor;

@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
JobParametersInvalidException {

Assert.notNull(job, "The Job must not be null.");
Assert.notNull(jobParameters, "The JobParameters must not be null.");

final JobExecution jobExecution;

// 마지막으로 실행된 JobExecution 객체가 있는지 확인한다.
JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
if (lastExecution != null) {

// Job Restart 설정을 확인한다.
if (!job.isRestartable()) {
throw new JobRestartException("JobInstance already exists and is not restartable");
}

for (StepExecution execution : lastExecution.getStepExecutions()) {
BatchStatus status = execution.getStatus();
if (status.isRunning() || status == BatchStatus.STOPPING) {
throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+ lastExecution);
} else if (status == BatchStatus.UNKNOWN) {
throw new JobRestartException(
"Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
+ "The last execution ended with a failure that could not be rolled back, "
+ "so it may be dangerous to proceed. Manual intervention is probably necessary.");
}
}
}

// JobParamter 에 대한 검증을 진행한다.
job.getJobParametersValidator().validate(jobParameters);

jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

try {
taskExecutor.execute(new Runnable() {

@Override
public void run() {
try {
if (logger.isInfoEnabled()) {
logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
+ "]");
}
job.execute(jobExecution);
if (logger.isInfoEnabled()) {
Duration jobExecutionDuration = BatchMetrics.calculateDuration(jobExecution.getStartTime(), jobExecution.getEndTime());
logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
+ "] and the following status: [" + jobExecution.getStatus() + "]"
+ (jobExecutionDuration == null ? "" : " in " + BatchMetrics.formatDuration(jobExecutionDuration)));
}
}
catch (Throwable t) {
if (logger.isInfoEnabled()) {
logger.info("Job: [" + job
+ "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
+ "]", t);
}
rethrow(t);
}
}

private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
});
}
catch (TaskRejectedException e) {
jobExecution.upgradeStatus(BatchStatus.FAILED);
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
jobRepository.update(jobExecution);
}

return jobExecution;
}

public void setJobRepository(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}

public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

@Override
public void afterPropertiesSet() throws Exception {
Assert.state(jobRepository != null, "A JobRepository has not been set.");
if (taskExecutor == null) {
logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
taskExecutor = new SyncTaskExecutor();
}
}
}
Share