个性化阅读
专注于IT技术分析

Spring Batch教程:使用Spring轻松进行批处理

本文概述

批处理(通常是面向批量的, 非交互的且通常长时间运行的后台执行)在几乎每个行业中都得到广泛使用, 并应用于各种各样的任务。批处理可以是数据密集型或计算密集型的, 可以顺序执行或并行执行, 并且可以通过各种调用模型(包括临时, 计划和按需)启动。

这个Spring Batch教程一般性地解释了批处理应用程序的编程模型和领域语言, 并且特别展示了一些使用当前Spring Batch 3.0.7版本进行批处理应用程序设计和开发的有用方法。

什么是Spring Batch?

Spring Batch是一个轻量级的, 全面的框架, 旨在促进健壮的批处理应用程序的开发。它还提供了更高级的技术服务和功能, 通过其优化和分区技术支持超大量和高性能的批处理作业。 Spring Batch建立在Spring框架基于POJO的开发方法之上, 所有有经验的Spring开发人员都熟悉。

作为示例, 本文考虑了一个示例项目的源代码, 该示例项目加载XML格式的客户文件, 按各种属性过滤客户, 并将过滤后的条目输出到文本文件。我们的Spring Batch示例(利用Lombok批注)的源代码在GitHub上可用, 并且需要Java SE 8和Maven。

什么是批处理?关键概念和术语

对于任何批处理开发人员来说, 熟悉并熟悉批处理的主要概念都非常重要。下图是批处理参考体系结构的简化版本, 已在许多不同平台上进行了数十年的实践证明。它介绍了Spring Batch使用的与批处理相关的关键概念和术语。

Spring Batch教程:关键概念和术语

如我们的批处理示例所示, 批处理通常由包含多个步骤的Job封装。每个步骤通常都有一个ItemReader, ItemProcessor和ItemWriter。 Job由JobLauncher执行, 有关已配置和已执行作业的元数据存储在JobRepository中。

每个作业可以与多个JobInstances相关联, 每个JobInstances由用于启动批处理作业的其特定JobParameters唯一定义。 JobInstance的每次运行都称为JobExecution。每个JobExecution通常会跟踪运行过程中发生的情况, 例如当前和退出状态, 开始和结束时间等。

步骤是批处理作业的一个独立的特定阶段, 因此每个作业都由一个或多个步骤组成。与作业类似, 步骤具有一个单独的StepExecution, 它代表执行步骤的单次尝试。 StepExecution存储有关当前和退出状态, 开始和结束时间等信息, 以及对相应的Step和JobExecution实例的引用。

ExecutionContext是一组键-值对, 其中包含范围为StepExecution或JobExecution的信息。 Spring Batch会保留ExecutionContext, 这在你要重新启动批处理运行的情况下(例如, 发生致命错误时等)很有用。所需要做的只是将要在步骤之间共享的任何对象放入上下文中, 框架将负责其余的工作。重新启动后, 来自数据库的ExecutionContext的值将被还原并应用。

JobRepository是Spring Batch中的机制, 可以实现所有这些持久性。它为JobLauncher, Job和Step实例提供CRUD操作。启动作业后, 将从存储库中获取JobExecution, 并在执行过程中将StepExecution和JobExecution实例持久保存到存储库中。

Spring Batch Framework入门

Spring Batch的优点之一是项目依赖性最小, 这使得更容易快速启动和运行。在项目的pom.xml中明确指定并说明了确实存在的一些依赖项, 可在此处访问。

应用程序的实际启动发生在类似于以下内容的类中:

@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {
    public static void main(String[] args) {
        prepareTestData(1000);
        SpringApplication.run(BatchApplication.class, args);
    }
}

@EnableBatchProcessing批注启用Spring Batch功能, 并提供用于设置批处理作业的基本配置。

@SpringBootApplication批注来自Spring Boot项目, 该项目提供独立的, 可生产的, 基于Spring的应用程序。它指定一个配置类, 该类声明一个或多个Spring Bean, 并触发自动配置和Spring的组件扫描。

