Back-end

[Spring Batch] DB에서 데이터 읽고 DB에 쓰기

yeobi 2024. 6. 9. 03:03

 

- DB 설정

create database batch;

show databases;

use batch;

show tables;

-- 주문 테이블 생성
CREATE TABLE `batch`.`orders` (
                                         `id` INT NOT NULL AUTO_INCREMENT,
                                         `order_item` VARCHAR(45) NULL,
                                         `price` INT NULL,
                                         `order_date` DATE NULL,
                                         PRIMARY KEY (`id`))
;

-- 정산 테이블 생성
CREATE TABLE `batch`.`accounts` (
                                           `id` INT NOT NULL AUTO_INCREMENT,
                                           `order_item` VARCHAR(45) NULL,
                                           `price` INT NULL,
                                           `order_date` DATE NULL,
                                           `account_date` DATE NULL,
                                           PRIMARY KEY (`id`))
;

INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('카카오 선물', 15000, '2022-03-01');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('배달주문', 18000, '2022-03-01');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('교보문고', 14000, '2022-03-02');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('아이스크림', 3800, '2022-03-03');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('치킨', 21000, '2022-03-04');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('커피', 4000, '2022-03-04');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('교보문고', 13800, '2022-03-05');
INSERT INTO batch.orders(`order_item`, `price`, `order_date`) values ('카카오 선물', 5500, '2022-03-06');

select *
from batch.orders
;

select *
from batch.accounts
;

주문 테이블에서 계산 날짜까지 추가하여 정산 테이블로 데이터 넣는 것이 목표이다.

 

 

- 주문 객체 생성(Orders)

@Data
@Entity
public class Orders {
	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private Integer id;
	@Column(name = "order_item")
	private String orderItem;
	@Column(name = "price")
	private Integer price;
	@Column(name = "order_date")
	private Date orderDate;
}

 

OrdersRepository

public interface OrdersRepository extends JpaRepository<Orders, Integer> {

}

 

 

- 정산 객체 생성

@NoArgsConstructor
@Data
@Entity
public class Accounts {
	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private Integer id;
	@Column(name = "order_item")
	private String orderItem;
	@Column(name = "price")
	private Integer price;
	@Column(name = "order_date")
	private Date orderDate;
	@Column(name = "account_date")
	private Date accountDate;



	public Accounts(Orders orders) {
		this.id = orders.getId();
		this.orderItem = orders.getOrderItem();
		this.price = orders.getPrice();
		this.orderDate = orders.getOrderDate();
		this.accountDate = new Date();
	}
}

 

AccountsRepository

public interface AccountsRepository extends JpaRepository<Accounts, Integer> {

}

 

**주의**

@Entity를 작성할 때 꼭 @Column(name = “”) 을 명시해줘야 한다. (안 그럼 인식 못 함)

Spring Batch 5는 @EnableBatchProcessiong 을 직접 커스텀 해줘야 Bean 생성 오류가 일어나지 않는다.

 

 

- BatchAutoConfiguration 클래스

@AutoConfiguration(after = { HibernateJpaAutoConfiguration.class, TransactionAutoConfiguration.class })
@ConditionalOnClass({ JobLauncher.class, DataSource.class, DatabasePopulator.class })
@ConditionalOnBean({ DataSource.class, PlatformTransactionManager.class })
@ConditionalOnMissingBean(value = DefaultBatchConfiguration.class, annotation = EnableBatchProcessing.class)
@EnableConfigurationProperties(BatchProperties.class)
@Import(DatabaseInitializationDependencyConfigurer.class)
public class BatchAutoConfiguration {
	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
	public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
		JobRepository jobRepository, BatchProperties properties) {
		JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobLauncher, jobExplorer, jobRepository);
		String jobNames = properties.getJob().getName();
		if (StringUtils.hasText(jobNames)) {
			runner.setJobName(jobNames);
		}
		return runner;
	}

	// batchDatasource 사용을 위한 수동 빈 등록
	@Bean
	@ConditionalOnMissingBean(BatchDataSourceScriptDatabaseInitializer.class)
	BatchDataSourceScriptDatabaseInitializer batchDataSourceInitializer(DataSource dataSource,
		@BatchDataSource ObjectProvider<DataSource> batchDataSource, BatchProperties properties) {
		return new BatchDataSourceScriptDatabaseInitializer(batchDataSource.getIfAvailable(() -> dataSource),
			properties.getJdbc());
	}

	@BatchDataSource
	@ConfigurationProperties(prefix = "spring.datasource.batch.hikari")
	@Bean("batchDataSource")
	public DataSource batchDataSource() {
		return DataSourceBuilder.create().type(HikariDataSource.class).build();
	}

	@Bean
	public PlatformTransactionManager batchTransactionManager(
		@Qualifier("batchDataSource") DataSource batchDataSource) {
		return new DataSourceTransactionManager(batchDataSource);
	}
}

 

Main

@EnableBatchProcessing(dataSourceRef = "batchDataSource", transactionManagerRef = "batchTransactionManager")
@Import({BatchAutoConfiguration.class})
@SpringBootApplication
public class SpringbatchApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringbatchApplication.class, args);
	}

}

