본문 바로가기
공부/프로그래밍

[AWS, Springboot] SqsListener 사용 시 pool_size 문제 및 메세지처리 팁

by demonic_ 2019. 6. 16.
반응형

서비스를 개발하다보면 고객에게 알림을 보내야 하는 일이 있다. 그래서 모듈을 개발할 때 알림까지 전송할 수 있도록 한번에 처리한다. 그러다보니 문제가 발생했을 경우 각각의 시스템에서 점검해야 하는 문제가 발생했다. 각각의 서버환경이 다르다보니 로그파일 위치나 DBMS의 정보를 모두 저장해두어야 했고, 로그를 저장하는 방식을 기억하고 있어야 했다. 그래서 이것을 통합하기 위해 시스템을 구축중이다.

각 서비스에서 보내던 알림을 하나로 통합하기 위해 Message 를 사용하기로 했다. 유명한 Kafka나 RabbitMQ를 고려했으나 설치형 서버를 사용하는 경우 서버비용이 더 많이 들것 같아 AWS에서 제공하는 Amazon Simple Queue Service 서비스를 이용하기로 했다. 
(SNS[Amazon Simple Notification Service] 를 쓰지 않은 이유는 문자와 알림톡 서비스가 포함되어 있기 때문이다.)

 

SpringBoot에서 SqsListener 를 사용하기 위해 application.yml 를 설정을 다음과 같이 했다.

cloud:
  aws:
    credentials:
      access-key: [KEY]
      secret-key: [SECRET-KEY]
    region:
      static: [REGION]
    stack:
      auto: false
      

sqs:
  name: test-message

서비스를 개발하다보면 테스트 데이터를 만들어 두는것도 일이다. 그래서 SQS서비스에 샘플 데이터를 넣어두고, SqsListener 옵션에 deletionPolicy=SqsMessageDeletionPolicy.NEVER 를 넣어두었다. 메세지를 소비해도 사라지지 않게 하는 옵션이다.

@SqsListener(value = "${sqs.name}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void message(String message){
    ...
}

 

준비한 샘플 메세지는 20개이다. 

 

그런데 서비스를 실행하는데 다음과 같은 에러가 발생한다.

org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3b658be5[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]] did not accept task: org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable@22b99fce
...

20190616 10:59:01.720 [simpleMessageListenerContainer-2] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105529 
20190616 10:59:01.720 [simpleMessageListenerContainer-3] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105531 

로그내용을 보면 pool size가 충분하지 않아 에러가 발생한 듯 하다.

메세지가 이미 존재하고 있기 때문에 Message를 불러오다가 쓰레드가 부족해 에러가 발생한 것이다.

실제 운영에 반영하게 될 때, 모듈이 죽었다고 해서 메세지에 데이터가 쌓이지 않는것은 아니다. 그래서 이번 에러는 반드시 해결해야 하는 문제였다.


한가지 더, 위의 로그를 보면 2개의 메세지가 나온것을 확인할 수 있었다. 왜 2개일까?

에러를 발생시키는 위치의 파일은 다음과 같다.

org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.java

펴보니 SqsListener는 ThreadPoolTaskExecutor를 이용하는데, 특별한 설정이 없다면 첫번째 줄에 기본 쓰레드를 2개 생성하도록 되어있다.(위에서 메세지가 2개 나온 이유)

private static final int DEFAULT_WORKER_THREADS = 2;

흥미로운 것은 Debug모드로 해서 하나씩 실행하는 경우(실행에 딜레이를 주는 경우) TaskRejectedException 가 일어나지 않는다는 것이다. 즉, 쓰레드가 설정치 이상을 요구하게 되어 에러가 발생한 것이다.

그럼 어떤 설정을 기반으로 쓰레드 갯수를 생성할까?
SimpleMessageListenerContainer.java 파일을 보면 다음 메서드가 있다.

...

/**
 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been
 * specified.
 * <p>
 * The default implementation builds a
 * {@link org.springframework.core.task.SimpleAsyncTaskExecutor} with the specified
 * bean name (or the class name, if no bean name specified) as thread name prefix.
 * @return a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} configured
 * with the thread name prefix
 * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
 */
protected AsyncTaskExecutor createDefaultTaskExecutor() {
    String beanName = getBeanName();
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setThreadNamePrefix(
            beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
    int spinningThreads = this.getRegisteredQueues().size();

    if (spinningThreads > 0) {
        threadPoolTaskExecutor
                .setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);

        int maxNumberOfMessagePerBatch = getMaxNumberOfMessages() != null
                ? getMaxNumberOfMessages() : DEFAULT_WORKER_THREADS;
        threadPoolTaskExecutor
                .setMaxPoolSize(spinningThreads * (maxNumberOfMessagePerBatch + 1));
    }

    // No use of a thread pool executor queue to avoid retaining message to long in
    // memory
    threadPoolTaskExecutor.setQueueCapacity(0);
    threadPoolTaskExecutor.afterPropertiesSet();

    return threadPoolTaskExecutor;

}

...

코드를 보면 다음 부분이 있다.

int maxNumberOfMessagePerBatch = getMaxNumberOfMessages() != null
                ? getMaxNumberOfMessages() : DEFAULT_WORKER_THREADS;

maxNumberOfMessage 를 설정하지 않으면 기본 쓰레드 생성갯수(2개)를 따라가는 것이다.