我们的示例项目只有一个由CustomerReportJobConfig配置的作业, 其中注入了JobBuilderFactory和StepBuilderFactory。可以在CustomerReportJobConfig中定义最小作业配置, 如下所示:

@Configuration
public class CustomerReportJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Bean
    public Job customerReportJob() {
        return jobBuilders.get("customerReportJob")
            .start(taskletStep())
            .next(chunkStep())
            .build();
    }

    @Bean
    public Step taskletStep() {
        return stepBuilders.get("taskletStep")
            .tasklet(tasklet())
            .build();
    }

    @Bean
    public Tasklet tasklet() {
        return (contribution, chunkContext) -> {
            return RepeatStatus.FINISHED;
        };
    }
}

建立步骤有两种主要方法。

如上例所示, 一种方法是基于任务的。 Tasklet支持仅具有一个方法execute()的简单接口, 该方法被重复调用, 直到它返回RepeatStatus.FINISHED或引发异常以指示失败。对Tasklet的每次调用都包装在一个事务中。

另一方法是面向块的处理, 是指顺序读取数据并创建将在事务边界内写出的”块”。从ItemReader中读取每个单独的项目, 将其交给ItemProcessor进行汇总。一旦读取的项目数等于提交间隔, 就通过ItemWriter写入整个块, 然后提交事务。面向块的步骤可以配置如下:

@Bean
public Job customerReportJob() {
    return jobBuilders.get("customerReportJob")
        .start(taskletStep())
        .next(chunkStep())
        .build();
}

@Bean
public Step chunkStep() {
    return stepBuilders.get("chunkStep")
        .<Customer, Customer>chunk(20)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}

chunk()方法构建一个步骤, 以提供的大小处理块中的项目, 然后将每个块传递给指定的读取器, 处理器和写入器。本文的下一部分将更详细地讨论这些方法。

自定义阅读器

对于我们的Spring Batch示例应用程序, 为了从XML文件读取客户列表, 我们需要提供org.springframework.batch.item.ItemReader接口的实现:

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

ItemReader提供数据, 并且应该是有状态的。通常每个批次都会多次调用它, 每次调用read()都会返回下一个值, 并在所有输入数据都用完后最终返回null。

Spring Batch提供了一些ItemReader的即用型实现, 可以用于多种目的, 例如读取集合, 文件, 集成JMS和JDBC以及多个源等等。

在我们的示例应用程序中, CustomerItemReader类将实际的read()调用委托给IteratorItemReader类的延迟初始化的实例:

public class CustomerItemReader implements ItemReader<Customer> {

    private final String filename;

    private ItemReader<Customer> delegate;

    public CustomerItemReader(final String filename) {
        this.filename = filename;
    }

    @Override
    public Customer read() throws Exception {
        if (delegate == null) {
            delegate = new IteratorItemReader<>(customers());
        }
        return delegate.read();
    }

    private List<Customer> customers() throws FileNotFoundException {
        try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) {
            return (List<Customer>) decoder.readObject();
        }
    }
}

使用@Component和@StepScope批注创建用于此实现的Spring bean, 让Spring知道该类是步进作用域的Spring组件, 并且每执行一次将创建一次, 如下所示:

@StepScope
@Bean
public ItemReader<Customer> reader() {
    return new CustomerItemReader(XML_FILE);
}

定制处理器

ItemProcessor在面向项目的处理场景中转换输入项目并引入业务逻辑。他们必须提供接口org.springframework.batch.item.ItemProcessor的实现:

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

方法process()接受I类的一个实例, 并且可能会或可能不会返回相同类型的实例。返回null表示不应继续处理该项目。像往常一样, Spring提供的标准处理器很少, 例如CompositeItemProcessor, 它通过一系列注入的ItemProcessor传递项目, 而ValidatingItemProcessor可以验证输入。

在我们的示例应用程序中, 处理器用于满足以下要求来过滤客户:

  • 客户必须在当月出生(例如, 标记特殊生日礼物等)
  • 客户完成的交易必须少于五笔(例如, 识别新客户)

