Spring Boot - 批处理服务

您可以创建一个可执行的 JAR 文件,并使用 Maven 或 Gradle 命令运行 Spring Boot 应用程序,如下所示 −

对于 Maven,您可以使用下面给出的命令 −

mvn clean install

在“BUILD SUCCESS”之后,您可以在目标目录下找到 JAR 文件。

对于 Gradle,可以使用如图所示的命令 −

gradle clean build

在“BUILD SUCCESSFUL”之后,您可以在 build/libs 目录下找到 JAR 文件。

使用此处给出的命令运行 JAR 文件 −

java –jar <JARFILE>

现在,应用程序已在 Tomcat 端口 8080 上启动,如图所示。

批处理服务应用程序在 Tomcat 端口上启动

现在,在您的 Web 浏览器中点击 URL http://localhost:8080/ 并连接 Web 套接字并发送问候语并接收消息。

Web Socket 发送和接收消息

Batch 批处理服务是在单个任务中执行多个命令的过程。 在本章中,您将学习如何在 Spring Boot 应用程序中创建批处理服务。

让我们考虑一个将 CSV 文件内容保存到 HSQLDB 的示例。

要创建 Batch Service 程序,我们需要在构建配置文件中添加 Spring Boot Starter Batch 依赖和 HSQLDB 依赖。

Maven 用户可以在 pom.xml 文件中添加以下依赖项。

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
   <groupId>org.hsqldb</groupId>
   <artifactId>hsqldb</artifactId>
</dependency>

Gradle 用户可以在 build.gradle 文件中添加以下依赖项。

compile("org.springframework.boot:spring-boot-starter-batch")
compile("org.hsqldb:hsqldb")

现在,在类路径资源下添加简单的 CSV 数据文件 - src/main/resources 并将文件命名为 file.csv,如图所示 −

William,John
Mike, Sebastian
Lawarance, Lime

接下来为HSQLDB写一个SQL脚本 – classpath 资源目录下 – request_fail_hystrix_timeout

DROP TABLE USERS IF EXISTS;
CREATE TABLE USERS  (
   user_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
   first_name VARCHAR(20),
   last_name VARCHAR(20)
);

为 USERS 模型创建一个 POJO 类,如图所示 −

package com.tutorialspoint.batchservicedemo;
public class User {
   private String lastName;
   private String firstName;

   public User() {
   }
   public User(String firstName, String lastName) {
      this.firstName = firstName;
      this.lastName = lastName;
   }
   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }
   public String getFirstName() {
      return firstName;
   }
   public String getLastName() {
      return lastName;
   }
   public void setLastName(String lastName) {
      this.lastName = lastName;
   }

   @Override
   public String toString() {
      return "firstName: " + firstName + ", lastName: " + lastName;
   }   
}

现在,创建一个中间处理器来执行从 CSV 文件中读取数据之后和将数据写入 SQL 之前的操作。

package com.tutorialspoint.batchservicedemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

public class UserItemProcessor implements ItemProcessor<User, User> {
   private static final Logger log = LoggerFactory.getLogger(UserItemProcessor.class);

   @Override
   public User process(final User user) throws Exception {
      final String firstName = user.getFirstName().toUpperCase();
      final String lastName = user.getLastName().toUpperCase();
      final User transformedPerson = new User(firstName, lastName);

      log.info("Converting (" + user + ") into (" + transformedPerson + ")");
      return transformedPerson;
   }
}

让我们创建一个批处理配置文件,从 CSV 读取数据并写入 SQL 文件,如下所示。 我们需要在配置类文件中添加@EnableBatchProcessing 注解。 @EnableBatchProcessing 注解用于启用 Spring Boot 应用程序的批处理操作。

package com.tutorialspoint.batchservicedemo;

import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
   @Autowired
   public JobBuilderFactory jobBuilderFactory;

   @Autowired
   public StepBuilderFactory stepBuilderFactory;

   @Autowired
   public DataSource dataSource;

   @Bean
   public FlatFileItemReader<User> reader() {
      FlatFileItemReader<User> reader = new FlatFileItemReader<User>();
      reader.setResource(new ClassPathResource("file.csv"));
      reader.setLineMapper(new DefaultLineMapper<User>() {
         {
            setLineTokenizer(new DelimitedLineTokenizer() {
               {
                  setNames(new String[] { "firstName", "lastName" });
               }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<User>() {
               {
                  setTargetType(User.class);
               }
            });
         }
      });
      return reader;
   }
   @Bean
   public UserItemProcessor processor() {
      return new UserItemProcessor();
   }
   @Bean
   public JdbcBatchItemWriter<User> writer() {
      JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<User>();
      writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<User>());
      writer.setSql("INSERT INTO USERS (first_name, last_name) VALUES (:firstName, :lastName)");
      writer.setDataSource(dataSource);
      return writer;
   }
   @Bean
   public Job importUserJob(JobCompletionNotificationListener listener) {
      return jobBuilderFactory.get("importUserJob").incrementer(
         new RunIdIncrementer()).listener(listener).flow(step1()).end().build();
   }
   @Bean
   public Step step1() {
      return stepBuilderFactory.get("step1").<User, User>chunk(10).reader(reader()).processor(processor()).writer(writer()).build();
   }
}

reader() 方法用于从 CSV 文件中读取数据,而 writer() 方法用于将数据写入 SQL。

接下来,我们将不得不编写一个 Job Completion Notification Listener 类 - 用于在 Job 完成后进行通知。

package com.tutorialspoint.batchservicedemo;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
   private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
   private final JdbcTemplate jdbcTemplate;

   @Autowired
   public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
      this.jdbcTemplate = jdbcTemplate;
   }
   @Override
   public void afterJob(JobExecution jobExecution) {
      if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
         log.info("!!! JOB FINISHED !! It's time to verify the results!!");

         List<User> results = jdbcTemplate.query(
            "SELECT first_name, last_name FROM USERS", new RowMapper<User>() {
            
            @Override
            public User mapRow(ResultSet rs, int row) throws SQLException {
               return new User(rs.getString(1), rs.getString(2));
            }
         });

         for (User person : results) {
            log.info("Found <" + person + "> in the database.");
         }
      }
   }
}

现在,创建一个可执行 JAR 文件,并使用以下 Maven 或 Gradle 命令运行 Spring Boot 应用程序。

对于 Maven,使用如图所示的命令 −

mvn clean install

在“BUILD SUCCESS”之后,您可以在目标目录下找到 JAR 文件。

对于 Gradle,可以使用如图所示的命令 −

gradle clean build

在“BUILD SUCCESSFUL”之后,您可以在 build/libs 目录下找到 JAR 文件。

使用此处给出的命令运行 JAR 文件 −

java –jar <JARFILE>

您可以在控制台窗口中看到输出,如图所示 −

控制台窗口中的批处理服务输出