그래서 이것을 늘려주기 위해선 configuration 설정이 필요하다.

그래서 AmazonSqsConfiguration.java 파일을 생성하고 아래와 같이 설정했다.

@EnableSqs
@Configuration
public class AmazonSqsConfiguration {
    // 2-10 까지 설정이 가능하다.
    private String maxNumberOfMessage = 10;

    /**
     * 커스텀을 생성한 이유:
     *      message가 이미 queue에 있는 경우, org.springframework.core.task.TaskRejectedException 에러를 발생시킴
     *
     * 메세지를 maxNumberOfMessage 수 만큼 불러오면 다음 불러오기까지 기다린다.
     * 재 호출에 대한 시간을 조절하는 것은 AWS SQS 에 들어간 후 다음을 수정한다.(기본값 30초)
     *  (해당큐 선택 후) 대기열 작업 -> 대기열 구성 -> 기본 제한 시간 초과 에서 설정 변경
     * @param amazonSQSAsync
     * @return
     */
    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setMaxNumberOfMessages(Integer.valueOf(maxNumberOfMessage));
        factory.setWaitTimeOut(10);
        return factory;
    }
}

실행을 하면 메세지를 10개씩 끊어서 가져오고 위와같은 pool size 에러는 없어졌다.

20190616 11:13:00.833 [simpleMessageListenerContainer-3] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105531 
20190616 11:13:00.833 [simpleMessageListenerContainer-10] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105543 
20190616 11:13:00.833 [simpleMessageListenerContainer-7] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105540 
20190616 11:13:00.833 [simpleMessageListenerContainer-4] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105532 
20190616 11:13:00.833 [simpleMessageListenerContainer-9] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105542 
20190616 11:13:00.833 [simpleMessageListenerContainer-2] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105529 
20190616 11:13:00.833 [simpleMessageListenerContainer-8] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105541 
20190616 11:13:00.833 [simpleMessageListenerContainer-11] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105544 
20190616 11:13:00.833 [simpleMessageListenerContainer-6] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105539 
20190616 11:13:00.833 [simpleMessageListenerContainer-5] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105538 


20190616 11:13:30.927 [simpleMessageListenerContainer-6] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105531 
20190616 11:13:30.927 [simpleMessageListenerContainer-4] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105544 
20190616 11:13:30.927 [simpleMessageListenerContainer-7] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105543 
20190616 11:13:30.927 [simpleMessageListenerContainer-2] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105540 
20190616 11:13:30.927 [simpleMessageListenerContainer-9] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105541 
20190616 11:13:30.927 [simpleMessageListenerContainer-8] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105542 
20190616 11:13:30.927 [simpleMessageListenerContainer-3] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105538 
20190616 11:13:30.927 [simpleMessageListenerContainer-11] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105539 
20190616 11:13:30.927 [simpleMessageListenerContainer-10] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105532 
20190616 11:13:30.929 [simpleMessageListenerContainer-5] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105529 

10개씩 끊어서 가져오기로 했는데, 해당 메세지가 제대로 소비되지 않으면 이동 중인 메세지로 표시되고 그 기간동안 Listner도 대기하게 된다. (정상적으로 소비되면 다음 메세지를 즉각 호출한다)

위의 로그를 보면 30초 간격으로 동일하게 가져온 것으로 보인다.

 

재 호출의 기본값은 30초이다. 이 설정을 변경하려면 AWS SQS에서 대기열 구성에 '기본 제한 시간 초과' 를 변경하면 된다(여기선 5초로 수정)

 

 

조절 후 실행 결과, 5초 사이에 호출 된 것으로 확인

20190616 11:26:26.503 [simpleMessageListenerContainer-8] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105541 
20190616 11:26:26.503 [simpleMessageListenerContainer-2] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105529 
20190616 11:26:26.503 [simpleMessageListenerContainer-4] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105532 
20190616 11:26:26.503 [simpleMessageListenerContainer-7] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105540 
20190616 11:26:26.503 [simpleMessageListenerContainer-10] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105543 
20190616 11:26:26.503 [simpleMessageListenerContainer-6] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105539 
20190616 11:26:26.503 [simpleMessageListenerContainer-3] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105531 
20190616 11:26:26.503 [simpleMessageListenerContainer-5] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105538 
20190616 11:26:26.503 [simpleMessageListenerContainer-11] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105544 
20190616 11:26:26.503 [simpleMessageListenerContainer-9] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105542 


20190616 11:26:31.489 [simpleMessageListenerContainer-5] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105531 
20190616 11:26:31.489 [simpleMessageListenerContainer-11] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105529 
20190616 11:26:31.489 [simpleMessageListenerContainer-10] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105532 
20190616 11:26:31.489 [simpleMessageListenerContainer-4] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105539 
20190616 11:26:31.489 [simpleMessageListenerContainer-9] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105538 
20190616 11:26:31.489 [simpleMessageListenerContainer-3] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105540 
20190616 11:26:31.490 [simpleMessageListenerContainer-8] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105544 
20190616 11:26:31.490 [simpleMessageListenerContainer-7] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105541 
20190616 11:26:31.490 [simpleMessageListenerContainer-6] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105542 
20190616 11:26:31.490 [simpleMessageListenerContainer-2] INFO e.a.s.AlertSqs : ######### senderId: AIDAYM4ZNJJXD36NCUF6S, message: TEST 입니다: 20190616105543 

끝.

반응형

댓글