Back-end
[Spring Batch] DB에서 데이터 읽고 DB에 쓰기
yeobi
2024. 6. 9. 03:03
- DB 설정
create database batch;
show databases;
use batch;
show tables;
-- 주문 테이블 생성
CREATE TABLE `batch`.`orders` (
`id` INT NOT NULL AUTO_INCREMENT,
`order_item` VARCHAR(45) NULL,
`price` INT NULL,
`order_date` DATE NULL,
PRIMARY KEY (`id`))
;
-- 정산 테이블 생성
CREATE TABLE `batch`.`accounts` (
`id` INT NOT NULL AUTO_INCREMENT,
`order_item` VARCHAR(45) NULL,
`price` INT NULL,
`order_date` DATE NULL,
`account_date` DATE NULL,
PRIMARY KEY (`id`))
;
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('카카오 선물', 15000, '2022-03-01');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('배달주문', 18000, '2022-03-01');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('교보문고', 14000, '2022-03-02');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('아이스크림', 3800, '2022-03-03');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('치킨', 21000, '2022-03-04');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('커피', 4000, '2022-03-04');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('교보문고', 13800, '2022-03-05');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('카카오 선물', 5500, '2022-03-06');
select *
from batch.orders
;
select *
from batch.accounts
;
주문 테이블에서 계산 날짜까지 추가하여 정산 테이블로 데이터 넣는 것이 목표이다.
- 주문 객체 생성(Orders)
@Data
@Entity
public class Orders {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
@Column(name = "order_item")
private String orderItem;
@Column(name = "price")
private Integer price;
@Column(name = "order_date")
private Date orderDate;
}
OrdersRepository
public interface OrdersRepository extends JpaRepository<Orders, Integer> {
}
- 정산 객체 생성
@NoArgsConstructor
@Data
@Entity
public class Accounts {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
@Column(name = "order_item")
private String orderItem;
@Column(name = "price")
private Integer price;
@Column(name = "order_date")
private Date orderDate;
@Column(name = "account_date")
private Date accountDate;
public Accounts(Orders orders) {
this.id = orders.getId();
this.orderItem = orders.getOrderItem();
this.price = orders.getPrice();
this.orderDate = orders.getOrderDate();
this.accountDate = new Date();
}
}
AccountsRepository
public interface AccountsRepository extends JpaRepository<Accounts, Integer> {
}
**주의**
@Entity를 작성할 때 꼭 @Column(name = “”) 을 명시해줘야 한다. (안 그럼 인식 못 함)
Spring Batch 5는 @EnableBatchProcessiong 을 직접 커스텀 해줘야 Bean 생성 오류가 일어나지 않는다.
- BatchAutoConfiguration 클래스
@AutoConfiguration(after = { HibernateJpaAutoConfiguration.class, TransactionAutoConfiguration.class })
@ConditionalOnClass({ JobLauncher.class, DataSource.class, DatabasePopulator.class })
@ConditionalOnBean({ DataSource.class, PlatformTransactionManager.class })
@ConditionalOnMissingBean(value = DefaultBatchConfiguration.class, annotation = EnableBatchProcessing.class)
@EnableConfigurationProperties(BatchProperties.class)
@Import(DatabaseInitializationDependencyConfigurer.class)
public class BatchAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
JobRepository jobRepository, BatchProperties properties) {
JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobLauncher, jobExplorer, jobRepository);
String jobNames = properties.getJob().getName();
if (StringUtils.hasText(jobNames)) {
runner.setJobName(jobNames);
}
return runner;
}
// batchDatasource 사용을 위한 수동 빈 등록
@Bean
@ConditionalOnMissingBean(BatchDataSourceScriptDatabaseInitializer.class)
BatchDataSourceScriptDatabaseInitializer batchDataSourceInitializer(DataSource dataSource,
@BatchDataSource ObjectProvider<DataSource> batchDataSource, BatchProperties properties) {
return new BatchDataSourceScriptDatabaseInitializer(batchDataSource.getIfAvailable(() -> dataSource),
properties.getJdbc());
}
@BatchDataSource
@ConfigurationProperties(prefix = "spring.datasource.batch.hikari")
@Bean("batchDataSource")
public DataSource batchDataSource() {
return DataSourceBuilder.create().type(HikariDataSource.class).build();
}
@Bean
public PlatformTransactionManager batchTransactionManager(
@Qualifier("batchDataSource") DataSource batchDataSource) {
return new DataSourceTransactionManager(batchDataSource);
}
}
Main
@EnableBatchProcessing(dataSourceRef = "batchDataSource", transactionManagerRef = "batchTransactionManager")
@Import({BatchAutoConfiguration.class})
@SpringBootApplication
public class SpringbatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbatchApplication.class, args);
}
}
@EnableBatchProcessing 대신 커스텀한 BatchAutoConfiguration 클래스를 넣어줬다.
- TrMigrationConfig 클래스
@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class TrMigrationConfig {
private final OrdersRepository ordersRepository;
private final AccountsRepository accountsRepository;
@Bean
public Job trMigrationJob(JobRepository jobRepository, Step trMigrationStep) { // job 셍성
return new JobBuilder("trMigrationJob", jobRepository) // job의 이름 설정
.start(trMigrationStep) // 실행할 step 설정
.build();
}
@Bean
public Step trMigrationStep(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager, ItemReader trOrdersReader, ItemProcessor trOrdersProcessor, ItemWriter trOrdersWriter) {
// DB에서 데이터를 Read 후 데이터를 처리하여 DB에 Write
return new StepBuilder("trMigrationStep", jobRepository)
.<Orders, Accounts> chunk(5, platformTransactionManager) // 어떤 데이터로 읽어서 어떤 데이터로 쓸것인지 설정하고 몇 개의 단위로 데이터를 처리할 것인지 chunk의 사이즈 설정
.reader(trOrdersReader) // 데이터를 읽어오는 Reader
.processor(trOrdersProcessor) // Writer를 하기 전 데이터를 처리할 코드
.writer(trOrdersWriter) // 처리한 데이터를 Writer
// .writer(chunk -> chunk.getItems().forEach(System.out::println)) // 단순 데이터 출력하는 Writer
.build();
}
@Bean
public RepositoryItemWriter<Accounts> trOrdersWriter() {
return new RepositoryItemWriterBuilder<Accounts>()
.repository(accountsRepository) // 사용할 repository 설정
.methodName("save") // 사용할 메서드 설정
.build();
}
// ItemWriter로 구현해도 된다.
// @Bean
// public ItemWriter<Accounts> trOrdersWriter() {
// return new ItemWriter<Accounts>() {
// @Override
// public void write(Chunk<? extends Accounts> chunk) throws Exception {
// chunk.forEach(accountsRepository::save);
// }
// };
// }
@Bean
public ItemProcessor<Orders, Accounts> trOrdersProcessor() {
return new ItemProcessor<Orders, Accounts>() {
@Override
public Accounts process(Orders item) throws Exception { // 단순이 Orders객체를 Accounts 객체로 변경
return new Accounts(item);
}
};
}
@Bean
public RepositoryItemReader<Orders> trOrdersReader() {
return new RepositoryItemReaderBuilder<Orders>()
.name("trOrdersReader") // Reader의 이름
.repository(ordersRepository) // 사용할 repository
.methodName("findAll") // 사용할 메서드 이름
.pageSize(5) // 가져올 데이터 사이즈, 보통 chunk 사이즈와 읽어올 사이즈를 동일하게 한다.
.arguments(Arrays.asList()) // 읽어올 데이터에 대한 파라미터가 없기 때문에 빈 배열 설정
.sorts(Collections.singletonMap("id", Sort.Direction.ASC)) // 오름차순으로 정렬
.build();
}
}
스프링 배치에서는 트랜잭션을 chunk라는 단위로 끊어준다.
Chunk 기반의 STEP의 Reader-Processor-Writer 처리 방식
- Item Reader : 데이터 베이스에서 데이터를 읽어온다, JDBC, JSON 방식으로 데이터를 읽어올 수 있다.
- Item Processor : 읽어온 데이터를 처리한다.
- Item Writer : 데이터를 저장한다.
Chunk size : 한 번에 처리될 트랜잭션 단위, Processor에 의해 처리된 데이터가 chunk단위만큼 쌓이면 Writer에 의해 처리
즉, Reader, Processor는 1건씩 다뤄지고 Writer에선 chunk 단위로 처리된다.
Page size : 한 번에 조회할 item의 양이다.
- 실행을 편하게 하기 위한 Controller 추가
@RestController
@RequestMapping("/batch")
@RequiredArgsConstructor
public class JobLauncherController {
private final JobLauncher jobLauncher;
private final Job trMigrationJob;
@GetMapping("/run-job")
public String runBatchJob() {
try {
JobExecution jobExecution = jobLauncher.run(trMigrationJob, new JobParametersBuilder()
.addLong("time", System.currentTimeMillis()).toJobParameters());
return "Job Execution Status: " + jobExecution.getStatus();
} catch (Exception e) {
e.printStackTrace();
return "Job Execution failed: " + e.getMessage();
}
}
}
PostMan으로 해당 주소로 GET요청을 보내면 배치 작업이 시작된다.
해당 코드를 추후에 Scheduler 설정에 추가하면 주기적으로 배치작업을 반복할 수 있다.
- 결과
쿼리가 정상적으로 나가고
accounts 테이블에 데이터가 정상적으로 들어온 것을 확인할 수 있다.