구현하고 싶은 기능
여행 관광지나 축제 리뷰를 지도로 관리하는 프로젝트를 진행하면서 사용자가 관심있는 지역을 팔로우하고, 그 지역에 새로운 관광지나 축제가 추가 될 때 알림을 보내고 싶었습니다.
일단 구현화면부터 보여드리겠습니다.
지역 팔로우
관리자가 새로운 컨텐츠 추가시 팔로워들에게 알림(좌측 화면 사용자/ 우측 화면 관리자)
SSE 선택 이유
프로젝트에서 새로운 관광지나 축제가 생길시 서버에서 클라이언트로 단방향 알림 푸쉬가 필요했습니다.
요청을 주기적으로 보내면서 이벤트를 체크하는 polling은 http 오버헤드 때문에 서버의 부담이 크고, 양방향 통신이 필요없기에 websocket은 고려하지 않았습니다.
http 요청을 이벤트 발생시까지 유지하는 Long polling과 text/event-stream 타입의 http 연결을 유지하는 SSE 중 서버와 클라이언트 간의 연결이 유지되어 자주 이벤트가 발생할 때 클라이언트에 대한 부하가 상대적으로 적고 Spring4.2부터 SSE Emitter를 지원해서 2주라는 빠듯한 프로젝트 일정에 빠르게 구현할 수 있을 것 같아 SSE를 선택했습니다.
sse 구현
알림 발송 플로우
1. 사용자가 관심 있는 지역 팔로우
2. 지역 관리자의 새로운 컨텐츠 추가
3. 새로운 컨텐츠가 생기는 지역의 팔로워들에게 알림 발송
3-1. 접속중인 사용자에게 SSE 사용해 실시간 알림 발송
3-2. 서비스 미사용중인 사용자는 로그인 시에 SSE 연결 요청을 진행할 때 안읽은 알림 발송
SSE Emmitters
public class SseEmitters {
// 접속자 관리 Thread-Safe한 자료구조 사용 필요
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
private static final Long DEFAULT_TIMEOUT = 1000L * 60;
// 접속자 체크
public boolean checkSse(Long accountId) {
if(emitters.get(accountId) == null) return false;
return true;
}
// 사용자 로그인 시 SSE 연결 처리
public SseEmitter add(Long accountId, List<Notification> notifications) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitters.put(accountId, emitter);
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("connected!"));
} catch (IOException e) {
throw new RuntimeException(e);
}
makeLoginNotification(accountId, notifications);
emitter.onCompletion(() -> {
log.info("onCompletion callback");
this.emitters.remove(accountId); // 만료되면 리스트에서 삭제
});
emitter.onTimeout(() -> {
log.info("onTimeout callback");
emitter.complete();
});
return emitter;
}
// 실시간 알림 푸쉬
public void makeNotification(Long accountId, Long notificationId) {
try {
emitters.get(accountId)
.send(SseEmitter.event()
.name("notification")
.data(String.valueOf(notificationId)));
} catch (Exception e) {
e.printStackTrace();
}
}
// 로그인 시 안읽은 알림 푸쉬
public void makeLoginNotification(Long accountId, List<Notification> notifications) {
for (int i = 0; i < notifications.size(); i++) {
try {
makeNotification(accountId, notifications.get(i).getId());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
emitter의 onCompletion과 onTimeout 콜백은 별도의 스레드에서 실행되기에 접속자 요청 컨트롤하는 SSE Emitter들을 관리하는 컬렉션은 Thread-Safe한 자료구조를 사용해야 concurrentModificationException이 발생하지 않습니다. SSE Emitter의 생성과 삭제, 연결 요청이 동시에 이뤄질 수 있어 여러 쓰레드에서 안전하게 사용할 수 있는 자료구조를 사용해야하기 때문입니다.
전체적인 코드 흐름을 볼 수 있는 프로젝트 레포지토리입니다.
https://github.com/JaegeonYu/yesterday-s-Trip
SSE 효율, 한계
Spring MVC 기반의 프로젝트에서 SSE 연결은 클라이언트와 서버를 지속적으로 연결하기에 스레드풀의 규모와 직접적인 연관이 있어 WebFlux 기반의 프로젝트보다 최대 연결 개수가 적습니다.
SSE 연결 관리를 메모리에서 관리하기에 서비스 규모가 작은 단일 서버에서는 효과적으로 실시간으로 단방향 통신을 할 수 있지만, 도메인 별로 서버를 나누는 MSA 환경이나 대규모 서비스에서 다중 서버 환경에서는 Redis Pub/Sub, Kafka를 덧붙이거나 Web Push API로 처리해야합니다.
참고
https://www.baeldung.com/spring-server-sent-events
https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/
https://sothoughtful.dev/posts/sse/
https://dkswnkk.tistory.com/702