최근 개인 프로젝트를 하나 진행하면서 약 120만건 정도 되는 json 데이터를 파싱해 정제한 후 DB에 적재해야 되는 부분을 개발하게 되었고, 그 과정에서 배운 부분들 중 동시성 문제를 분산락 도입으로 해결하는 과정에서 배운 부분을 기록으로 남긴다.
Spring Batch 쓰면 되잖아?
Spring Batch의 개발 및 운영 환경이 구성이 되어 있는 회사 환경에서의 경우에는 배치 job을 개발하고 등록해서 사용하면 될 터였지만, 아직까지 개인 프로젝트에서는 Spring Batch까지 써가며 처리해야 될 문제들이 없었다보니 Spring Batch 환경을 구성하고 개발하는 것 부터가 오버 엔지니어링으로 느껴졌다. 특히 개발 단계에서야 테이블을 drop하고, 다시 파싱하는 과정을 여러번 거치다보니 배치를 개발해두면 유용하게 쓰이긴 하겠지만, 오픈 후 운영을 하게 되었을 때에는 관리자 계정으로 가끔(사실은 거의 없다시피) 온디맨드로 배치를 수행해주면 되는 부분이라 도메인별로 분리해둔 RestController에 엔드포인트만 하나 더 열어서 권한 등록해서 사용할 요량으로 RequestMapping해 바로 배치 서비스에 작업을 요청하도록 구성해 테스트를 진행했다.
비동기 처리
5만건 단위로 끊어서 처리하도록 구성을 하긴 했지만, 역시나 시간이 오래 걸렸고, 테스트를 하면서 클라이언트를 여러개 띄우게 되니 불편해져 작업은 비동기로 돌리면 어떨까 생각돼 비동기로 처리하기로 했다.
Spring Boot 에서는 @Async 어노테이션을 이용해 선언적으로 비동기 처리가 가능하다. 내부적으로는 등록된 TaskExecutor에서 별도의 스레드가 할당되어 작업을 할당해 수행하는 것으로, 내 구성에서는 Http 요청으로부터 실행된 DispatcherServlet 스레드와 분리된 흐름에서 작업이 수행된다.
또한 @Async 어노테이션이 동작하기 위해서는 @EnableAsync 어노테이션으로 비동기 기능을 활성화 해주어야 하는데, @SpringBootApplication이 선언된 클래스에 붙여줄 수도 있지만 비동기 작업을 담당할 TaskExecutor를 커스터마이징하고, 메인 클래스와 설정 클래스를 분리하기 위해 새로 AsyncConfig 클래스를 만들어 여기에 붙여준다.
application.yml
##### 비동기 관련 설정 #####
async:
executor:
# 기본 스레드 개수
core-pool-size: 4
# 최대 스레드 개수
max-pool-size: 8
# 작업 큐 용량 (초과시 새로운 스레드 생성)
queue-capacity: 100
# 생성될 스레드 이름 접두어
thread-name-prefix: "async-task-executor-"
AsyncConfig.java
@EnableAsync
@Configuration
public class AsyncConfig {
@Value("${async.executor.core-pool-size}")
private int corePoolSize;
@Value("${async.executor.max-pool-size}")
private int maxPoolSize;
@Value("${async.executor.queue-capacity}")
private int queueCapacity;
@Value("${async.executor.thread-name-prefix}")
private String threadNamePrefix;
@Bean(name = "asyncBatchTaskExecutor")
public Executor asyncBatchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix(threadNamePrefix);
executor.initialize();
return executor;
}
}
TaskExecutor의 설정 값들은 외부 설정으로 뺐고, 빈으로 등록한 TaskExecutor는 이후에 추가적인 TaskExecutor를 등록하면 충돌이 날 수 있으므로 name을 명시적으로 줬다. 아마 TaskExecutor를 추가로 필요로 하는 시점이 오면 다른 방법을 고민해보지 않을까 싶지만 일단은 name만 붙여주는 것이므로 붙여둔다.
이렇게 처리하고 서비스 쪽에서 @Async 어노테이션을 붙이고 등록한 TaskExecutor를 사용하도록 처리해준다.
BatchJob.java
@Slf4j
@Service
@RequiredArgsConstructor
public class BatchJob {
/**
* 비동기로 작업 수행
* asyncBatchTaskExecutor 사용
*/
@Async("asyncBatchTaskExecutor")
public CompletableFuture<Integer> executeBatchLogic() {
AtomicInteger result = new AtomicInteger(0);
/**
* 로직 수행
*/
log.debug("배치 수행 완료");
return CompletableFuture.completedFuture(result.intValue());
}
}
이렇게 처리 후 기존의 컨트롤러 쪽에서 일반 서비스에 요청하던 부분을 @Async 처리를 한 BatchJob의 메소드를 호출해 요청하도록 변경해주었더니 비동기로 잘 동작하는 것을 확인했다.
배치가 아직 돌고있는데?
비동기로 처리하니 또 다른 문제가 보였다. 온디맨드로 실행이 가능하도록 구성한 배치 작업이다 보니 비동기로 로직을 돌려놓고, 응답을 클라이언트에서 받아버리니 테스트 도중 아무 생각없이 다시 배치를 수행하도록 요청을 보내버리는 일이 생겼다. 관리자용 API니 정책으로 풀어도 되는 문제지만 개인 공부 목적이 큰 프로젝트에서 그렇게 하면 안되지. 게다가 비동기 처리가 아니었어도 여러 클라이언트로부터 동시에 요청이 들어오면 똑같이 동시성 문제가 발생하기는 매한가지로 애초에 설계에 결함이 있는 것이다.
동시성 문제
동시성 문제란 둘 이상의 작업이 동일한 자원에 동시에 접근할 때 발생하는 문제로, 내 케이스에서는 여러 클라이언트가 같은 배치 작업을 동시에 실행하면 데이터 오염이 발생할 수 있어 반드시 해결해야하는 문제이다.
이 문제의 해결 방법으로는 이런 방법들이 있을 것 같다.
Synchronized/Lock | 자바 기본 동기화 방식으로 단일 인스턴스 환경에서만 유효하기 때문에 분산 환경에선 적용 불가. |
Spring Batch 도입 | 온디맨드 실행을 처리를 위해서는 추가적으로 실행할 환경 구성 필요. |
배치 실행 상태 체크 로직 추가 | DB나 캐시에 배치 실행 상태를 저장하고 실행 중인지 여부 확인. |
분산락 | 분산 환경에서 각각 실행되는 인스턴스 사이에서 락을 보유한 작업만 실행하도록 보장. |
Spring Batch 도입과 배치 실행 상태 체크 로직 추가의 경우 지금의 상황에선 배보다 배꼽이 더 큰 상황으로 제외하고 생각하기로 했다.
자바 기본 동기화 방식만 사용하기에도 이후 프로젝트가 커지면 Scale out을 통한 확장을 고려하고 있기 때문에, 단독으로 사용하기가 꺼려졌다. 분산락을 직접 적용해서 사용해본 경험은 없어 이 기회에 공부하면서 적용해보면 좋을 것 같아 분산락을 사용해 문제를 해결해보기로 했다.
먼저 분산락의 종류는 이렇게 있다고 한다.
DB 기반 락 | 특정 row 또는 table에 락을 걸어 구현. 트랜잭션 제어 필요. |
Redis 기반 락 | 가장 널리 쓰임. SET NX, Redisson, lettuce 등의 방식 존재. |
ZooKeeper 기반 락 | 고가용성이 필요한 복잡한 분산 시스템에서 사용. 오버헤드 존재. |
인메모리 (단일 인스턴스용) | 테스트나 소규모 환경에 적합. 서비스 인스턴스가 늘어나면 무력화됨. |
당장은 레디스를 올려둘 인프라도 없고, 비용도 없다. 초기 오픈에는 DB락으로 걸어도 괜찮을 것 같은데, 당장은 필요하지 않아 락 인터페이스로 추상화하고, 인메모리 락을 구현해 사용하고 이후에 변경될 지점에 대해 대응하기로 했다.
분산락 인터페이스
먼저 여러 도메인에서 사용할 수 있으므로, 공통 패키지 하위에 lock이라는 패키지로 묶어서 관리하기로 하고 분산락 인터페이스와 스레드가 락을 획득한 후 실행할 로직 즉 처리하고자 하는 콜백을 받을 수 있도록 LockCallback 인터페이스를 만든다.
LockCallback의 경우에는 클라이언트 객체에서 분산락에 람다로 전달할 수 있도록 함수형 인터페이스로 정의해준다.
/**
* 동기 분산락 내에서 수행할 로직 콜백
* @param <T>
*/
@FunctionalInterface
public interface LockCallback<T> {
T doInLock();
}
public interface DistributedLock {
<T> T executeWithLock(String lockKey, LockCallback<T> callback);
}
아직은 프로젝트 규모가 크지 않아 synchronized 키워드로 직접 동기화 처리를 해주어도 괜찮겠지만 최적화가 잘 되어 있는 ConcurrentHashMap을 이용해 인메모리 락을 구현해 사용하다가 이후에 분산 환경으로 전환할 시기에 맞춰 레디스 락 구현체로 전환할 계획이다.
ConcurrentHashMap을 이용해 분산락 인터페이스를 구현하고 @Component 어노테이션으로 컴포넌트 스캔의 대상이 되도록 등록해주었다.
package com.bb.webcanvasservice.common.lock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 인메모리 분산 락 구현체
*/
@Slf4j
@Component
public class InMemoryDistributedLock implements DistributedLock {
private final ConcurrentHashMap<String, AtomicBoolean> lockMap = new ConcurrentHashMap<>();
@Override
public <T> T executeWithLock(String lockKey, LockCallback<T> callback) {
AtomicBoolean lock = lockMap.computeIfAbsent(lockKey, key -> new AtomicBoolean(false));
if (!lock.compareAndSet(false, true)) {
// 커스텀 Exception
log.info("{} 락 획득 실패", lockKey);
throw new LockAlreadyOccupiedException();
}
try {
return callback.doInLock();
}
finally {
log.info("{} 락 해제", lockKey);
lock.set(false);
}
}
}
어디에서 락을 걸까?
간단하게 분산락을 구현하고 나니 다음 고민은 어디에서 락을 걸어줄까 하는 문제였다. 처음엔 서비스 레이어에 위치한 BatchJob 쪽의 @Async 어노테이션이 붙어있는 메소드에서 락을 걸어주려고 시도해보았다.
@Slf4j
@Service
@RequiredArgsConstructor
public class BatchJob {
private final TargetService targetService;
private final DistributedLock distributedLock;
@Async("asyncBatchTaskExecutor")
public CompletableFuture<Integer> wrongBatch() {
return distributedLock.executeWithLock("wrong-batch-lock", () -> {
AtomicInteger result = new AtomicInteger(0);
try {
log.info("배치 작업 수행 시작");
// 30초 걸리는 작업 시뮬레이션
Thread.sleep(30000);
}
catch(Exception e) {
}
log.info("배치 수행 완료");
return CompletableFuture.completedFuture(result.intValue());
});
}
이렇게 구성을 하고 API를 여러번 호출하며 찍어둔 로그를 확인해보았다.
원했던 시나리오는 이러하다.
1. 첫 번째 요청이 들어오면, DispatcherServlet 스레드에서 @Async 어노테이션에 의해 task-executor-1 스레드로 비동기 처리하도록 작업을 위임한다. 이 때 오래 걸리는 작업 이전 LockCallback 로직의 맨 앞에 찍어둔 "배치 작업 수행 시작" 로그를 찍는다.
2. 실행중인 배치 작업이 끝나기 전, 두 번째 요청이 들어왔을 때, 분산락 로직에 의해 락 획득에 실패하면 LockCallback을 실행하지 않고 예외를 던진다. 이 때, DispatcherServlet 스레드에서 예외를 던져 미리 구성해둔 @RestControllerAdvice로 Handler Exception 공통 처리 로직을 태운다. (배치 작업을 수행하지 못했음을 클라이언트에 알리기 위함)
3. 배치 작업을 수행하던 task-executor-1 스레드에서 작업을 종료하고 락을 해제해 다른 클라이언트로부터 들어오는 요청을 수행할 수 있도록 한다.
로그를 보아하니, 1번과 3번은 예상대로 동작했으나 2번이 예상대로 동작하지 않았다.
두번 째 요청의 DispatcherServlet 스레드로 보이는 nio-9200-exec-3 스레드에서 락 획득 실패 로그가 찍힌 것이 아니라 task-executor-2 스레드에서 찍혀있는 것을 확인할 수 있다.
@Async
BatchJob 클래스의 메소드를 다시 살펴보자.
배치 작업을 비동기로 처리하기 위해 @Async 어노테이션을 메서드에 적용했다.
이는 스프링이 프록시 기반 AOP를 통해 해당 메서드를 별도의 스레드에서 실행되도록 하여, 호출한 쓰레드와는 비동기적으로 작업을 수행하게 만든다는 의미다. 따라서 @Async가 적용된 메서드 내부에서 분산 락 로직을 수행할 경우, 락 획득 실패 예외도 이미 별도 스레드 내에서 발생하게 된다. 그 결과 예외는 원래의 DispatcherServlet 스레드에서 발생하지 않으며, @RestControllerAdvice를 통한 공통 예외 처리 흐름으로 전달되지 않는다.
Presentation Layer인 컨트롤러에 락을 적용하기보단 BatchJob의 실행과 락을 담당하는 Executor 클래스를 하나 생성하기로 했다.
@Slf4j
@Service
@RequiredArgsConstructor
public class BatchExecutor {
private final BatchJob batchJob;
private final DistributedLock distributedLock;
public void wrongBatch() {
String batchId = "wrong batch-lock";
try {
distributedLock.executeWithLock(batchId, batchJob::wrongBatch);
}
catch (LockAlreadyOccupiedException e) {
log.error(e.getMessage());
log.error("이미 실행중인 배치입니다. ====== {}", batchId);
throw new LockAlreadyOccupiedException("이미 실행중인 배치입니다.");
}
}
}
그리고 BatchJob쪽에 걸었던 락도 풀어준다.
@Slf4j
@Service
@RequiredArgsConstructor
public class BatchJob {
private final TargetService targetService;
private final DistributedLock distributedLock;
@Async("asyncBatchTaskExecutor")
public CompletableFuture<Integer> wrongBatch() {
AtomicInteger result = new AtomicInteger(0);
try {
log.info("배치 작업 수행 시작");
// 30초 걸리는 작업 시뮬레이션
Thread.sleep(30000);
}
catch(Exception e) {
}
log.info("배치 수행 완료");
return CompletableFuture.completedFuture(result.intValue());
}
이렇게 구성 후 기존 컨트롤러에서 BatchJob의 wrongBatch를 바로 호출하던 것을 BatchJobExecutor를 통해 호출하도록 변경해주면 된다.
이렇게하고 다시 테스트를 진행한다.
이번엔 아예 배치가 시작하기도 전에 DispatcherServlet 스레드에서 락을 해제해버렸다.
락이 풀렸으니 첫 번째 배치가 채 완료되기 전에 두 번째 요청이 들어오면 배치를 다시 수행해버린다.
@Async 어노테이션에 의해 BatchJob의 로직이 비동기로 수행이 되어버리니 DispatcherServlet 스레드에서 실행되는 분산락 입장에서는 LockCallback이 바로 종료된 것으로 간주하고 finally 절을 실행해 락을 해제해버린 것이다.
정리해보자면 다음과 같이 구성을 해야한다.
분산락 로직은 DispatcherServlet 스레드에서 동작해 락을 획득하지 못할 시 DispatcherServlet 스레드에서 예외를 발생시켜야 하고, 락을 획득한 경우 분산락은 파라미터로 전달받은 LockCallback을 실행한 결과값인 CompletableFuture를 리턴하되, whenComplete 훅에서 배치 작업이 완료된 시점을 잡아 락을 해제해줘야한다. 이렇게 처리하면 비동기 처리는 의도대로 DispatcherServlet과 분리된 스레드에서 비동기적으로 실행하면서도 락을 얻지 못했을 때 발생시키는 예외를 공통처리할 수 있다.
그럼 비동기 처리에 사용할 CompletableFuture를 리턴하는 LockCallback 함수형 인터페이스와 분산락의 메소드를 추가하고 구현해보자.
/**
* /**
* 비동기 분산락 내에서 수행할 로직 콜백
* @param <T>
*/
@FunctionalInterface
public interface AsyncLockCallback<T> {
CompletableFuture<T> doInLock();
}
public interface DistributedLock {
/**
* 동기 처리 락
* @param lockKey
* @param callback
* @return
* @param <T>
*/
<T> T executeWithLock(String lockKey, LockCallback<T> callback);
/**
* 비동기 처리 락
* @param lockKey
* @param asyncCallBack
* @return
* @param <T>
*/
<T> CompletableFuture<T> executeAsyncWithLock(String lockKey, AsyncLockCallback<T> asyncCallBack);
}
@Slf4j
@Component
public class InMemoryDistributedLock implements DistributedLock {
private final ConcurrentHashMap<String, AtomicBoolean> lockMap = new ConcurrentHashMap<>();
@Override
public <T> T executeWithLock(String lockKey, LockCallback<T> callback) {
AtomicBoolean lock = lockMap.computeIfAbsent(lockKey, key -> new AtomicBoolean(false));
if (!lock.compareAndSet(false, true)) {
log.info("{} 락 획득 실패", lockKey);
throw new LockAlreadyOccupiedException();
}
try {
return callback.doInLock();
}
finally {
log.info("{} 락 해제", lockKey);
lock.set(false);
}
}
@Override
public <T> CompletableFuture<T> executeAsyncWithLock(String lockKey, AsyncLockCallback<T> asyncCallBack) {
AtomicBoolean lock = lockMap.computeIfAbsent(lockKey, key -> new AtomicBoolean(false));
if (!lock.compareAndSet(false, true)) {
// 락을 획득하지 못한경우 예외를 throw한다.
// 이 예외는 아직 @Async 어노테이션이 적용된 비동기 메소드 실행 전이므로
// DispatcherServlet 스레드에서 발생하게 된다.
throw new LockAlreadyOccupiedException();
}
try {
return asyncCallBack.doInLock()
.whenComplete((result, throwable) -> {
lock.set(false);
});
}
catch (Exception e) {
lock.set(false);
log.error(e.getMessage(), e);
CompletableFuture<T> failureFuture = new CompletableFuture<>();
failureFuture.completeExceptionally(e);
return failureFuture;
}
}
}
CompletableFuture를 리턴하는 AsyncLockCallback의 메소드를 실행하고 whenComplete에서 배치가 종료된 시점을 잡아 락을 해제해 준다.
이 try - catch 절에서는 비동기 처리가 실행된 시점이므로 catch 절에서 completeExceptionally로 예외를 담아 리턴해준다. 여기에서 throw를 하더라도 예외는 @Async 어노테이션에 의해 할당된 스레드에서 발생하게 되므로 처리가 어렵다.
이렇게 적용해 준 후 BatchExecutor에서 executeAsyncWithLock으로 락 메소드를 변경해주면 된다.
@Slf4j
@Service
@RequiredArgsConstructor
public class BatchExecutor {
private final BatchJob batchJob;
private final DistributedLock distributedLock;
public void wrongBatch() {
String batchId = "wrong batch-lock";
try {
distributedLock.executeAsyncWithLock(batchId, batchJob::wrongBatch);
}
catch (LockAlreadyOccupiedException e) {
log.error(e.getMessage());
log.error("이미 실행중인 배치입니다. ====== {}", batchId);
throw new LockAlreadyOccupiedException("이미 실행중인 배치입니다.");
}
}
}
테스트를 진행해보자.
락을 획득시도에 실패한 경우 예외가 발생하는 스레드가 DispatcherServlet 스레드인 것을 확인할 수 있고, 그렇기에 @ExceptionHandler로 공통 예외처리한 로직도 잘 타는 것을 확인할 수 있다. 물론 락도 잘 걸리고, task-executor-1 스레드를 할당받아 비동기로 작업을 수행하여 클라이언트로 바로 응답을 할 수 있게 되었다.
결과만 놓고 보면 이렇게 처리한 데이터가 한글 데이터라 정제가 영 쉽지 않아 다른 데이터를 추가로 알아보고 있다.
그래도 이번 작업을 통해 @Async 어노테이션의 동작 원리나 분산락의 동작 방식, 그리고 동시성 문제를 해결하는 방법 등에 대해 배울 수 있게 되었다.
소스코드는 남아있지만 다른 작업을 하다보면 잊혀질 수 있으니까 기록을 남겨본다.
'Spring Boot' 카테고리의 다른 글
[Spring Boot] WebSocket 서버 설정 1 - 연결 설정 및 메세지 매핑 (0) | 2025.05.15 |
---|---|
[Lessons learned] 대용량 파일 다운로드 처리 (0) | 2024.11.29 |
[Spring Boot] default locale 및 encoding 관련 이슈 해결 기록 (0) | 2024.11.19 |
[Spring Boot] MappingJackson2HttpMessageConverter 커스터마이징 (0) | 2024.08.22 |