전자책 출간 알림 [마이크로서비스패턴 쉽게 개발하기]

티스토리 뷰

지금까지 Axon Framework의 CommandGateway & Event Sourcing, EventGateway, QueryGateway, Snapshot, State Stored Aggregate에 대해서 '코끼리 냉장고 넣기'서비스를 개발하면서 배워 봤습니다.

이제 마지막으로 Event Replay를 이용하여 조회DB 데이터를 복구 하는 방법을 실습해 보겠습니다. 


Event Replay Controller 추가

package 'org.axon.controller'에 'ReplayEventsController' class를 추가 합니다. 

두개의 API가 추가 됩니다.

  • ReplayEventsController: 지정한 Event Handler에 대해 특정 일시 이후의 Event만 Replay함 
  • replayEventsAll: Event Replay가 활성화된 모든 Event Handler에 대해 Event Replay를 함 

 

package org.axon.controller;
/*
- 목적: Event를 replay하여 최종 상태를 DB에 저장함
*/
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.Configuration;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Instant;
import java.util.List;

@Tag(name = "Replay Events API")
@Slf4j
@RestController
@RequestMapping("/api/admin")
class ReplayEventsController {
    private final Configuration configuration;

    public ReplayEventsController(Configuration configuration) {
        this.configuration = configuration;
    }

    //-- 특정 EventHandler class에 대해 특정 날짜 이후의 Event Replay하여 최종 상태 저장
    @GetMapping("/replay/{processingGroup}/{startDateTime}")
    @Operation(summary = "지정한 Processing Group의 Event Handler를 지정한 시간 이후 Event만 Replay하면서 수행함")
    @Parameters({
            @Parameter(name = "processingGroup", in= ParameterIn.PATH, description = "Processing Groupname",
                    required = true, example="elephant"),
            @Parameter(name = "startDateTime", in= ParameterIn.PATH, description = "시작일시(생략 시 모든 Event Replay)",
                    allowEmptyValue = true, example = "2024-01-01T00:00:00.00Z")
    })
    String replayEventFor(
            @PathVariable(name = "processingGroup") String processingGroupName,
            @PathVariable(name = "startDateTime") String startDateTime) {

        String startAt = ("".equals(startDateTime) ? "2000-01-01T00:00:00.00Z" : startDateTime);
        log.info("Executing replayEventsFor({}) from {}", processingGroupName, startAt);

        TrackingEventProcessorConfiguration tepConfig = TrackingEventProcessorConfiguration
                .forSingleThreadedProcessing()
                .andInitialTrackingToken(streamableMessageSource ->
                        streamableMessageSource.createTokenAt(Instant.parse(startAt)));

        configuration.eventProcessingConfiguration()
                .eventProcessorByProcessingGroup(processingGroupName, StreamingEventProcessor.class)
                .ifPresentOrElse(streamingEventProcessor ->
                {
                    if(streamingEventProcessor.supportsReset()) {
                        streamingEventProcessor.shutDown();
                        //streamingEventProcessor.resetTokens();    //모든 Event replay 시
                        streamingEventProcessor.resetTokens(tepConfig.getInitialTrackingToken());
                        streamingEventProcessor.start();
                        log.info("===== Start replay events for <{}> from {}", processingGroupName, startAt);
                    } else {
                        log.info("Find <{}>. But, It's not support replay", processingGroupName);
                    }
                }, () -> log.info("Can't find <{}>", processingGroupName));

        return "Replay Events is processed";
    }

