1. 環境準備1.1 引入依賴<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch" />

日韩成人免费在线_国产成人一二_精品国产免费人成电影在线观..._日本一区二区三区久久久久久久久不

當前位置:首頁 > 科技  > 軟件

SpringBatch高階應用:大數據批處理框架實戰指南

來源: 責編: 時間:2024-05-07 09:12:51 163觀看
導讀本篇文章主要內容:通過Spring Batch從一個庫中讀取數據進過處理后寫入到另外一個庫中。
1. 環境準備1.1 引入依賴<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch

本篇文章主要內容:通過Spring Batch從一個庫中讀取數據進過處理后寫入到另外一個庫中。
LNl28資訊網——每日最新資訊28at.com

1. 環境準備

1.1 引入依賴

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId></dependency>

2.2 配置Job

配置Job啟動器LNl28資訊網——每日最新資訊28at.com

@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  return jobLauncher ;}

配置任務Repository存儲元信息LNl28資訊網——每日最新資訊28at.com

@BeanJobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;  factory.setDatabaseType("mysql") ;  factory.setTransactionManager(transactionManager) ;  factory.setDataSource(dataSource) ;  try {    factory.afterPropertiesSet() ;     return factory.getObject() ;  } catch (Exception e) {    throw new RuntimeException(e) ;  }}

配置ItemReader讀取器LNl28資訊網——每日最新資訊28at.com