“当前月份”要求是通过自定义ItemProcessor实现的:

public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> {
    @Override
    public Customer process(final Customer item) throws Exception {
        if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) {
            return item;
        }
        return null;
    }
}

“有限数量的交易”要求被实现为ValidatingItemProcessor:

public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> {
    public TransactionValidatingProcessor(final int limit) {
        super(
            item -> {
                if (item.getTransactions() >= limit) {
                    throw new ValidationException("Customer has less than " + limit + " transactions");
                }
            }
        );
        setFilter(true);
    }
}

然后将这对处理器封装在实现委托模式的CompositeItemProcessor中:

@StepScope
@Bean
public ItemProcessor<Customer, Customer> processor() {
    final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>();
    processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5)));
    return processor;
}

定制作家

为了输出数据, Spring Batch提供了org.springframework.batch.item.ItemWriter接口, 用于根据需要序列化对象:

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

write()方法负责确保刷新所有内部缓冲区。如果事务处于活动状态, 通常还需要在随后的回滚中丢弃输出。编写器向其发送数据的资源通常应该能够自行处理。有一些标准实现, 例如CompositeItemWriter, JdbcBatchItemWriter, JmsItemWriter, JpaItemWriter, SimpleMailMessageItemWriter等。

在我们的示例应用程序中, 过滤出的客户列表如下所示:

public class CustomerItemWriter implements ItemWriter<Customer>, Closeable {
    private final PrintWriter writer;

    public CustomerItemWriter() {
        OutputStream out;
        try {
            out = new FileOutputStream("output.txt");
        } catch (FileNotFoundException e) {
            out = System.out;
        }
        this.writer = new PrintWriter(out);
    }

    @Override
    public void write(final List<? extends Customer> items) throws Exception {
        for (Customer item : items) {
            writer.println(item.toString());
        }
    }

    @PreDestroy
    @Override
    public void close() throws IOException {
        writer.close();
    }
}

调度Spring Batch作业

默认情况下, Spring Batch执行启动时可以找到的所有作业(即在CustomerReportJobConfig中配置的作业)。若要更改此行为, 请通过在application.properties中添加以下属性来禁用启动时的作业执行:

spring.batch.job.enabled=false

然后, 通过将@EnableScheduling批注添加到配置类, 并将@Scheduled批注添加到执行作业本身的方法来实现实际的调度。可以使用延迟, 速率或cron表达式配置调度:

// run every 5000 msec (i.e., every 5 secs)
@Scheduled(fixedRate = 5000)
public void run() throws Exception {
    JobExecution execution = jobLauncher.run(
        customerReportJob(), new JobParametersBuilder().toJobParameters()
    );
}

上面的例子有一个问题。在运行时, 作业将仅在第一次成功。当它第二次启动时(即五秒钟后), 它将在日志中生成以下消息(请注意, 在以前的Spring Batch版本中, 将抛出JobInstanceAlreadyCompleteException):

INFO 36988 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}]
INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

发生这种情况是因为只能创建和执行唯一的JobInstance, 而Spring Batch无法区分第一个JobInstance和第二个JobInstance。

计划批处理作业时, 有两种方法可以避免此问题。

一定要为每个作业引入一个或多个唯一参数(例如, 实际的开始时间以纳秒为单位):

@Scheduled(fixedRate = 5000)
public void run() throws Exception {
    jobLauncher.run(
        customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters()
    );
}

或者, 你可以使用SimpleJobOperator.startNextInstance()在由附加到指定作业的JobParametersIncrementer确定的JobInstances序列中启动下一个作业:

@Autowired
private JobOperator operator;
 
@Autowired
private JobExplorer jobs;
 
@Scheduled(fixedRate = 5000)
public void run() throws Exception {
    List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1);
    if (lastInstances.isEmpty()) {
        jobLauncher.run(customerReportJob(), new JobParameters());
    } else {
        operator.startNextInstance(JOB_NAME);
    }
}

