본문 바로가기

카테고리 없음

카프카를 통한 Pub/Sub 구현 및 분산 트랜잭션 사용기

A. MS간 통신에서 생기는 이슈들

1. 비동기/논블로킹

  • MSA(MicroService Architecture)를 사용하게 되면 관리의 범위가 작아져서 배포가 쉬워지며 복잡한 서비스에서 모놀리식 구조(Monolithic Architecture)보다 변경이 쉬워 집니다.
  • 그렇지만 이런 MSA의 장점을 온전히 이용하려면 MS간 의존도가 낮아 느슨한 결합(Loose Coupling)이 이루어져 있어야 합니다.
  • 기존의 동기 방식의 통신을 적용시 MS간 의존도가 생기기에 정합성이 매우 중요한 경우가 아니라면 비동기/논블로킹 방식을 이용하여 MS간 의존도를 낮추는 것이 권장 됩니다. 
  • 비동기/논블로킹 방식은 WebClient같은 HTTP클라이언트를 이용할 수도 있고 중간에 브로커 서버를 두는 메시지 큐(Message Queue) 방식으로 구성할 수도 있습니다. 

2. 메시지큐 방식 vs. HTTP를 이용한 비동기/논블로킹 통신

  • MSA를 사용하게 되면 MS간 서버 상황이 다를 수 있습니다.
  • 어떤 MS가 다운되어 있다면 WebClient로 보낸 비동기 메시지가 제대로 도착하지 않을 수 있습니다.
  • 메시지 큐 방식을 사용하면 MS가 다운되어 있더라도 브로커 서버(Message Broker)가 메시지를 보관하고 있기에 비동기 통신에서 데이터의 전달을 보장합니다. 
  • 브로커 서버 덕분에 비동기 통신에서 데이터의 흐름을 제어할 수 있고 시스템의 신뢰성을 높이며 가용성을 높이기 또한 수월해 집니다. 

3.  분산 트랜잭션

  • 하나의 서비스를 처리하기위해 여러개의 MS를 거쳐야 하기에 어플리케이션에서 처럼 하나의 트랜잭션에서 모든 작업을 처리할 수 없습니다. 
  • 이를 위해 MS간 통신에서는 각 MS의 데이터들의 정합성을 지키기 위해 분산된 트랜잭션의 관리를 해주어야합니다.

 

B. 분산 트랜잭션에서 정합성 지키기. 

 

1. ACID(Atomicity, Consistency, Isolation, Durability)

 

    트랜잭션이 가지는 4가지 속성을 ACID라고 하고 모놀리식 아키텍처에서는 ACID를 지키는 트랜잭션을 사용하기 수월합니다. 

  • 원자성(Atonomicity) : 하나의 트랜잭션에 포함되는 작업은 반드시 같이 성공하거나 실패해야 합니다. 하나라도 실패시 롤백을 통해 원래 상태로 돌아올 수 있어야 합니다.
  • 일관성(Consistency): 트랜잭션의 결과는 데이터베이스가 항상 유효한 상태임을 보장할 수 있어야 합니다. 유효한 상태란 데이터베이스가 요구하는 제약사항과 비즈니스 로직을 지키는 것입니다.
    • 데이터 베이스 제약조건(Constraint):
      • 자료형, 길이등 데이터 베이스 규칙
      • 고유키, 외래 키, Cascading등 스키마 제약사항
    • 비즈니스 규칙
      • 예)재고 관리 시스템이서 재고의 숫자는 0이 될수 없다 등
  • 고립성(Isolation) : 하나의 트랜잭션이 다른 트랜잭션의 영향을 받지 않아야 합니다. 그렇기에 동시에 수행되는 여러개의 트랜잭션 결과는 각 트랜잭션을 순차적으로 실행했을때와 같아야합니다. 
  • 지속성(Durability): 커밋된 트랜잭션은 어떤 장애 상황에서도 유실되지 않고 보장되어야 합니다. 

2. 2PC (Two-Phase-Commit)

  • MSA에서는 데이터베이스가 분산되어 트랜잭션도 여러 MS에서 수행되기에 ACID 속성들을 지키기 어렵습니다.
  • 분산 트랜잭션 상황에서 하나만 커밋에 성공하고 다른 트랜잭션은 그렇지 못하다면 데이터의 정합성(Data Integrity)이 깨지게 됩니다.
  • 이에 분산 트랜잭션에서 정합성을 지키기위한 방법들이 고안되었는데 대표적인 방법 중 하나가 2PC입니다.
  • 2PC는 코디네이터(또는 트랜잭션 매니저)에 의해서 관리 됩니다.
  • 순서
    • 어플리케이션이 분산 트랜잭션을 시작하려고 하면 코디네이터에서 글로벌하게 유니크한 ID를 발급 받습니다.
    • 각 참여자에게 ID를 전달하고 읽기 또는 쓰기의 작업중 문제가 생기면 각 참여자는 
    • 어플리케이션이 커밋할 준비가 되면 prepare요청을 보내고 모두 준비가 되면 커밋을 진행하고 그렇지 못하거나 타임아웃되면 트랜잭션을 abort합니다.
  • 단점
    • 코디네이터가 다운 된다면 다른 참여자들은 코디네이터가 회복할때까지 트랜잭션을 완료할 수 없습니다.
    • NoSQL에서는 2PC를 지원하지 않습니다.