@EnableBatchProcessing 대신 커스텀한 BatchAutoConfiguration 클래스를 넣어줬다.

 

 

- TrMigrationConfig 클래스

@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class TrMigrationConfig {

	private final OrdersRepository ordersRepository;

	private final AccountsRepository accountsRepository;



	@Bean
	public Job trMigrationJob(JobRepository jobRepository, Step trMigrationStep) {	// job 셍성
		return new JobBuilder("trMigrationJob", jobRepository)	// job의 이름 설정
			.start(trMigrationStep)	// 실행할 step 설정
			.build();
	}


	@Bean
	public Step trMigrationStep(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager, ItemReader trOrdersReader, ItemProcessor trOrdersProcessor, ItemWriter trOrdersWriter) {
		// DB에서 데이터를 Read 후 데이터를 처리하여 DB에 Write
		return new StepBuilder("trMigrationStep", jobRepository)
			.<Orders, Accounts> chunk(5, platformTransactionManager)	// 어떤 데이터로 읽어서 어떤 데이터로 쓸것인지 설정하고 몇 개의 단위로 데이터를 처리할 것인지 chunk의 사이즈 설정
			.reader(trOrdersReader)	// 데이터를 읽어오는 Reader
			.processor(trOrdersProcessor)	// Writer를 하기 전 데이터를 처리할 코드
			.writer(trOrdersWriter)	// 처리한 데이터를 Writer
			// .writer(chunk -> chunk.getItems().forEach(System.out::println))	// 단순 데이터 출력하는 Writer
			.build();	
	}


	@Bean
	public RepositoryItemWriter<Accounts> trOrdersWriter() {
		return new RepositoryItemWriterBuilder<Accounts>()
			.repository(accountsRepository)	// 사용할 repository 설정
			.methodName("save")	// 사용할 메서드 설정
			.build();
	}

  // ItemWriter로 구현해도 된다.
	// @Bean
	// public ItemWriter<Accounts> trOrdersWriter() {
	// 	return new ItemWriter<Accounts>() {
	// 		@Override
	// 		public void write(Chunk<? extends Accounts> chunk) throws Exception {
	// 			chunk.forEach(accountsRepository::save);
	// 		}
	// 	};
	// }


	@Bean
	public ItemProcessor<Orders, Accounts> trOrdersProcessor() {
		return new ItemProcessor<Orders, Accounts>() {
			@Override
			public Accounts process(Orders item) throws Exception {	// 단순이 Orders객체를 Accounts 객체로 변경
				return new Accounts(item);
			}
		};
	}


	@Bean
	public RepositoryItemReader<Orders> trOrdersReader() {
		return new RepositoryItemReaderBuilder<Orders>()
			.name("trOrdersReader")	// Reader의 이름
			.repository(ordersRepository)	// 사용할 repository
			.methodName("findAll")	// 사용할 메서드 이름
			.pageSize(5)	// 가져올 데이터 사이즈, 보통 chunk 사이즈와 읽어올 사이즈를 동일하게 한다.
			.arguments(Arrays.asList())	// 읽어올 데이터에 대한 파라미터가 없기 때문에 빈 배열 설정
			.sorts(Collections.singletonMap("id", Sort.Direction.ASC))	// 오름차순으로 정렬
			.build();
	}

}

스프링 배치에서는 트랜잭션을 chunk라는 단위로 끊어준다.

Chunk 기반의 STEP의 Reader-Processor-Writer 처리 방식

  1. Item Reader : 데이터 베이스에서 데이터를 읽어온다, JDBC, JSON 방식으로 데이터를 읽어올 수 있다.
  2. Item Processor : 읽어온 데이터를 처리한다.
  3. Item Writer : 데이터를 저장한다.

Chunk size : 한 번에 처리될 트랜잭션 단위, Processor에 의해 처리된 데이터가 chunk단위만큼 쌓이면 Writer에 의해 처리

즉, Reader, Processor는 1건씩 다뤄지고 Writer에선 chunk 단위로 처리된다.

 

Page size : 한 번에 조회할 item의 양이다.

 

- 실행을 편하게 하기 위한 Controller 추가

@RestController
@RequestMapping("/batch")
@RequiredArgsConstructor
public class JobLauncherController {

	private final JobLauncher jobLauncher;

	private final Job trMigrationJob;

	@GetMapping("/run-job")
	public String runBatchJob() {
		try {
			JobExecution jobExecution = jobLauncher.run(trMigrationJob, new JobParametersBuilder()
				.addLong("time", System.currentTimeMillis()).toJobParameters());
			return "Job Execution Status: " + jobExecution.getStatus();
		} catch (Exception e) {
			e.printStackTrace();
			return "Job Execution failed: " + e.getMessage();
		}
	}

}

PostMan으로 해당 주소로 GET요청을 보내면 배치 작업이 시작된다.

해당 코드를 추후에 Scheduler 설정에 추가하면 주기적으로 배치작업을 반복할 수 있다.

 

- 결과

쿼리가 정상적으로 나가고

 

accounts 테이블에 데이터가 정상적으로 들어온 것을 확인할 수 있다.