[마이크로서비스 패턴 쉽게 개발 4] Axon Framework(3): EventGateway
이번 편에서는 EventGateway를 통해서 Event를 publish하는 방법을 해 보겠습니다.
Event Publishing이 필요한 대표적인 케이스는 별도의 마이크로서비스에 CQRS패턴을 적용하여 조회 전용 DB를 만드는 경우 입니다.
Aggregate에서 AggregateLifecycle.Apply({event 객체})로 발행된 Event는 해당 서비스 내에서만 이용할 수 있습니다.
조회 전용 DB를 관리하는 별도의 서비스에서 연관된 다른 서비스 DB의 변화를 감지하려면 어떻게 해야 할까요?
그 서비스들이 자신의 DB 변화가 있을 때 변경이 있다는 Event를 발행해 줘야 합니다.
이때 사용하는 것이 EventGateway입니다.
또 하나의 케이스는 Saga 프로세스에서 참여 서비스의 실패 Event를 감지하기 위해 필요 합니다.
A서비스에서 B서비스로 Command를 보낸 후 B서비스에서 어떠한 이유로 처리가 실패했다고 가정해 봅시다.
이때 B서비스가 EventGateway로 실패 Event를 발행해야 A서비스가 Saga 프로세스에서 감지할 수 있습니다.
우리는 서비스가 1개 이므로 많이 어거지지만 10번 과정에서 에러를 발생시켜 EventGateway의 동작을 학습 하도록 하겠습니다.
QueryEventHandler의 냉장고 넣기 상태를 업데이트하는 Event Handler에서 100kg이 넘을 때 실패 Event를 발행 하겠습니다.
많이 어거지인 이유는 이러한 validation 체크는 Command를 보내기 전에 ElephantService에서 미리 체크하는게 맞기 때문입니다.
실패 Event 보내기
1) ElephantEventHandler에 EventGateway 객체 생성
아래와 같이 EventGateway객체를 @Autowired로 생성 하십시오.
public class ElephantEventHandler {
@Autowired
private ElephantRepository elephantRepository;
@Autowired
private transient EventGateway eventGateway;
...
}
주의) EventGateway 객체는 transient 옵션을 붙여서 생성해야 함 유통되는 Message를 Binary로 변환하지 않고 원문 Text 그대로 보내야 하기 때문에 이 옵션을 반드시 붙여야 합니다. |
2) EnteredElephantEvent 처리 메소드 수정
상태를 'Enter'로 바꾸는 기존 코드 밑에 100kg이 넘을 때 'FailedEnterElephantEvent'를 Axon 서버로 보내도록 추가 합니다.
@EventHandler
private void on(EnteredElephantEvent event) {
log.info("[@EventHandler] EnteredElephantEvent for Id: {}", event.getId());
Elephant elephant = getEntity(event.getId());
if(elephant != null) {
elephant.setStatus(event.getStatus());
elephantRepository.save(elephant);
//무게가 100kg을 넘으면 실패 Event를 생성/발송함
if(elephant.getWeight() > 100) {
log.info("==== 100Kg 넘어서 넣기 실패! 실패 이벤트 발송!");
eventGateway.publish(new FailedEnterElephantEvent(event.getId()));
return;
}
}
}
package 'org.axon.events' 하위에 'FailedEnterElephantEvent' class를 추가 합니다.
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class FailedEnterElephantEvent {
private String id;
}
FailedEnterElephantEvent 처리 메소드 추가
Axon 서버는 참여하는 어플리케이션 중 'FailedEnterElephantEvent'를 처리할 수 있는 어플리케이션으로 메시지를 푸시 합니다.
실습에서는 ElephantEventHandler class에 메소드를 추가 개발하도록 하겠습니다.
이 메소드에서는 코끼리의 상태를 이전 상태로 Rollback하라는 Command를 발행 합니다.
1) CommandGateway 객체 생성 추가
public class ElephantEventHandler {
@Autowired
private ElephantRepository elephantRepository;
@Autowired
private transient EventGateway eventGateway;
@Autowired
private transient CommandGateway commandGateway;
...
}
2) 'FailedEnterElephantEvent' 처리 메소드 추가
'BackToReadyCommand' Command를 CommandGateway를 통해 Axon 서버로 보냅니다.
@EventHandler
private void on(FailedEnterElephantEvent event) {
log.info("[@EventHandler] FailedEnterElephantEvent for Id: {}", event.getId());
//-- 보상처리 요청
commandGateway.send(BackToReadyCommand.builder()
.id(event.getId())
.status(StatusEnum.READY.value())
.build());
}
package 'org.axon.command'에 BackToReadyCommand class를 추가 합니다.
import lombok.Builder;
import lombok.Value;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
@Value
@Builder
public class BackToReadyCommand {
@TargetAggregateIdentifier
String id;
String status;
}
Command Handler 만들기
'BackToReadyCommand'를 처리할 Command Handler를 만들어야 합니다.
어느 class에 만들까요?
ElephantAggregate에 만들어야 겠죠. 그래야 상태를 변경할 코끼리 Entity에 대해 Event Replay를 통해 최종 상태를 Replay한 후 최종 상태 Event를 발행하고, Event Store에도 추가하며, 조회 DB도 업데이트 할 수 있기 때문 입니다.
1) ElephantAggregate에 Command Handler 메소드 추가
//-- 냉장고 넣기 실패 시 보상처리: 상태를 다시 'Ready'로 변경함
@CommandHandler
private void handle(BackToReadyCommand cmd) {
log.info("[@CommandHandler] BackToReadyCommand for Id: {}", cmd.getId());
AggregateLifecycle.apply(new BackToReadyCompletedEvent(cmd.getId(), cmd.getStatus()));
}
package 'org.axon.events'에 'BackToReadyCompletedEvent'를 추가 합니다.
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class BackToReadyCompletedEvent {
private String id;
private String status;
}
2) Event Store 추가 메소드 작성
ElephantAggregate에 추가 합니다.
@EventSourcingHandler
private void on(BackToReadyCompletedEvent event) {
log.info("[@EventSourcingHandler] BackToReadyCompletedEvent for Id: {}", event.getId());
this.status = event.getStatus();
}
3) ElephantEventHandler에 조회 DB 업데이트 메소드 추가
@EventHandler
private void on(BackToReadyCompletedEvent event) {
log.info("[@EventHandler] BackToReadyCompletedEvent for Id: {}", event.getId());
Elephant elephant = getEntity(event.getId());
if(elephant != null) {
elephant.setStatus(event.getStatus());
elephantRepository.save(elephant);
}
}
테스트
이제 무게가 100kg이 넘는 새로운 코끼리를 생성하고 냉장고에 넣어 보겠습니다.
어플리케이션을 재시작 합니다.
Swagger 페이지에서 100kg이 넘는 코끼리를 생성 요청해서 만드십시오.
{
"name": "덩치",
"weight": 130
}
생성된 코끼리ID를 응답결과에서 확인하여 냉장고 넣기 API를 실행 합니다.
응답 결과를 보면 잘 요청된 걸로 나올 겁니다.
DBeaver에서 코끼리의 상태를 확인해 보십시오.
지금까지 개발한게 잘 처리되었다면 코끼리의 상태는 'Ready'로 Rollback 되어 있어야 합니다.
잘 되셨죠 ?
지금까지 EventGateway를 이용한 Event Publishing 방법에 대해 실습 했습니다.
다음 편에서는 조회DB를 QueryGateway를 이용하여 조회하는 방법을 배워 보도록 하겠습니다.
마이크로서비스 패턴 쉽게 개발하기 목차
- 마이크로서비스 패턴 이해: Saga, Event Sourcing, API Composition, CQRS 이해
- 실습환경 준비
- 주문 서비스 테스트
- Axon Framework 이해
아래 주제들은 '마이크로서비스패턴 쉽게 개발하기'라는 제 책에서 만나실 수 있습니다.
- 멀티 모듈 프로젝트 작성
- 신규 주문 정상처리 프로세스 구현
- 배송 상태 변경 및 재고 증감 처리
- 신규 주문 보상처리 프로세스 구현
- 주문 수정 정상처리 프로세스 구현
- 주문 수정 보상처리 프로세스 구현
- 주문 삭제 정상처리 프로세스 구현
- 주문 삭제 보상처리 프로세스 구현
- API Composition 패턴과 CQRS 패턴
책 한번 내겠다는 평소의 꿈을 실현하기 위해 이번에 전자책을 내게 되었습니다.
제 꿈을 응원하신다는 마음으로 전체 내용을 공유하지 않는것을 양해해 주시고
구매까지 해 주시면 더욱 감사하겠습니다.
https://happycloud-lee.tistory.com/notice/291