[마이크로서비스 패턴 쉽게 개발 4] Axon Framework(4): QueryGateway
QueryGateway는 비동기 통신으로 데이터를 조회하는 방법입니다.
동기로 조회하는 방법과 비동기로 조회하는 방법의 차이가 뭘까요 ?
동기통신으로 조회한다는 것은 요청자와 제공자가 직접 통신한다는 의미이고, 비동기 통신으로 조회한다는 것은 요청자와 제공자 사이에 중계자를 통해 조회하는 것입니다.
동기 조회는 요청자가 제공자가 누군지 알아야 하고 제공자의 응답 포맷이 변하면 소스를 수정해야 합니다.
비동기 조회는 반대로 요청자는 제공자가 누군지 알 필요 없고 제공자가 요청자가 요구하는 포맷에 맞추는 방식입니다.
예를 들어 모든 은행에서 내가 가진 계좌의 정보를 조회하는 서비스를 만든다고 생각해 봅시다.
동기 통신이라면 아래와 같이 '전계좌 조회'서비스에서 각 은행 계좌 서비스에 요청을 해야 합니다.
'New Bank'라는 새로운 은행이 생기면 '전계좌 조회' 서비스는 요청을 위해 소스를 수정해야 합니다.
비동기통신 조회를 하면 이러한 문제가 해결 됩니다.
이제 전계좌 조회는 MQ(Message Queue)에 Query 메시지만 보내면 됩니다.
각 은행은 MQ 채널에 Query요청이 오면 응답을 MQ로 보내고 되고, MQ는 '전계좌 조회'서비스에 결과를 리턴하게 됩니다.
이런 비동기 통신에서는 제공자의 수가 아무리 늘어나도 요청자는 신경 쓸 필요가 없습니다.
한마디로 요청자와 제공자의 관계가 Loosely Coupling 되기 때문에 비동기 통신 조회가 더 나은 방식이라고 말할 수 있습니다.
Axon Framework은 QueryGateway를 통해 비동기 통신 조회를 지원 합니다.
그럼 실습을 통해 이해해 보도록 하겠습니다.
Axon Framework이 제공하는 Query 방식에는 4가지가 있습니다.
주문현황 서비스를 예로 들어 그 차이를 설명해 보겠습니다.
1) Point to Point Query: 한 번에 하나씩의 서비스만 Query함
아래 그림과 같이 각각의 서비스의 Query Handler에서 응답을 받는 방식입니다.
2) Scatter-Gather Query: 동시에 여러 서비스를 Query하여 합친 응답을 받음
요청자 서비스는 한 번만 Query 요청을 보내고 병합된 응답을 받습니다.
위 계좌 조회 서비스는 Scatter-Gather Query를 사용한 예가 되겠습니다.
3) Subscription Query: 제공자 서비스가 Query결과의 변화가 있을 때마다 응답을 함
1회성으로 Query하는 것이 아니라 제공자 서비스의 Query 결과 변화가 있을 때마다 응답을 받습니다.
Client와 '주문현황' 서비스는 변경 상황을 반영하기 위해 계속 Connection을 맺고 있어야 하고 각 Subscription Query마다 서버의 Thread가 늘어나야 해서 실시간으로 Client에 변화가 반영되어야 하는 경우에만 사용하는 것이 좋습니다.
4) Streaming Query: 대용량 응답 결과를 일정 크기 단위로 지속적으로 응답 받는 방식
실습에서는 Point to Point Query와 Scatter-Gather Query만 해 보도록 하겠습니다.
Point to Point Query: QueryGateway.query(...)
지정한 코끼리 정보만 Query하는 API와 등록된 코끼리 리스트를 Query하는 API를 만들면서 실습 하겠습니다.
1) 지정한 코끼리 정보 Query
- APIController에 API 추가: 코끼리ID를 입력 받아 elephantService.getElephant(id)를 호출하여 결과를 리턴합니다.
@GetMapping("/elephant/{id}")
@Operation(summary = "코끼리 정보 API")
@Parameters({
@Parameter(name = "id", in= ParameterIn.PATH, description = "코끼리ID", required = true)
})
private ResultVO<Elephant> getElephant(@PathVariable(name = "id") String id) {
log.info("[@GetMapping '/elephant/{id}'] Id: {}", id);
return elephantService.getElephant(id);
}
- ElephantService에 getElephant 메소드 추가
QueryGateway객체를 먼저 생성 합니다.
Query메시지를 binary로 변환하지 않고 원문 그대로 보내야 하므로 'transient' 옵션을 꼭 붙이셔야 합니다.
public class ElephantService {
...
@Autowired
private transient QueryGateway queryGateway;
...
}
getElephant 메소드를 추가 합니다.
public ResultVO<Elephant> getElephant(String id) {
log.info("[ElephantService] Executing getElephant for Id: {}", id);
ResultVO<Elephant> retVo = new ResultVO<>();
Elephant elephant = queryGateway.query(new GetElephantQuery(id),
ResponseTypes.instanceOf(Elephant.class)).join();
if(elephant != null) {
retVo.setReturnCode(true);
retVo.setReturnMessage("ID: "+ id);
retVo.setResult(elephant);
} else {
retVo.setReturnCode(false);
retVo.setReturnMessage("Can't get elephant for Id:"+id);
}
return retVo;
}
- queryGateay.query({조회 조건을 담고 있는 객체}, {응답 형식}).join()
- new GetElephantQuery(id): 코끼리ID를 갖고 있는 객체 생성. GetElephantQuery는 조금 이따 작성함.
- ResponseTypes.instanceOf(Elephant.class)): 응답 형식을 ResponseTypes class의 객체를 만들어 전달해야 함
- Query Handler는 아래 예처럼 만들어야 합니다. Axon Server는 @QueryHandler 어노테이션이 있고 'GetElephantQuery' 객체를 파라미터로 갖고 있는 Query Handler가 있는 어플리케이션으로 Query를 푸시 합니다.
@QueryHandler
private Elephant getElephant(GetElephantQuery query) {
Optional<Elephant> optElephant = elephantRepository.findById(query.getId());
return optElephant.orElse(null);
}
- queryGateay.query({Query Handler 이름}, {조회 조건}, {응답 형식}).join()로 바꿀 수도 있습니다.
- 예) queryGateay.query("get_elephant", id, ResponseTypes.instanceOf(Elephant.class))).join()
- Query Handler에서는 @QueryHandler(queryName = "get_elephant")라고 어노테이션 되어 있어야 합니다.
- Query명을 지정하여 요청하는 경우 Query Handler는 아래와 같이 바꿔야 합니다.
@QueryHandler(queryName = "get_elephant")
private Elephant getElephant(String id) {
Optional<Elephant> optElephant = elephantRepository.findById(id);
return optElephant.orElse(null);
}
- GetElephantQuery class 추가
package 'org.axon.queries'를 만들고 하위에 추가 합니다.
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GetElephantQuery {
private String id;
}
- Query Handler 작성
조회 어플리케이션을 추가하고 거기에 만드는게 자연스러우나 일이 많아지니 현재 서비스에 만들도록 하겠습니다.
package 'org.axon.queries' 하위에 'ElephantQueryHandler' class를 만들고 Query Handler 메소드를 추가 합니다.
import lombok.extern.slf4j.Slf4j;
import org.axon.entity.Elephant;
import org.axon.repository.ElephantRepository;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Slf4j
@Component
public class ElephantQueryHandler {
private final ElephantRepository elephantRepository;
@Autowired
public ElephantQueryHandler(ElephantRepository elephantRepository) {
this.elephantRepository = elephantRepository;
}
//-- 코끼리 정보 리턴
@QueryHandler
private Elephant getElephant(GetElephantQuery query) {
Optional<Elephant> optElephant = elephantRepository.findById(query.getId());
return optElephant.orElse(null);
}
}
- 테스트
어플리케이션을 재시작 하고 swagger page에서 테스트 합니다.
2) 코끼리 리스트 API
코끼리 정보 API가 단일 값을 Query하는 예제라면 코끼리 리스트 API는 복수 값을 Query하는 예제입니다.
- APIController에 API 추가
@GetMapping("/elephants")
@Operation(summary = "코끼리 리스트 API")
private ResultVO<List<Elephant>> getLists() {
log.info("[@GetMapping '/elephants']");
return elephantService.getLists();
}
- ElephantService에 'getLists' 추가
- Query객체 대신 Query이름을 지정하였습니다. 두번째 파라미터인 검색 조건은 생략이 안되어 빈 문자열을 넘깁니다.
- 응답 형식이 복수 값이기 때문에 ResponsTypes.multipleInstancesOf 로 지정 합니다.
public ResultVO<List<Elephant>> getLists() {
log.info("[ElephantService] Executing getLists");
ResultVO<List<Elephant>> retVo = new ResultVO<>();
//Point to Point Query
List<Elephant> elephants = queryGateway.query("list", "",
ResponseTypes.multipleInstancesOf(Elephant.class)).join();
retVo.setReturnCode(true);
retVo.setReturnMessage("코끼리수: "+elephants.size());
retVo.setResult(elephants);
return retVo;
}
- ElephantQueryHandler에 Query Handler 메소드 추가
@QueryHandler(queryName = "list")로 Query명을 지정해 줘야 합니다.
//-- 코끼리 리스트를 리턴
@QueryHandler(queryName = "list")
private List<Elephant> getElephants(String dummy) {
Optional<List<Elephant>> optElephants = Optional.of(elephantRepository.findAll());
return optElephants.orElse(null);
}
- 테스트
어플리케이션을 재시작 하고 Swagger Page에서 테스트 합니다.
Scatter-Gather Query: QueryGateway.scatterGather(...)
이번에는 위에서 만든 Query 요청을 Scatter-Gather Query 방식으로 바꿔 보겠습니다.
1) 코끼리 정보 API
- ElephantService > getElement 변경
- queryGateway.scatterGather
- 여러 서비스에서 응답 받고 그중 장애가 있는 서비스도 있을 수 있으니 Timeout 조건을 추가해야 합니다.
- scatterGather로 Query한 결과는 무조건 배열입니다. 'get(0)'으로 그 중에 첫번째 값만 Elephant객체에 할당 합니다.
- 기존 Point to Point 방식의 Query는 Remark 합니다.
public ResultVO<Elephant> getElephant(String id) {
log.info("[ElephantService] Executing getElephant for Id: {}", id);
ResultVO<Elephant> retVo = new ResultVO<>();
//-- Scatter Gather Query
Elephant elephant = queryGateway.scatterGather(new GetElephantQuery(id),
ResponseTypes.instanceOf(Elephant.class), 30, TimeUnit.SECONDS)
.toList().get(0);
//Elephant elephant = queryGateway.query(new GetElephantQuery(id), ResponseTypes.instanceOf(Elephant.class)).join();
if(elephant != null) {
retVo.setReturnCode(true);
retVo.setReturnMessage("ID: "+ id);
retVo.setResult(elephant);
} else {
retVo.setReturnCode(false);
retVo.setReturnMessage("Can't get elephant for Id:"+id);
}
return retVo;
}
2) 코끼리 리스트 API
- ElephantService > getLists 변경
- queryGateway.scatterGather로 변경하고 Timeout 파라미터를 추가 합니다.
- listElephants.stream().flatMap(Collection::stream).toList(): 여러 서비스에서 각각 응답 온 배열값을 합칩니다.
public ResultVO<List<Elephant>> getLists() {
log.info("[ElephantService] Executing getLists");
ResultVO<List<Elephant>> retVo = new ResultVO<>();
//-- Scatter Gather Query
List<List<Elephant>> listElephants = queryGateway.scatterGather("list", "",
ResponseTypes.multipleInstancesOf(Elephant.class), 30, TimeUnit.SECONDS)
.toList();
List<Elephant> elephants = listElephants.stream()
.flatMap(Collection::stream).toList();
//Point to Point Query
// List<Elephant> elephants = queryGateway.query("list", "",
// ResponseTypes.multipleInstancesOf(Elephant.class)).join();
retVo.setReturnCode(true);
retVo.setReturnMessage("코끼리수: "+elephants.size());
retVo.setResult(elephants);
return retVo;
}
3) 테스트
어플리케이션을 재시작하고 Swagger page에서 잘 동작하는지 테스트 합니다.
TIP) @Autowired 객체의 Warning 제거
필드를 Auto wired하기 때문에 발생합니다. Warning을 없애려면 아래와 같이 생성자를 이용하여 Auto wired하도록 변경하십시오.
public class ElephantService {
private transient final CommandGateway commandGateway;
private transient final QueryGateway queryGateway;
private final ElephantRepository elephantRepository;
@Autowired
public ElephantService(CommandGateway commandGateway, QueryGateway queryGateway, ElephantRepository elephantRepository) {
this.commandGateway = commandGateway;
this.queryGateway = queryGateway;
this.elephantRepository = elephantRepository;
}
...
}
이번 편에서는 QueryGateway를 이용한 Point to Point Query와 Scatter-Gather Query를 만들어 봤습니다.
다음 편에서는 Event Store를 이용한 Event Replay 속도를 높이는 Snapshot 사용 방법을 배워 보겠습니다.
마이크로서비스 패턴 쉽게 개발하기 목차
- 마이크로서비스 패턴 이해: Saga, Event Sourcing, API Composition, CQRS 이해
- 실습환경 준비
- 주문 서비스 테스트
- Axon Framework 이해
아래 주제들은 '마이크로서비스패턴 쉽게 개발하기'라는 제 책에서 만나실 수 있습니다.
- 멀티 모듈 프로젝트 작성
- 신규 주문 정상처리 프로세스 구현
- 배송 상태 변경 및 재고 증감 처리
- 신규 주문 보상처리 프로세스 구현
- 주문 수정 정상처리 프로세스 구현
- 주문 수정 보상처리 프로세스 구현
- 주문 삭제 정상처리 프로세스 구현
- 주문 삭제 보상처리 프로세스 구현
- API Composition 패턴과 CQRS 패턴
책 한번 내겠다는 평소의 꿈을 실현하기 위해 이번에 전자책을 내게 되었습니다.
제 꿈을 응원하신다는 마음으로 전체 내용을 공유하지 않는것을 양해해 주시고
구매까지 해 주시면 더욱 감사하겠습니다.
https://happycloud-lee.tistory.com/notice/291