3. SAGA 패턴

출처: Azderica

  • 2PC가 트랜잭션 매니저를 통해 DB 레벨에서 이루어진다면 SAGA패턴은 어플리케이션이 주체가 됩니다.
  • SAGA패턴은 MS들 끼리 이벤트를 주고 받아서 데이터의 정합성을 지킵니다.
  • 트랜잭션이 실패하면 그때까지 성공한 트랜잭션들을 되돌리는 보상 이벤트(Complementary)를 발행하여 데이터를 롤백 시킵니다. 
  • 보상 이벤트를 처리하기까지 시간이 걸리기에 실시간으로 원자성과 일관성을 보장할 수 없지만 결과적인 일관성(Eventual Consistency)을 보장합니다.
    • 은행 계좌 정보같이 데이터의 정합성이 매우 중요하다면 MS의 SRP보다 동기 트랜잭션이 ACID를 지키기에는 더 적합할수 있습니다.
  • 이벤트를 순차적으로 별로 완료, 실패 이벤트 발행하여 관리하는 코레오그래피(Chreography based SAGA pattern)과 SAGA Manager의 관리를 통해 이루어지는 오케스트레이션(Orchestration based SAGA pattern)이 있습니다.

C. 카프카 선택 이유

1. RabbitMQ vs. Kafka

  • MSA구조를 도입하면서 비동기 통신만으로는 데이터의 전달을 확신할수 없어 메시지큐를 사용하는 미들웨어를 도입하였습니다.
  • 가장 대표적인 2가지 
  • 차이점
    • 메시징방식/ 안정성
      • RabbitMQ는 컨슈머에게 메시지는 Push후 응답받으면 사라집니다.
      • Kafka는 컨슈머가 원하는 속도로 Pull하여 소비할 수 있고 메시지가 만료될 때까지 보관합니다.
    • 확장성
      • RabbitMQ는 수평적 확장시 클러스터링 설정이 필요하며 수직적 확장을 할 수 있지만 처리량이 늘지 않을 수 있습니다.
      • 카프카는 파티션을 통한 수평정 확장에 유리하며 주키퍼를 통해 확장된 클러스터들을 관리하기 편합니다.
    • 처리량 (Confluent)
      • Kafka: 605 MB/s의 최대 처리량
      • RabbitMQ: 38 MB/s의 최대 처리량
    • 리소스
      • 카프카는 도입시 최소 3개의 서버추가가 권장되고 주키퍼(zoo keeper)라는 매니저가 필요합니다.
  • 선택
    • 복잡하지 않은 프로젝트라면 도입의 부담이 적은 RabbitMQ로 메시지큐를 구현하는 것이 더 편리하고 카프카는 오버엔지니어링이라고 여겨질 수 있다고 생각합니다. 
    • 그렇지만 MSA구조를 탐구해보고 싶은 프로젝트의 특성상 MS의 상황에 맞게 배압(Back Pressure)을 조절해 메시징을 할 수 있고 가용성이 중요한 MSA구조에 맞게 수평적 확장으로 대응할 수 있는 카프카를 사용해 보기로 했습니다.

2. 스프링 카프카의 트랜잭션

  • @Transaction은 카프카 트랜잭션을 실행 시켜주지만 DB트랜잭션과 따로 동작할 수 있기에 주의해서 사용이 필요합니다.

-1. DB 와 따로 사용

@Transactional
public void process() {
    // DB와 Kafka가 독립적인 트랜잭션으로 실행됨
    memberRepository.save(member);    // DB 트랜잭션
    kafkaTemplate.send("topic", message);  // Kafka 트랜잭션
    // DB는 성공하고 Kafka는 실패할 수 있음
}

 

-2. 중첩 트랜잭션으로 이용

  • 외부 트랜잭션 실패시 내부 트랜잭션도 롤백
  • 내부 트랜잭션 롤백시 외부 트랜잭션도 롤백됨
  • 그렇지만 내부 트랜잭션이 커밋된다면 외부 트랜잭션 롤백해도 내부 트랜잭션은 롤백 되지 않음(순서가 중요)