@BeanItemReader<User> userReader(JobOperator jobOperator) throws Exception {  JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;  builder.entityManagerFactory(entityManagerFactory) ;  // 每次分頁查詢多少條數據  builder.pageSize(10) ;  builder.queryString("select u from User u where u.uid <= 50") ;  builder.saveState(true) ;  builder.name("userReader") ;  return builder.build() ;}

配置數據源,該數據源是用來寫入操作的LNl28資訊網——每日最新資訊28at.com

public DataSource dataSource() {  HikariDataSource dataSource = new HikariDataSource() ;  dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;  dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;  dataSource.setUsername("root") ;  dataSource.setPassword("xxxooo") ;  return dataSource ;}

配置ItemWriter用來寫入操作(當前庫的數據寫入到另外一個庫,上面的數據源)LNl28資訊網——每日最新資訊28at.com

@BeanItemWriter<User> userWriter() {  // 通過JDBC批量處理  JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;  DataSource dataSource = dataSource() ;  builder.dataSource(dataSource) ;  builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;  builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {    @Override    public void setValues(User item, PreparedStatement ps) throws SQLException {      ps.setInt(1, item.getUid()) ;      ps.setString(2, item.getName()) ;      ps.setString(3, item.getSex()) ;      ps.setString(4, item.getMobile()) ;      ps.setInt(5, item.getAge()) ;      ps.setObject(6, item.getBirthday()) ;    }  }) ;  return builder.build() ;}

配置ItemProcessor處理器,數據從當前庫讀取處理后經過處理后再寫入另外的庫中LNl28資訊網——每日最新資訊28at.com

@BeanItemProcessor<User, User> userProcessor() {  return new ItemProcessor<User, User>() {    @Override    public User process(User item) throws Exception {      System.out.printf("%s - 開始處理數據:%s%n", Thread.currentThread().getName(), item.toString()) ;      // 模擬耗時操作      TimeUnit.SECONDS.sleep(1) ;      // 在這里你可以對數據進行相應的處理。      return item ;    }  } ;}

配置Step將ItemReader、ItemProcessor、ItemWriter串聯在一起。LNl28資訊網——每日最新資訊28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    .build() ;}

配置Job,Job是封裝整個批處理流程的實體。在 Spring Batch 中,Job只是Step實例的容器。它將邏輯上屬于一個流程的多個步驟組合在一起,并允許對所有步驟的全局屬性(如可重啟性)進行配置。作業配置包含:LNl28資訊網——每日最新資訊28at.com

  • 簡單的工作名稱。
  • Step實例的定義和排序。
  • Job是否可重新啟動。
@BeanJob userJob(Step userStep1, Step userStep2) {  return jobs.get("userJob").start(userStep1).build();}

以上是Spring Batch定義配置一個Job所需的核心組件。接下來會以上面的基礎配置進行高階知識點進行介紹。LNl28資訊網——每日最新資訊28at.com

2. 高階配置管理

2.1 通過Controller接口啟動Job

@RequestMapping("/userJob")public class UserJobController {  @Resource  private JobLauncher userJobLauncher ;  @GetMapping("/start")  public Object start() throws Exception {    JobParameters jobParameters = new JobParameters() ;    this.userJobLauncher.run(userJob, jobParameters) ;    return "started" ;  }}

通過JobLauncher#run方法啟動Job。當你調用該接口時,你會發現接口一直不會返回,一直阻塞,下圖是Job的啟動序列LNl28資訊網——每日最新資訊28at.com

圖片圖片LNl28資訊網——每日最新資訊28at.com

根據上圖能知道,當你調用run方法后,會等待整個Job退出狀態為FINISHED或者FAILED后才能結束。所以,你需要異步完成,以便 SimpleJobLauncher 立即返回給調用者。而正確的序列應該是如下:LNl28資訊網——每日最新資訊28at.com

圖片圖片LNl28資訊網——每日最新資訊28at.com

上圖通過異步方式啟動Job序列。
LNl28資訊網——每日最新資訊28at.com

2.2 異步啟動Job

@BeanTaskExecutor taskExecutor() {  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;  taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;  taskExecutor.setCorePoolSize(10) ;  taskExecutor.setMaxPoolSize(10) ;  taskExecutor.initialize() ;   return taskExecutor ;}@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  jobLauncher.setTaskExecutor(taskExecutor()) ;  return jobLauncher ;}

通過上面配置后,Job啟動將是異步的會直接返回JobExecution。LNl28資訊網——每日最新資訊28at.com

2.3 重啟Job

當一個Job正在執行,由于斷電或者強制終止了程序。當程序恢復后你希望能夠接著程序終止前的進度繼續執行,這時候你需要進行如下的操作(本人沒有發現有什么API能夠操作的,可能文檔沒看仔細)。
LNl28資訊網——每日最新資訊28at.com

當程序非正常終止是,下面兩張表的狀態都是STARTED,END_TIME為null
LNl28資訊網——每日最新資訊28at.com

batch_job_execution表LNl28資訊網——每日最新資訊28at.com

圖片圖片LNl28資訊網——每日最新資訊28at.com

batch_step_execution表LNl28資訊網——每日最新資訊28at.com

圖片圖片LNl28資訊網——每日最新資訊28at.com

想要重新啟動必須將上面的狀態修改為STOPPED,END_TIME字段設置上值(是什么值無所謂)。LNl28資訊網——每日最新資訊28at.com

然后我們就可以繼續使用上面的Controller接口啟動任務繼續執行了。LNl28資訊網——每日最新資訊28at.com

2.4 多線程執行Step

為了加快程序的執行,我們可以為Step配置線程池
LNl28資訊網——每日最新資訊28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    // 配置線程池    .taskExecutor(taskExecutor())    .build() ;}

注意:Step中使用的任何池化資源(如數據源)都可能對并發性設置限制。請確保這些資源池至少與步驟中所需的并發線程數一樣大。LNl28資訊網——每日最新資訊28at.com

通過上面配置線程池后,你將在控制臺看到如下輸出。LNl28資訊網——每日最新資訊28at.com

圖片圖片LNl28資訊網——每日最新資訊28at.com

默認將有4個線程同時進行處理。可以通過如下配置進行調整LNl28資訊網——每日最新資訊28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")      // ...      // 節流限制10,這里配置的大小應該與你的數據庫連接池大小及使用的線程池核心線程數一致。      .throttleLimit(10)      .build() ;}

2.5 重復啟動Job

要想重復啟動Job,我們可以在啟動Job時設置不同的JobParameters參數,只要參數不同那么就可以重復的啟動Job。如下示例:LNl28資訊網——每日最新資訊28at.com

@GetMapping("/start/{page}")public Object start(@PathVariable("page") Long page) throws Exception {  Map<String, JobParameter> parameters = new HashMap<>() ;  // 每次設置的參數值不同即可。  parameters.put("page", new JobParameter(page)) ;  JobParameters jobParameters = new JobParameters(parameters) ;  this.userJobLauncher.run(userJob, jobParameters) ;  return "started" ;}

以上是本篇文章的全部內容,希望對你有幫助。LNl28資訊網——每日最新資訊28at.com

本文鏈接:http://www.www897cc.com/showinfo-26-87012-0.htmlSpringBatch高階應用:大數據批處理框架實戰指南

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 架構設計中如何應對接口級故障?

下一篇: Web Components 取代 Vue?我覺得不太行!

標簽:
  • 熱門焦點
Top 主站蜘蛛池模板: 麻城市| 威远县| 昌平区| 鄢陵县| 娄烦县| 房产| 东辽县| 阳高县| 宁乡县| 崇阳县| 三门峡市| 雷州市| 成安县| 乳山市| 明星| 灵川县| 手游| 剑河县| 牡丹江市| 子洲县| 阳城县| 曲阜市| 禄丰县| 孟连| 桦南县| 平潭县| 上蔡县| 会同县| 石棉县| 雷州市| 新余市| 南丰县| 泸溪县| 海安县| 孝义市| 宜君县| 林州市| 太仓市| 平乐县| 遵化市| 炉霍县|