    //-- Event Replay를 허용하는 모든 Event Handler에 대해 전체 Event를 Replay하여 최종 상태 저장
    @GetMapping("/replayAll")
    @Operation(summary = "Replay가 가능한 모든 Event Handler를 찾아 Event를 처음부터 Replay하도록 함")
    String replayEventsAll() {
        log.info("Executing replayEventsAll");
        List<StreamingEventProcessor> streamingEventProcessors = configuration.eventProcessingConfiguration()
                .eventProcessors().values().stream()
                .filter(StreamingEventProcessor.class::isInstance)
                .map(StreamingEventProcessor.class::cast)
                .toList();

        streamingEventProcessors.forEach(streamingEventProcessor -> {
            if(streamingEventProcessor.supportsReset()) {
                log.info("Trigger ResetTriggeredEvent for streamingEventProcessor <{}>", streamingEventProcessor);
                streamingEventProcessor.shutDown();
                streamingEventProcessor.resetTokens();
                streamingEventProcessor.start();
            }
        });

        return "Replay Events All is Processed";
    }
}

 

EventHandler class에 Event Replay 활성화 및 @ResetHandler 추가

ElephantEventHandler class에 Event Replay를 활성화하는 어노테이션 '@AllowReplay'를 추가 합니다. 

또한 @ProcessingGroup 어노테이션으로 Process Group명도 추가 합니다. 

위에서 만든 'ReplayEventsController' API에서 Process Group명을 입력하는데 이 이름을 사용하면 됩니다. 

@Slf4j
@Component
@ProcessingGroup("elephant")    //전체 Event Replay시 대상 class를 구별하기 위해 부여
@AllowReplay                    //Event Replay를 활성화 함. 비활성화할 EventHandler에는 @DisallowReplay를 지정
public class ElephantEventHandler {
    @Autowired
...
}
참고) Event Replay를 허용하지 않을 Event Handler 메소드는 @DisallowReplay 어노테이션을 붙이십시오. 

 

Event Replay를 시작 하기 전에 조회DB의 기존 데이터를 모두 삭제하는 Reset Handler를 맨 아래 추가 하십시오.

enter_count 테이블의 모든 데이터도 삭제해야 합니다. 

    //===================== 전체 이벤트 Replay하여 DB에 최종 상태 저장 ===========
    @ResetHandler
    private void replayAll() {
        log.info("[@ResetHandler] Executing replayAll");
        elephantRepository.deleteAll();
        enterCountRepository.deleteAll();
    }

 

EnterCountRepository를 만들어서 에러를 없앱니다. 

package 'org.axon.repository' 하위에 작성 합니다. 

import org.axon.entity.EnterCount;
import org.springframework.data.jpa.repository.JpaRepository;

public interface EnterCountRepository extends JpaRepository<EnterCount, String> {

}

 

ElephantEventHandler class에 EnterCountRepository의 객체를 생성하는 수행을 추가 합니다. 

public class ElephantEventHandler {
	...
    @Autowired
    private EnterCountRepository enterCountRepository;
    ...
}

 

TIP) 특정 일시부터 현재까지 발생한 Event만 이용하여 데이터 생성이 가능함

두번째 API를 응용하여 정기적으로 다른 조회 DB에 Event Replay를 하여 데이터를 생성할 수 있습니다.  

예를 들어 매월 말에 그 달에 발생한 데이터만 생성하여 월간보고 등에 이용할 수 있습니다.    

 


 

테스트

어플리케이션을 재시작하고 Swagger 페이지를 열어 보십시오.

이제 Event Replay 요청 API가 하단에 표시될겁니다. 

- '/api/admin/replayAll' 실행 

API 실행 후 콘솔의 두번째 줄을 보면 'elephant'라는 Processing group을 찾아서 Event Replay를 실행한 걸 확인할 수 있습니다. 

  Executing replayEventsAll
   Trigger ResetTriggeredEvent for streamingEventProcessor <elephant>
   Shutdown state set for Processor 'elephant'.
   Processor 'elephant' awaiting termination...
   Released claim
   Worker for segment Segment[0/0] stopped.
   [@ResetHandler] Executing replayAll
   Worker assigned to segment Segment[0/0] for processing
   Using current Thread for last segment worker: TrackingSegmentWorker{processor=elephant, segment=Segment[0/0]}
   Fetched token: ReplayToken{currentToken=ReplayToken{currentToken=null, tokenAtReset=IndexTrackingToken{globalIndex=2}, context=null}, tokenAtReset=IndexTrackingToken{globalIndex=2}, context=null} for segment: Segment[0/0]
   [@EventHandler] CreatedElephantEvent for Id: 664
   [@EventHandler] CreatedElephantEvent for Id: 046
   [@CommandHandler] CreateEnterCountCommand for Id: 664
   [@CommandHandler] CreateEnterCountCommand for Id: 046
   [@EventHandler] CreatedElephantEvent for Id: 143
   [@CommandHandler] CreateEnterCountCommand for Id: 143