@Transactional("kafkaTransactionManager")
public void outerProcess() {
    kafkaTemplate.send("topic", message);  // Kafka 트랜잭션 시작
    innerProcess();  // DB 트랜잭션 시작
    // Kafka나 DB 중 하나라도 실패하면 모두 롤백
}

@Transactional("dbTransactionManager")
private void innerProcess() {
    memberRepository.save(member);
}

 

-3. DB와 카프카 묶어서 사용

    @Transactional(transactionManager = "transactionManager", rollbackFor = {KafkaTransactionException.class})
    public String processOrder(OrderRequest request) {

        try {
            orderRepository.save(orderEntity);
            kafkaTemplate.send("kafka-topic-order", orderEvent);
            
            if (request.getAmount() <= 0) {
                throw new RuntimeException("주문 금액이 유효하지 않습니다");
            }
            
            kafkaTemplate.send("kafka-topic-payment", paymentEvent);
        } catch (Exception ex) {
            log.error("주문 처리 중 예외 발생", ex);
            throw new KafkaTransactionException("주문 처리 트랜잭션 실패");
        }

        return "order processed successfully";
    }
}

 

D. 카프카 적용기

  •  줄서기 MS에서 Member의 위치정보를 받아서 메시징을 보냈으나 위치정보를 확인후 메시지를 보내는것은 Member MS의 책임이라고 느껴서 줄서기 등록시 Member MS가 알림을 보내도록 변경하였습니다.

1. 줄서기 MS - 프로듀서

  kafka:
    topic: alarm-event
    template:
      default-topic: alarm-events
    producer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        acks: all
        retries: 10
        retry.backoff.ms: 1000
    admin:
      properties:
        bootstrap.servers: localhost:9092,localhost:9093,localhost:9094

 

@Component
@RequiredArgsConstructor
@Slf4j
public class AlarmEventProducer {

    private final KafkaTemplate<Long, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

       @Value("${spring.kafka.topic}")
    public String topic;

    public CompletableFuture<SendResult<Long, String>> sendAlarmEvent(HelpAlarmEvent helpAlarmEvent) {
        try {
            Long key = helpAlarmEvent.helpRegisterId();
            String value = objectMapper.writeValueAsString(helpAlarmEvent);

            ProducerRecord<Long, String> producerRecord = buildProducerRecord(key, value, topic);
            var completableFuture = kafkaTemplate.send(producerRecord);
            return completableFuture
                    .whenComplete((sendResult, throwable) -> {
                        if (throwable != null) {
                            handleFailure(throwable);
                        } else {
                            handleSuccess(key, value, sendResult);
                        }
                    });
        } catch (Exception e) {
            handleFailure(e);
        }
        return null;
    }

    private ProducerRecord<Long, String> buildProducerRecord(Long key, String value, String topic) {
        List<Header> recordHeaders = List.of(new RecordHeader("event-source", "scanner".getBytes()));
        return new ProducerRecord<Long, String>(topic, null, key, value, recordHeaders);
    }

    private void handleFailure(Throwable ex) {
        log.error("Error Sending the Message and the exception is {}", ex.getMessage());
    }

    private void handleSuccess(Long key, String value, SendResult<Long, String> result) {
        log.info("Message Sent SuccessFully for the key : {} and the value is {} , partition is {}", key, value, result.getRecordMetadata().partition());
    }


}

 

2. MemberMS - 컨슈머

  kafka:
    template:
      default-topic: alarm-events
    consumer:
      bootstrap-servers: kafkaA:19092,kafkaB:19093,kafkaC:19094
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: alarm-events-listener-group
      auto-offset-reset: latest
    producer:
      bootstrap-servers:
        - localhost:9092,localhost:9093,localhost:9094
        #- kafkaA:19092
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
@Component
@Slf4j
public class AlarmEventConsumer {

    @Autowired
    private ObjectMapper objectMapper;

    @KafkaListener(topics = {"alarm-events"}
            , groupId = "alarm-events-listener-group")
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
        log.info("ConsumerRecord : {}", consumerRecord);
        try {
            HelpAlarmEvent helpAlarmEvent = objectMapper.readValue(consumerRecord.value(), HelpAlarmEvent.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

 

출처

WSO2 - 마이크로서비스 사용사례 - MSA 의 핵심 아키텍처 컨셉

Deok님의 MSA, EDA 그리고 Event Sourcing의 이해

F-Lab - 메시지 큐를 활용한 시스템 설계 개선 전략

SE Concepts님의 Two-Phase-Commit이란?(2PC)

Azderica님의 [Architecture] MSA: SAGA 패턴이란

Confluent -Benchmarking Apache Pulsar, Kafka, and RabbitMQ

maxxyoung님의 [Kafka] 카프카 너로 정했다! - 트랜잭션

류니님의 [KAFKA] DB, KAFKA Transaction