Spring批处理单元测试

通常, 要在Spring Boot应用程序中运行单元测试, 框架必须加载相应的ApplicationContext。为此使用了两个注释:

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {...})

有一个实用程序类org.springframework.batch.test.JobLauncherTestUtils测试批处理作业。它提供了启动整个作业的方法, 并允许对单个步骤进行端到端测试, 而不必运行作业中的每个步骤。必须将其声明为Spring bean:

@Configuration
public class BatchTestConfiguration {
    @Bean
    public JobLauncherTestUtils jobLauncherTestUtils() {
        return new JobLauncherTestUtils();
    }
}

一个工作和一个步骤的典型测试如下所示(并且也可以使用任何模拟框架):

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class})
public class CustomerReportJobConfigTest {

    @Autowired
    private JobLauncherTestUtils testUtils;

    @Autowired
    private CustomerReportJobConfig config;

    @Test
    public void testEntireJob() throws Exception {
        final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters());
        Assert.assertNotNull(result);
        Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus());
    }

    @Test
    public void testSpecificStep() {
        Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus());
    }
}

Spring Batch为步骤和作业上下文引入了其他范围。这些作用域中的对象将Spring容器用作对象工厂, 因此每个执行步骤或作业每个此类bean只有一个实例。此外, 还提供了对从StepContext或JobContext访问的引用的后期绑定的支持。在运行时配置为分步或作业范围的组件很难作为独立组件进行测试, 除非你有办法像设置步长或执行作业那样设置上下文。这是Spring Batch中org.springframework.batch.test.StepScopeTestExecutionListener和org.springframework.batch.test.StepScopeTestUtils组件以及JobScopeTestExecutionListener和JobScopeTestUtils的目标。

TestExecutionListeners在类级别上声明, 其工作是为每个测试方法创建步骤执行上下文。例如:

@RunWith(SpringRunner.class)
@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class})
@ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class})
public class BirthdayFilterProcessorTest {

    @Autowired
    private BirthdayFilterProcessor processor;

    public StepExecution getStepExecution() {
        return MetaDataInstanceFactory.createStepExecution();
    }

    @Test
    public void filter() throws Exception {
        final Customer customer = new Customer();
        customer.setId(1);
        customer.setName("name");
        customer.setBirthday(new GregorianCalendar());
        Assert.assertNotNull(processor.process(customer));
    }

}

有两个TestExecutionListener。一种来自常规的Spring Test框架, 它处理来自配置的应用程序上下文的依赖项注入。另一个是Spring Batch StepScopeTestExecutionListener, 它为将依赖项注入到单元测试中设置了步骤范围上下文。在测试方法期间将创建一个StepContext, 并使其可用于所注入的任何依赖项。默认行为是仅创建具有固定属性的StepExecution。或者, 测试用例可以将StepContext作为返回正确类型的工厂方法来提供。

另一种方法是基于StepScopeTestUtils实用程序类。此类用于在单元测试中以更灵活的方式创建和操作StepScope, 而无需使用依赖项注入。例如, 可以按以下方式读取由上面的处理器过滤的客户的ID:

@Test
public void filterId() throws Exception {
    final Customer customer = new Customer();
    customer.setId(1);
    customer.setName("name");
    customer.setBirthday(new GregorianCalendar());
    final int id = StepScopeTestUtils.doInStepScope(
        getStepExecution(), () -> processor.process(customer).getId()
    );
    Assert.assertEquals(1, id);
}

准备好进行高级Spring Batch了吗?

本文介绍了Spring Batch应用程序设计和开发的一些基础知识。但是, 本文没有涉及许多更高级的主题和功能, 例如缩放, 并行处理, 侦听器等。希望本文为入门提供了有用的基础。

有关这些更高级主题的信息, 可以在Spring Batch的官方Spring Back文档中找到。

赞(1)
未经允许不得转载:srcmini » Spring Batch教程:使用Spring轻松进行批处理

评论 抢沙发

评论前必须登录!