Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 스프링 배치
- 타입스크립트
- 알고리즘
- 디자인패턴
- Spring Cloud Netfilx Eureka
- The law of Demeter
- Action Pattern
- 배치
- thread
- 사가 패턴
- zipkin
- Parallel Old GC
- 스레드
- 키클락
- Resilinece4j
- Serial GC
- 생산자 소비자 패턴
- Transaction Pattern
- java 정렬
- spring cloud
- 멀티스레드
- TypeScript
- JPA
- MSA
- Java
- saga pattern
- 체인 패턴
- Spring Boot Actuator
- 디자인 패턴
- spring batch
Archives
- Today
- Total
PSD( Private-Self-Development )
Spring Batch 멀티 스레드 프로세싱 본문
멀티 스레드?
특정 작업을 처리하는 스레드가 단일이 아닌 여래 개 일 경우 멀티 스레드 라고 한다.
일반적으로 복잡한 처리나 대용량 데이터 처리를 할 때, 소요 시간 및 성능상의 이점을 위해 사용한다.
단, 데이터 동기화 에 신경 써줘야 하기 때문에 신중히 사용해야 한다.
Spring Batch 에서의 멀티 스레드
마찬가지로 기본적으로 단일 스레드 방식으로 작업을 처리한다.
멀티 스레드를 위한 비동기 처리 및 Scale out 기능을 제공한다.
Local( 서버 내부적 ) 과 Remote( 외부 서버적 ) 처리 지원
멀티 스레드 처리 종류
1. AsyncItemProcessor / AsyncItemWriter
- ItemProcessor 에게 별도의 스레드가 할당되어 작업을 처리하는 방식
- Step 내부 ItemProcessor 가 비동기적으로 동작하는 구조
- AsyncItemProcessor 와 AsyncItemWriter( ItemProcessor의 비동기 처리가 다 끝나면 실행 ) 가 같이 구성되어야 함
- ItemReader 은 비동기 실행이 아니다.
예시
@Bean
public Step asyncStep() throws Exception {
return stepBuilderFactory.get("asyncStep")
.chunk(100)
.reader(pagingItemReader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Customer> pagingItemReader() {
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setPageSize(100);
reader.setRowMapper(new CustomerRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public ItemProcessor customItemProcessor() {
return new ItemProcessor<Customer, Customer>() {
@Override
public Customer process(Customer item) throws Exception {
return new Customer(item.getId(),
item.getFirstName().toUpperCase(),
item.getLastName().toUpperCase(),
item.getBirthdate());
}
};
}
@Bean
public JdbcBatchItemWriter customItemWriter() {
JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(this.dataSource);
itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
return itemWriter;
}
@Bean
public AsyncItemProcessor asyncItemProcessor() throws Exception {
AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setDelegate(customItemProcessor());
// asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());// 스레드를 재사용하고 관리하는 것이 아닌 스레드를 계속 만들어 낸다! 자원을 많이 잡아먹기 때문에 사용하지 말자..
asyncItemProcessor.setTaskExecutor(taskExecutor()); // 스레드풀을 생성해 원하는 만큼 제어 가능하다. => 자원을 효율적으로 사용한다.
return asyncItemProcessor;
}
@Bean
public AsyncItemWriter asyncItemWriter() throws Exception {
AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(customItemWriter());
return asyncItemWriter;
}
@Bean
public TaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4); // 동시에 실행 시킬 스레드 개수
executor.setMaxPoolSize(8); // 최대 스레드 갯수 Queue 가 꽉 차게 되면 최대 스레드 갯수 만큼 생성한다.
executor.setThreadNamePrefix("async-thread-");
return executor;
}
2. Multi-threaded Step
- Step 내 Chunk 구조인 ItemReader, ItemProcessor, ItemWriter 마다 멀티 스레드가 할당되어 실행
- ItemReader 가 Thread-safe 하도록 동기화 보장이 되어야 한다.(중복으로 데이터 조회하면 안되기 때문)
- 스레드마다 새로운 Chunk 가 할당되어 데이터 동기화 보정
- A 스레드는 A Chunk 를 할당 받아 read => processe => write 진행
B 스레드는 B Chunk 를 할당 받아 진행하고 서로 공유하지 않는다.
- A 스레드는 A Chunk 를 할당 받아 read => processe => write 진행
예시
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100)
.reader(pagingItemReader())
.processor((ItemProcessor<Customer, Customer>) item -> item)
.writer(customItemWriter())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public JdbcPagingItemReader<Customer> pagingItemReader() {
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>(); // thread safe 한 Reader 하다( JpaPagingItemReader 동일 )
reader.setDataSource(this.dataSource);
reader.setPageSize(100);
reader.setRowMapper(new CustomerRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public JdbcBatchItemWriter customItemWriter() {
JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(this.dataSource);
itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
itemWriter.afterPropertiesSet();
return itemWriter;
}
@Bean
public TaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setThreadNamePrefix("async-thread-");
return executor;
}
3. Parallel Steps
- Flow 마다 스레드가 할당되어 여러개의 Flow을 병렬로 실행하는 방법
- SplitState 를 사용해서 여러 개의 Flow 를 병렬로 실행하는 구조이다.
- 실행 완료 후 결과들을 취합하여 다음 단계 를 결정한다.
예시
@Bean
public Job job() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("split");
Flow flow = flowBuilder.split(taskExecutor())
.add(flow1(), flow2())
.end();
return jobBuilderFactory.get("batchJob")
.incrementer(new RunIdIncrementer())
.start(flow1())
.split(taskExecutor()).add(flow2()) // flow1, flow2 를 합치고 스레드 할당
.next(flow3()) // 앞선 모든 플로우가 끝나고 실행된다.
.next(flow) // flow1, flow2 를 합치고 스레드 할당
.end()
.listener(new StopWatchJobListener())
.build();
}
4. Partitioning
- Master/Slave 방식 Master 가 데이터를 파티셔닝( 그리드 사이즈 만큼 ) 하고 각 파티션에 스레드를 할당하는 방식
- MasterStep 은 PartitionStep 이며 SlaveStep 은 TaskletStep, FlowStep 등이 올 수 있다
예시
@Bean
public Step masterStep() throws Exception {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep().getName(), partitioner()) // 파티셔너 설정
.step(slaveStep()) // Slave 스텝 설정
.gridSize(4) // 병렬 처리 수 설정
.taskExecutor(new SimpleAsyncTaskExecutor()) // 실행자 설정
.build();
}
@Bean
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep")
.<Customer, Customer>chunk(1000)
.reader(pagingItemReader(null, null))
.writer(customerItemWriter())
.build();
}
@Bean
public ColumnRangePartitioner partitioner() {
ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
columnRangePartitioner.setColumn("id");
columnRangePartitioner.setDataSource(this.dataSource);
columnRangePartitioner.setTable("customer");
return columnRangePartitioner;
}
@Bean
@StepScope
public JdbcPagingItemReader<Customer> pagingItemReader(
@Value("#{stepExecutionContext['minValue']}")Long minValue,
@Value("#{stepExecutionContext['maxValue']}")Long maxValue) {
System.out.println("reading " + minValue + " to " + maxValue);
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
@StepScope
public JdbcBatchItemWriter<Customer> customerItemWriter() {
JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
itemWriter.setDataSource(this.dataSource);
itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
itemWriter.afterPropertiesSet();
return itemWriter;
}
// 커스텀 파티셔너
public class ColumnRangePartitioner implements Partitioner {
private JdbcOperations jdbcTemplate;
private String table;
private String column;
public void setTable(String table) {
this.table = table;
}
public void setColumn(String column) {
this.column = column;
}
public void setDataSource(DataSource dataSource) {
jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class); // 처음 인덱스
int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); // 마지막 인덱스
int targetSize = (max - min) / gridSize + 1; // 덩어리 크기
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0; // 파티션 인덱스
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
value.putInt("minValue", start); // 시작 인덱스
value.putInt("maxValue", end); // 종료 인덱스
start += targetSize;
end += targetSize;
number++;
}
return result;
}}
SynchronizedItemStreamReader
- Thread-safe 하지 않은 ItemReader 를 Thread-safe 하게 처리하도록 하는 역할
- Spring Batch 4.0 부터 지원한다.
- 각 스레드가 대기하고 있다가 순차적으로 Reader 를 사용한다.
예시
@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> pagingItemReader(
@Value("#{stepExecutionContext['minValue']}")Long minValue,
@Value("#{stepExecutionContext['maxValue']}")Long maxValue) {
System.out.println("reading " + minValue + " to " + maxValue);
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return new SynchronizedItemStreamReaderBuilder<Customer>()
.delegate(reader)
.build(); // SynchronizedItemStreamReader 생성
}
'Backend > Spring Batch' 카테고리의 다른 글
Spring Batch 테스트 (0) | 2023.04.25 |
---|---|
Spring Batch 이벤트 리스너 (0) | 2023.04.24 |
Spring Batch 반복 및 오류 제어 (0) | 2023.04.17 |
JobBuilder 와 StepBuilder 그리고 배치 상태 종류 (0) | 2023.03.15 |
Spring Batch 도메인 이해 (0) | 2023.02.06 |