1. 環(huán)境準備1.1 引入依賴<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch" />

日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

SpringBatch高階應用:大數(shù)據(jù)批處理框架實戰(zhàn)指南

來源: 責編: 時間:2024-05-07 09:12:51 179觀看
導讀本篇文章主要內(nèi)容:通過Spring Batch從一個庫中讀取數(shù)據(jù)進過處理后寫入到另外一個庫中。
1. 環(huán)境準備1.1 引入依賴<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch

本篇文章主要內(nèi)容:通過Spring Batch從一個庫中讀取數(shù)據(jù)進過處理后寫入到另外一個庫中。
YjW28資訊網(wǎng)——每日最新資訊28at.com

1. 環(huán)境準備

1.1 引入依賴

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId></dependency>

2.2 配置Job

配置Job啟動器YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  return jobLauncher ;}

配置任務Repository存儲元信息YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanJobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;  factory.setDatabaseType("mysql") ;  factory.setTransactionManager(transactionManager) ;  factory.setDataSource(dataSource) ;  try {    factory.afterPropertiesSet() ;     return factory.getObject() ;  } catch (Exception e) {    throw new RuntimeException(e) ;  }}

配置ItemReader讀取器YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanItemReader<User> userReader(JobOperator jobOperator) throws Exception {  JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;  builder.entityManagerFactory(entityManagerFactory) ;  // 每次分頁查詢多少條數(shù)據(jù)  builder.pageSize(10) ;  builder.queryString("select u from User u where u.uid <= 50") ;  builder.saveState(true) ;  builder.name("userReader") ;  return builder.build() ;}

配置數(shù)據(jù)源,該數(shù)據(jù)源是用來寫入操作的YjW28資訊網(wǎng)——每日最新資訊28at.com

public DataSource dataSource() {  HikariDataSource dataSource = new HikariDataSource() ;  dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;  dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;  dataSource.setUsername("root") ;  dataSource.setPassword("xxxooo") ;  return dataSource ;}

配置ItemWriter用來寫入操作(當前庫的數(shù)據(jù)寫入到另外一個庫,上面的數(shù)據(jù)源)YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanItemWriter<User> userWriter() {  // 通過JDBC批量處理  JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;  DataSource dataSource = dataSource() ;  builder.dataSource(dataSource) ;  builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;  builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {    @Override    public void setValues(User item, PreparedStatement ps) throws SQLException {      ps.setInt(1, item.getUid()) ;      ps.setString(2, item.getName()) ;      ps.setString(3, item.getSex()) ;      ps.setString(4, item.getMobile()) ;      ps.setInt(5, item.getAge()) ;      ps.setObject(6, item.getBirthday()) ;    }  }) ;  return builder.build() ;}

配置ItemProcessor處理器,數(shù)據(jù)從當前庫讀取處理后經(jīng)過處理后再寫入另外的庫中YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanItemProcessor<User, User> userProcessor() {  return new ItemProcessor<User, User>() {    @Override    public User process(User item) throws Exception {      System.out.printf("%s - 開始處理數(shù)據(jù):%s%n", Thread.currentThread().getName(), item.toString()) ;      // 模擬耗時操作      TimeUnit.SECONDS.sleep(1) ;      // 在這里你可以對數(shù)據(jù)進行相應的處理。      return item ;    }  } ;}

配置Step將ItemReader、ItemProcessor、ItemWriter串聯(lián)在一起。YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    .build() ;}

配置Job,Job是封裝整個批處理流程的實體。在 Spring Batch 中,Job只是Step實例的容器。它將邏輯上屬于一個流程的多個步驟組合在一起,并允許對所有步驟的全局屬性(如可重啟性)進行配置。作業(yè)配置包含:YjW28資訊網(wǎng)——每日最新資訊28at.com

  • 簡單的工作名稱。
  • Step實例的定義和排序。
  • Job是否可重新啟動。
@BeanJob userJob(Step userStep1, Step userStep2) {  return jobs.get("userJob").start(userStep1).build();}

以上是Spring Batch定義配置一個Job所需的核心組件。接下來會以上面的基礎配置進行高階知識點進行介紹。YjW28資訊網(wǎng)——每日最新資訊28at.com

2. 高階配置管理

2.1 通過Controller接口啟動Job

@RequestMapping("/userJob")public class UserJobController {  @Resource  private JobLauncher userJobLauncher ;  @GetMapping("/start")  public Object start() throws Exception {    JobParameters jobParameters = new JobParameters() ;    this.userJobLauncher.run(userJob, jobParameters) ;    return "started" ;  }}

通過JobLauncher#run方法啟動Job。當你調(diào)用該接口時,你會發(fā)現(xiàn)接口一直不會返回,一直阻塞,下圖是Job的啟動序列YjW28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片YjW28資訊網(wǎng)——每日最新資訊28at.com

根據(jù)上圖能知道,當你調(diào)用run方法后,會等待整個Job退出狀態(tài)為FINISHED或者FAILED后才能結(jié)束。所以,你需要異步完成,以便 SimpleJobLauncher 立即返回給調(diào)用者。而正確的序列應該是如下:YjW28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片YjW28資訊網(wǎng)——每日最新資訊28at.com

上圖通過異步方式啟動Job序列。
YjW28資訊網(wǎng)——每日最新資訊28at.com

2.2 異步啟動Job

@BeanTaskExecutor taskExecutor() {  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;  taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;  taskExecutor.setCorePoolSize(10) ;  taskExecutor.setMaxPoolSize(10) ;  taskExecutor.initialize() ;   return taskExecutor ;}@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  jobLauncher.setTaskExecutor(taskExecutor()) ;  return jobLauncher ;}

通過上面配置后,Job啟動將是異步的會直接返回JobExecution。YjW28資訊網(wǎng)——每日最新資訊28at.com

2.3 重啟Job

當一個Job正在執(zhí)行,由于斷電或者強制終止了程序。當程序恢復后你希望能夠接著程序終止前的進度繼續(xù)執(zhí)行,這時候你需要進行如下的操作(本人沒有發(fā)現(xiàn)有什么API能夠操作的,可能文檔沒看仔細)。
YjW28資訊網(wǎng)——每日最新資訊28at.com

當程序非正常終止是,下面兩張表的狀態(tài)都是STARTED,END_TIME為null
YjW28資訊網(wǎng)——每日最新資訊28at.com

batch_job_execution表YjW28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片YjW28資訊網(wǎng)——每日最新資訊28at.com

batch_step_execution表YjW28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片YjW28資訊網(wǎng)——每日最新資訊28at.com

想要重新啟動必須將上面的狀態(tài)修改為STOPPED,END_TIME字段設置上值(是什么值無所謂)。YjW28資訊網(wǎng)——每日最新資訊28at.com

然后我們就可以繼續(xù)使用上面的Controller接口啟動任務繼續(xù)執(zhí)行了。YjW28資訊網(wǎng)——每日最新資訊28at.com

2.4 多線程執(zhí)行Step

為了加快程序的執(zhí)行,我們可以為Step配置線程池
YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    // 配置線程池    .taskExecutor(taskExecutor())    .build() ;}

注意:Step中使用的任何池化資源(如數(shù)據(jù)源)都可能對并發(fā)性設置限制。請確保這些資源池至少與步驟中所需的并發(fā)線程數(shù)一樣大。YjW28資訊網(wǎng)——每日最新資訊28at.com

通過上面配置線程池后,你將在控制臺看到如下輸出。YjW28資訊網(wǎng)——每日最新資訊28at.com

圖片圖片YjW28資訊網(wǎng)——每日最新資訊28at.com

默認將有4個線程同時進行處理。可以通過如下配置進行調(diào)整YjW28資訊網(wǎng)——每日最新資訊28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")      // ...      // 節(jié)流限制10,這里配置的大小應該與你的數(shù)據(jù)庫連接池大小及使用的線程池核心線程數(shù)一致。      .throttleLimit(10)      .build() ;}

2.5 重復啟動Job

要想重復啟動Job,我們可以在啟動Job時設置不同的JobParameters參數(shù),只要參數(shù)不同那么就可以重復的啟動Job。如下示例:YjW28資訊網(wǎng)——每日最新資訊28at.com

@GetMapping("/start/{page}")public Object start(@PathVariable("page") Long page) throws Exception {  Map<String, JobParameter> parameters = new HashMap<>() ;  // 每次設置的參數(shù)值不同即可。  parameters.put("page", new JobParameter(page)) ;  JobParameters jobParameters = new JobParameters(parameters) ;  this.userJobLauncher.run(userJob, jobParameters) ;  return "started" ;}

以上是本篇文章的全部內(nèi)容,希望對你有幫助。YjW28資訊網(wǎng)——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-87012-0.htmlSpringBatch高階應用:大數(shù)據(jù)批處理框架實戰(zhàn)指南

聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 架構(gòu)設計中如何應對接口級故障?

下一篇: Web Components 取代 Vue?我覺得不太行!

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 南康市| 紫云| 安阳市| 金堂县| 信丰县| 玉溪市| 宁安市| 谷城县| 新津县| 建宁县| 盈江县| 益阳市| 雷州市| 乌拉特后旗| 绩溪县| 永年县| 沅江市| 东丽区| 宜都市| 太保市| 枣阳市| 客服| 海安县| 涿鹿县| 江永县| 万州区| 那曲县| 凉山| 盐城市| 长沙市| 大埔县| 丰城市| 老河口市| 郎溪县| 北流市| 泰和县| 乾安县| 东乌珠穆沁旗| 上饶县| 麟游县| 琼结县|