PSD( Private-Self-Development )

Spring Batch 멀티 스레드 프로세싱 본문

Backend/Spring Batch

Spring Batch 멀티 스레드 프로세싱

chjysm 2023. 4. 19. 16:20

멀티 스레드?

특정 작업을 처리하는 스레드가 단일이 아닌 여래 개 일 경우 멀티 스레드 라고 한다.

일반적으로 복잡한 처리나 대용량 데이터 처리를 할 때, 소요 시간 및 성능상의 이점을 위해 사용한다.

 

단, 데이터 동기화 에 신경 써줘야 하기 때문에 신중히 사용해야 한다.

 

Spring Batch 에서의 멀티 스레드 

마찬가지로 기본적으로 단일 스레드 방식으로 작업을 처리한다.

멀티 스레드를 위한 비동기 처리 및 Scale out 기능을 제공한다.

Local( 서버 내부적 ) 과 Remote( 외부 서버적 ) 처리 지원 

 

멀티 스레드 처리 종류 

1.  AsyncItemProcessor / AsyncItemWriter

  • ItemProcessor 에게 별도의 스레드가 할당되어 작업을 처리하는 방식
  • Step 내부 ItemProcessor 가 비동기적으로 동작하는 구조 
  • AsyncItemProcessor 와 AsyncItemWriter( ItemProcessor의 비동기 처리가 다 끝나면 실행 ) 가 같이 구성되어야 함
  • ItemReader 은 비동기 실행이 아니다.

 

예시

@Bean
public Step asyncStep() throws Exception {
    return stepBuilderFactory.get("asyncStep")
            .chunk(100)
            .reader(pagingItemReader())
            .processor(asyncItemProcessor())
            .writer(asyncItemWriter())
            .build();
}

@Bean
public JdbcPagingItemReader<Customer> pagingItemReader() {
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

    reader.setDataSource(this.dataSource);
    reader.setPageSize(100);
    reader.setRowMapper(new CustomerRowMapper());

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");

    Map<String, Order> sortKeys = new HashMap<>(1);

    sortKeys.put("id", Order.ASCENDING);

    queryProvider.setSortKeys(sortKeys);

    reader.setQueryProvider(queryProvider);

    return reader;
}

@Bean
public ItemProcessor customItemProcessor() {
    return new ItemProcessor<Customer, Customer>() {
        @Override
        public Customer process(Customer item) throws Exception {
            return new Customer(item.getId(),
                    item.getFirstName().toUpperCase(),
                    item.getLastName().toUpperCase(),
                    item.getBirthdate());
        }
    };
}

@Bean
public JdbcBatchItemWriter customItemWriter() {
    JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
    itemWriter.setDataSource(this.dataSource);
    itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
    return itemWriter;
}

@Bean
public AsyncItemProcessor asyncItemProcessor() throws Exception {
    AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setDelegate(customItemProcessor());
//        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());// 스레드를 재사용하고 관리하는 것이 아닌 스레드를 계속 만들어 낸다! 자원을 많이 잡아먹기 때문에 사용하지 말자..
    asyncItemProcessor.setTaskExecutor(taskExecutor());                 // 스레드풀을 생성해 원하는 만큼 제어 가능하다. => 자원을 효율적으로 사용한다.
    return asyncItemProcessor;
}

@Bean
public AsyncItemWriter asyncItemWriter() throws Exception {
    AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(customItemWriter());
    return asyncItemWriter;
}

@Bean
public TaskExecutor taskExecutor(){
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);    // 동시에 실행 시킬 스레드 개수
    executor.setMaxPoolSize(8);     // 최대 스레드 갯수 Queue 가 꽉 차게 되면 최대 스레드 갯수 만큼 생성한다.
    executor.setThreadNamePrefix("async-thread-");
    return executor;
}

 

2. Multi-threaded Step

  • Step 내 Chunk 구조인 ItemReader, ItemProcessor, ItemWriter 마다 멀티 스레드가 할당되어 실행
  • ItemReader 가 Thread-safe 하도록 동기화 보장이 되어야 한다.(중복으로 데이터 조회하면 안되기 때문)
  • 스레드마다 새로운 Chunk 가 할당되어 데이터 동기화 보정 
    • A 스레드는 A Chunk 를 할당 받아  read => processe  => write 진행 
      B 스레드는 B  Chunk 를 할당 받아 진행하고 서로 공유하지 않는다.

 

예시

@Bean
public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
            .<Customer, Customer>chunk(100)
            .reader(pagingItemReader())
            .processor((ItemProcessor<Customer, Customer>) item -> item)
            .writer(customItemWriter())
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public JdbcPagingItemReader<Customer> pagingItemReader() {
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>(); // thread safe 한 Reader 하다( JpaPagingItemReader 동일 )

    reader.setDataSource(this.dataSource);
    reader.setPageSize(100);
    reader.setRowMapper(new CustomerRowMapper());

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");

    Map<String, Order> sortKeys = new HashMap<>(1);

    sortKeys.put("id", Order.ASCENDING);

    queryProvider.setSortKeys(sortKeys);

    reader.setQueryProvider(queryProvider);

    return reader;
}

@Bean
public JdbcBatchItemWriter customItemWriter() {
    JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

    itemWriter.setDataSource(this.dataSource);
    itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
    itemWriter.afterPropertiesSet();

    return itemWriter;
}

@Bean
public TaskExecutor taskExecutor(){
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setThreadNamePrefix("async-thread-");
    return executor;
}

 