DBeaver에서 데이터를 확인해 보면 모두 지우고 Event Replay를 한 후 Data가 다시 복원된 걸 확인할 수 있습니다. 

 

두번째 API를 테스트 해 봅니다.  

특정 날짜 이후의 Event만 Replay할 수 있습니다. 실행을 하고 DBeaver에서 확인해 보면 데이터가 그대로 복원되었을 겁니다. 

이번에는 시작 일시를 오늘 날짜 이후로 지정하고 실행해 보십시오. 

데이터가 복원되지 않습니다. 

다시 시작일시를 어제 날짜로 지정하고 실행해 봅니다. 

이번에는 데이터가 복원되었을 겁니다. 

원본 데이터는 Event Store에 저장되어 있기 때문에 복원이 되는 것입니다. 

 


이상으로 총 7개 편으로 나누어 Axon Framework에 대해 실습을 통해 이해 하였습니다.

  1. 코끼리 냉장고에 넣기 프로젝트 시작
  2. CommandGateway와 Event Sourcing
  3. EventGateway
  4. QueryGateay
  5. Snapshot
  6. State Stored Aggregate
  7. Event Replay로 조회DB 데이터 복구

Axon Framework은 지금까지 설명한 기능 외에 Saga 패턴을 쉽게 구현할 수 있는 방법도 제공 합니다. 

 


마이크로서비스 패턴 쉽게 개발하기 목차

  1. 마이크로서비스 패턴 이해: Saga, Event Sourcing, API Composition, CQRS 이해
  2. 실습환경 준비
  3. 주문 서비스 테스트 
  4. Axon Framework 이해 
    1. 코끼리 냉장고에 넣기 프로젝트 시작
    2. CommandGateway와 Event Sourcing
    3. EventGateway
    4. QueryGateay
    5. Snapshot
    6. State Stored Aggregate
    7. Event Replay로 조회DB 데이터 복구

아래 주제들은 '마이크로서비스패턴 쉽게 개발하기'라는 제 책에서 만나실 수 있습니다. 

  • 멀티 모듈 프로젝트 작성
  • 신규 주문 정상처리 프로세스 구현 
  • 배송 상태 변경 및 재고 증감 처리
  • 신규 주문 보상처리 프로세스 구현
  • 주문 수정 정상처리 프로세스 구현
  • 주문 수정 보상처리 프로세스 구현
  • 주문 삭제 정상처리 프로세스 구현
  • 주문 삭제 보상처리 프로세스 구현
  • API Composition 패턴과 CQRS 패턴

책 한번 내겠다는 평소의 꿈을 실현하기 위해 이번에 전자책을 내게 되었습니다. 

제 꿈을 응원하신다는 마음으로 전체 내용을 공유하지 않는것을 양해해 주시고 

구매까지 해 주시면 더욱 감사하겠습니다. 

https://happycloud-lee.tistory.com/notice/291

 

[마이크로서비스패턴 쉽게 개발하기] 전자책 출간

안녕하세요? 온달입니다. 그동안 몇 번 시도하다 포기했던 책내기에 드디어 성공 했습니다. 책 제목은 '마이크로서비스패턴 쉽게 개발하기'입니다. 2024년 2월 19일 부터 교보문고, Yes24, 알라딘 등

happycloud-lee.tistory.com

 

댓글

전자책 출간 알림 [마이크로서비스패턴 쉽게 개발하기]