3. Parallel Steps 

  • Flow 마다 스레드가 할당되어 여러개의 Flow을 병렬로 실행하는 방법
  • SplitState 를 사용해서 여러 개의 Flow 를 병렬로 실행하는 구조이다.
  • 실행 완료 후 결과들을 취합하여 다음 단계 를 결정한다.

예시

@Bean
public Job job() {
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("split");

    Flow flow = flowBuilder.split(taskExecutor())
            .add(flow1(), flow2())
            .end();

    return jobBuilderFactory.get("batchJob")
            .incrementer(new RunIdIncrementer())
            .start(flow1())
            .split(taskExecutor()).add(flow2()) // flow1, flow2 를 합치고 스레드 할당
            .next(flow3())                      // 앞선 모든 플로우가 끝나고 실행된다.
            .next(flow)                         // flow1, flow2 를 합치고 스레드 할당
            .end()
            .listener(new StopWatchJobListener())
            .build();
}

 

4. Partitioning

  • Master/Slave 방식 Master 가 데이터를 파티셔닝( 그리드 사이즈 만큼 ) 하고 각 파티션에 스레드를 할당하는 방식 
  • MasterStep 은 PartitionStep 이며 SlaveStep 은 TaskletStep, FlowStep 등이 올 수 있다

 

예시 

@Bean
public Step masterStep() throws Exception {
    return stepBuilderFactory.get("masterStep")
            .partitioner(slaveStep().getName(), partitioner()) // 파티셔너 설정
            .step(slaveStep())  // Slave 스텝 설정
            .gridSize(4) // 병렬 처리 수 설정
            .taskExecutor(new SimpleAsyncTaskExecutor()) // 실행자 설정
            .build();
}

@Bean
public Step slaveStep() {
    return stepBuilderFactory.get("slaveStep")
            .<Customer, Customer>chunk(1000)
            .reader(pagingItemReader(null, null))
            .writer(customerItemWriter())
            .build();
}

@Bean
public ColumnRangePartitioner partitioner() {
    ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();

    columnRangePartitioner.setColumn("id");
    columnRangePartitioner.setDataSource(this.dataSource);
    columnRangePartitioner.setTable("customer");

    return columnRangePartitioner;
}

@Bean
@StepScope
public JdbcPagingItemReader<Customer> pagingItemReader(
        @Value("#{stepExecutionContext['minValue']}")Long minValue,
        @Value("#{stepExecutionContext['maxValue']}")Long maxValue) {
    System.out.println("reading " + minValue + " to " + maxValue);
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

    reader.setDataSource(this.dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(new CustomerRowMapper());

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);

    Map<String, Order> sortKeys = new HashMap<>(1);

    sortKeys.put("id", Order.ASCENDING);

    queryProvider.setSortKeys(sortKeys);

    reader.setQueryProvider(queryProvider);

    return reader;
}

@Bean
@StepScope
public JdbcBatchItemWriter<Customer> customerItemWriter() {
    JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

    itemWriter.setDataSource(this.dataSource);
    itemWriter.setSql("insert into customer2 values (:id, :firstName, :lastName, :birthdate)");
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
    itemWriter.afterPropertiesSet();

    return itemWriter;
}

// 커스텀 파티셔너 
public class ColumnRangePartitioner implements Partitioner {

	private JdbcOperations jdbcTemplate;

	private String table;

	private String column;

	public void setTable(String table) {
		this.table = table;
	}
	public void setColumn(String column) {
		this.column = column;
	}
	public void setDataSource(DataSource dataSource) {
		jdbcTemplate = new JdbcTemplate(dataSource);
	}

	@Override
	public Map<String, ExecutionContext> partition(int gridSize) {
		int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") from " + table, Integer.class);	// 처음 인덱스
		int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") from " + table, Integer.class); 	// 마지막 인덱스
		int targetSize = (max - min) / gridSize + 1; // 덩어리 크기

		Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
		int number = 0; // 파티션 인덱스
		int start = min;
		int end = start + targetSize - 1;

		while (start <= max) {
			ExecutionContext value = new ExecutionContext();
			result.put("partition" + number, value);

			if (end >= max) {
				end = max;
			}
			value.putInt("minValue", start);	// 시작 인덱스
			value.putInt("maxValue", end);		// 종료 인덱스
			start += targetSize;
			end += targetSize;
			number++;
		}

		return result;
	}}

 

 

SynchronizedItemStreamReader

  • Thread-safe 하지 않은 ItemReader 를 Thread-safe 하게 처리하도록 하는 역할 
  • Spring Batch 4.0 부터 지원한다. 
  • 각 스레드가 대기하고 있다가 순차적으로 Reader 를 사용한다.

 

예시 

@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> pagingItemReader(
        @Value("#{stepExecutionContext['minValue']}")Long minValue,
        @Value("#{stepExecutionContext['maxValue']}")Long maxValue) {
    System.out.println("reading " + minValue + " to " + maxValue);
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

    reader.setDataSource(this.dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(new CustomerRowMapper());

    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);

    Map<String, Order> sortKeys = new HashMap<>(1);

    sortKeys.put("id", Order.ASCENDING);

    queryProvider.setSortKeys(sortKeys);

    reader.setQueryProvider(queryProvider);

    return new SynchronizedItemStreamReaderBuilder<Customer>()
    		.delegate(reader)
            .build(); // SynchronizedItemStreamReader 생성
}