개발일지

알림 기능을 구현해보자 - SSE(Server-Sent-Events)!

gilssang97 2021. 12. 24. 11:25

시작하기에 앞서

이번에 개발을 진행하면서 알림에 대한 요구사항을 만족시켜야하는 상황이 발생했다.

여기서 말하는 알림이 무엇인지 자세하게 살펴보자.

A라는 사람이 스터디를 생성했고 B라는 사람은 어느 스터디에도 가입하지 않은 정규 회원이라고 가정하자.

A라는 사람은 스터디 생성자일 것이고 B라는 사람은 위에서 언급했듯이 단순 정규 회원이다.

B라는 사람은 어느 스터디에도 가입할 수 있고 A라는 사람이 생성한 스터디에 가입 신청을 하여 스터디에 가입할 수 있다.

A라는 사람은 항상 웹페이지에 들어가지도 않을 것이고 항상 스터디 관리페이지에 들어가지 않을 것이다.

이런 상황속에서 A라는 사람은 어느 누군가(현재 상황에선 B라는 사람)가 스터디에 지원을 했는지 알고 이를 승인하거나 거절해야할까?

바로 가장 쉬운 접근방법은 알림으로 생성자나 관리자에게 어느 누군가가 스터디에 신청했다는 사실을 알리는 것이다.

이 뿐 아니라 다양한 곳에서 이러한 알림들이 사용된다.

스터디에 신청한 사람도 스터디 가입에 승인됐는지, 거절됐는지에 대한 여부를 통보받을 수 있으며 어느 스터디 가입자가 생성한 게시글에 누군가 댓글을 단다면 이에 대한 부분을 입력받을 수 있어야 한다.

하지만 생각해보면 그리 간단하지만은 않다.

우리가 현재 사용하는 방식에서 B라는 사람이 서버에게 저 스터디 신청합니다.라고 말했을뿐인데 서버가 다른 사용자인 A에게 누군가 너의 스터디에 스터디 신청했어라고 말해줄 순 없으니 말이다.

이 부분에 대해서는 어떤 방식으로 해결해볼 수 있을까?

다양한 접근법에 대해 알아보자.

클라이언트 - 서버 통신

우리가 현재 이용하는 HTTP 프로토콜의 특징 중 중요한 부분 중 하나는 비연결성이라는 것이다.

그렇기에 A라는 사람이 이전에 연결한 적이 있어도 서버는 연결을 그냥 끊어버린다.

이로 인해 B라는 사람이 스터디 가입 신청을 한 뒤 서버가 A에게 가입했다는 알림 메세지를 주려고 하나 Connection이 존재하지 않아 보낼 수 없는 상황이 발생하는 것이다.

이걸 해결하는 방식이 존재한다. 바로 폴링(Polling)방식이다.

이것이 무엇인지 알아보자.

폴링(Polling)

앞서 설명했듯이, HTTP는 비연결성이라는 특징을 가지고 있어 연결을 끊어버린다.

이에 대한 해결책으로 계속 평범한 Request를 날려 이벤트가 발생한다면 그 이후의 Request를 통해 해당 이벤트에 대한 결과를 리턴해주면 되는 것이다. (Connection이 생겼기 때문이다.)

쉽게 생각할 수 있는 방식이다만, 잠깐만 생각해보면 별로 좋지 않은 결과를 줄 수 있다는 것을 알 수 있다.

클라이언트가 계속 Request를 보내면서 서버의 부담은 점점 늘어난다.

그리고 Connection을 맺고 끊는 것에 대한 비용 부담이 커지기 때문에 좋지 않을 뿐더러 이러한 방식이 진짜 실시간으로 동작한다고 보기에도 어려운 점이 존재한다.(주기적으로 보내는 것이기에 바로 응답한다는 것에 대한 보장이 없다.)

하지만 주기적으로 갱신되는 데이터가 존재한다면 이러한 방식이 도움이 될 순 있을 것이다.

그러면 다른 방법은 없을까? 바로 긴 폴링(Long Polling)이 존재한다.

긴 폴링(Long Polling)

이번에는 계속적으로 요청하는 것이 아니라 유지 시간을 조금 길게 가져본다는 것이다.

즉, Connection을 계속 열어두고 요청이 온다면 해당 요청을 처리하는 것이다.

처음에 긴 Connection을 가질 수 있게 Request를 보낸다.

그리고 지속되고 있는 Connection 시간동안 어떤 이벤트가 발생했을 때 그 이벤트에 대한 결과 값을 지속되고 있는 Connection을 통해 보내면 된다.

그리고 다시 긴 Connection을 가질 수 있게 Request를 보낸다.

그리고 위의 방식을 반복하게 된다.

이렇게 된다면 폴링 방식과 달리 계속적으로 Connection을 열어두고 있다가 바로 그에 대한 결과값을 받기에 실시간성이 보장될 것이다.

또한, 이 방법은 기존 폴링 방식과 달리 지속적인 요청을 보내지 않아 그나마 부담이 덜하다.

하지만 시간 간격이 좁다면 사실상 기존 폴링과 큰 차이가 없게되고 지속적으로 연결되어 있기 때문에 다수의 클라이언트에게 동시에 이벤트가 발생될 경우 Response를 보내고 Request를 다시 보내야하기 때문에 순간적인 부담이 급증하게 된다.

결국 이벤트가 자주 발생하는 경우에는 부담이 될 수 밖에 없다.

그렇다면 다른 방법은 없을까? 바로 스트리밍(Streaming)이다.

스트리밍(Streaming)

이번에는 Request에 대한 응답으로 Response을 내려 Connection을 종료하는 것이 아니라 Connection을 유지한채로 다음 응답을 계속 받아오면 된다.

서버는 일정 시간동안 요청을 대기시키고, Chunked 메시지를 이용해서 응답 시 연결을 계속 유지한다.

하지만 클라이언트에서 서버로 데이터를 보내는게 힘들다.

위의 세 방식 모두 HTTP를 통해 통신하기 때문에 Request/Response 모두 헤더가 불필요하게 크다.

이러한 단점들을 해소하기 위해 나온 것이 바로 웹 소켓(WebSocket)이다.

웹 소켓(WebSocket)

웹 소켓은 HTTP와 같은 프로토콜의 일종으로 클라이언트와 서버 간의 효율적인 양방향 통신을 실현하기 위한 구조이다.

웹 소켓은 양방향 통신으로 진행되고 최초 접속이 일반 Http 요청을 이용한 Handshaking으로 이루어진다. (80, 443 포트로 접속하므로 추가로 방화벽을 여리 않고도 가능하다. 그렇기 때문에 HTTP 규격인 CORS 적용, 인증 등을 기존과 동일하게 보장받을 수 있다.)

Http와 같이 연결 후 끊어버리는 것이 아니라 연결 후 계속적으로 Connection을 지속하므로 연결하는데 필요한 불필요한 비용을 제거할 수 있다.

또한 위에서 HTTP 통신의 Request/Response 헤더가 불필요하게 크기에 문제가 생겼는데 웹소켓을 이용하면 최초 접속시에만 헤더 정보를 보내고 더 이상 보내지 않으므로 이를 처리해줄 수 있다.

그리고 기존 Http 요청과 달리 웹소켓 포트에 접속해 있는 모든 클라이언트에게 이벤트 방식으로 응답한다.

이전 HTTP를 활용한 방식보다 훨씬 좋아보인다!

그러면 이걸 활용해서 알림 서비스를 이용하면 되겠네라고 말할 수 있겠지만 하지만 조금만 생각을 해보자.

웹소켓은 말그대로 양방향 통신에 적합하다. 채팅에서와 같이 클라이언트와 서버가 양방향 통신이 필요한 부분에서는 좋은 선택이겠지만 알림 서비스는 단지 알림을 받는 사람 입장에서는 전혀 요청을 하지 않고 서버에서만 응답을 받는 단방향 시스템이라는 사실을 알 수 있다.

물론 서버에서 클라이언트로 보내는 (거의) 방향에서도 사용할 수 있겠지만 이 방법은 조금 비효율적일 수 있겠다는 생각이 들었다. 그래서 다른 방안이 있나 찾아보게 되었다. 바로 SSE(Server-Sent-Events)이다.

SSE(Server-Sent-Events)

SSE는 웹소켓과 달리, 클라이언트는 서버로부터 데이터만 받을 수 있게 된다.

SSE는 웹소켓과 달리 별도의 프로토콜을 사용하지 않고 HTTP 프로토콜만으로 사용이 가능하기에 훨씬 가볍다.

접속에 문제가 있으면 자동으로 재연결을 시도하지만 클라이언트가 페이지를 닫아도 서버에서 감지하기가 어렵다.

또 다른 특징으로 HTTP/1.1의 경우 브라우저당 6개의 접속만을 허가하며 HTTP/2에서는 100개까지의 접속을 허용한다.

알림 기능만을 고려했을 때, 웹소켓보다 가벼운 SSE를 선택하는 것이 더욱 좋은 선택이라 생각했고 이를 통해 알림 기능을 구현해보려고 했다.

현재 프로젝트를 진행하기 위해 사용중인 프레임워크는 Spring Framework로, Spring Framework는 4.2(2015년)부터 SseEmitter 클래스를 제공하여 서버 사이드에서의 SSE 통신 구현이 가능해졌다.

JS에서는 EventSource를 이용하여 연결 생성 및 전송된 이벤트에 대한 제어가 가능하다.

EventSource를 이용하여 연결 생성 요청을 서버에 보낸다면 서버는 이를 처리해 연결을 진행해주어야한다.

그렇기 위해서는 서버에서 이 요청을 처리해줄 수 있는 부분을 구현해야한다.

이를 이용해 실제로 구현해보자!

Controller 구현

구현하기에 앞서 몇몇 부분에 대해 알고있어야 한다.

연결 요청을 처리하기 위해서, MIME Type을 text/event-stream형태로 받아줘야한다.

그리고 헤더에 가끔 Last-Event-ID라는 값을 담기도 한다.

이 값은 이전에 받지 못한 이벤트가 존재하는 경우(SSE 연결에 대한 시간 만료 혹은 종료), 받은 마지막 이벤트 ID 값을 넘겨 그 이후의 데이터(받지 못한 데이터)부터 받을 수 있게 할 수 있는 정보를 의미한다.

방금 말했듯이, 항상 전달받는 정보는 아니기 때문에 필수정보는 아니다.

이를 코드로 구현하면 아래와 같다.

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;

    @ApiOperation(value = "알림 구독", notes = "알림을 구독한다.")
    @GetMapping(value = "/subscribe", produces = "text/event-stream")
    @ResponseStatus(HttpStatus.OK)
    public SseEmitter subscribe(@AuthenticationPrincipal MemberDetails memberDetails,
                                @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
        return notificationService.subscribe(memberDetails.getId(), lastEventId);
    }
}
  • 실제 클라이언트로부터 오는 알림 구독 요청을 받는다.
  • 현재 누구로부터 온 알림 구독인지에 대한 부분은 @AuthenticationPrincipal을 활용해 입력받는다. (현재 Spring Security를 이용 중이기 때문에 이에 대한 정보를 받아올 수 있다.)
  • 이전에 받지 못한 정보가 있다면, Last-Event-ID라는 헤더와 함께 날아오므로 이에 대한 정보를 받아주도록 한다. (항상 날아오는 정보가 아니기 때문에)
  • 그리고 실제 구독하는 작업을 진행한다.

모든 테스트 코드의 경우 아래 계층부터 천천히 살펴보겠다.

이제 실질적으로 어떤식으로 구현됐는지에 대해 알아보기 위해 도메인 계층부터 다시 올라가보자.

도메인 구현

알림의 구성요소는 어떻게 이루어져있을까에 대해 먼저 생각해보자.

알림은 "누구 : ~에 대한 알림이 도착했습니다." 이런 형식에 해당 알림을 클릭하면 해당 관련 페이지로 이동하는 식이 아닌가 싶다.

그렇다면 누구에게 온 알림인지 (회원), 어떤 내용의 알림인지 (내용), 어떤 페이지로 이동할지 (링크)로 구성될 것이다.

아직 부족한 부분이 있다. 보통 한 번 알림을 읽으면 읽음 표시되어 더이상 우리에게 반짝이지 않는 것처럼 해당 알림을 읽었는지에 대한 처리도 해줘야할 것이다.

이를 실제로 구성한다면 아래와 같다.

@Entity
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EqualsAndHashCode(of = "id")
public class Notification extends EntityDate {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "notification_id")
    private Long id;

    @Embedded
    private NotificationContent content;

    @Embedded
    private RelatedURL url;

    @Column(nullable = false)
    private Boolean isRead;

    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    private NotificationType notificationType;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "member_id")
    @OnDelete(action = OnDeleteAction.CASCADE)
    private Member receiver;

    @Builder
    public Notification(Member receiver, NotificationType notificationType, String content, String url, Boolean isRead) {
        this.receiver = receiver;
        this.notificationType = notificationType;
        this.content = new NotificationContent(content);
        this.url = new RelatedURL(url);
        this.isRead = isRead;
    }

    public String getContent() {
        return content.getContent();
    }

    public String getUrl() {
        return url.getUrl();
    }
}
  • NotificationContent - 알림의 내용이다. 비어있지 않아야하며 50자 이내여야한다.
  • RelatedUrl - 관련 링크이다. 비어있지 않아야한다.
  • isRead - 읽었는지에 대한 여부를 나타낸다.
  • NotificationType - 알림 종류에 관한 것이다. [스터디 신청, 스터디 승인, 스터디 거절, 댓글]으로 구성되어있으며 필요하다면 추후 추가한다.
  • receiver - 회원 정보에 대한 연관관계 매핑이다.
  • Getter - 각 래퍼 클래스에 대한 추출 메소드이다.

도메인 테스트

알림 내용에 대한 테스트를 진행해보자.

class NotificationContentTest {

    @Test
    @DisplayName("알림 내용이 50자 이내일 경우 성공한다.")
    public void test1() throws Exception {
        //given

        //when, then
        Assertions.assertDoesNotThrow(() -> new NotificationContent("hi".repeat(2)));
    }

    @Test
    @DisplayName("알림 내용이 50자 이상일 경우 실패한다.")
    public void test2() throws Exception {
        //given

        //when, then
        Assertions.assertThrows(InvalidNotificationContentException.class, () -> new NotificationContent("hi".repeat(31)));
    }

    @Test
    @DisplayName("알림 내용이 공백일 경우 실패한다.")
    public void test3() throws Exception {
        //given

        //when, then
        Assertions.assertThrows(InvalidNotificationContentException.class, () -> new NotificationContent(" "));
    }
}

알림 링크에 대한 테스트를 진행해보자.

class RelatedURLTest {

    @Test
    @DisplayName("관련 링크가 공백이 아닐 경우 성공한다.")
    public void test1() throws Exception {
        //given

        //when, then
        Assertions.assertDoesNotThrow(() -> new RelatedURL("koner.kr/study"));
    }

    @Test
    @DisplayName("관련 링크가 공백일 경우 실패한다.")
    public void test2() throws Exception {
        //given

        //when, then
        Assertions.assertThrows(InvalidRelatedURLException.class, () -> new RelatedURL(null));
    }
}

이에 대한 테스트를 진행하면 모두 성공적으로 완료되는 모습을 보인다.

이제 레포지토리 계층을 살펴보자.

레포지토리 구현

현재 알림에 대한 CRUD를 제외하고 크게 사용하는 부분이 없다.

우리는 Data JPA를 이용하고 있어 편하게 이를 구현할 수 있다.

public interface NotificationRepository extends JpaRepository<Notification, Long> {
}

하지만 추가적으로 고려해야하는 부분이 존재한다.

우리는 SseEmitter를 이용해 알림을 실제로 보내게 되는데 어떤 회원에게 어떤 Emitter가 연결되어있는지를 저장해줘야하고 어떤 이벤트들이 현재까지 발생했는지에 대해서도 저장하고 있어야 한다. (추후 Emitter의 연결이 끊기게 되면 저장되어 있는 Event를 기반으로 이를 전송해줄 수 있어야 되기 때문이다.)

그렇기 때문에 추가적으로 EmitterRepository를 추가적으로 구현해주었다.

구현하는데 있어 먼저 레포지토리를 인터페이스로 구현했다.

현재는 Map을 이용해 Emitter와 이벤트를 저장하는 형식으로 구현하였으나 추후 다른 방식으로 이를 구현할 수 있기 때문에 유연한 전환을 위해 추상체를 생성하고 그 아래에 구현체로서 구현하는 방식을 채택했다.

먼저 EmitterRepository이다.

public interface EmitterRepository {
    SseEmitter save(String emitterId, SseEmitter sseEmitter);
    void saveEventCache(String emitterId, Object event);
    Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);
    Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId);
    void deleteById(String id);
    void deleteAllEmitterStartWithId(String memberId);
    void deleteAllEventCacheStartWithId(String memberId);
}

위와 같은 메소드들을 확인할 수 있는데 각각에 대해 살펴보면 다음과 같다.

  • save - Emitter를 저장한다.
  • saveEventCache - 이벤트를 저장한다.
  • findAllEmitterStartWithByMemberId - 해당 회원과 관련된 모든 Emitter를 찾는다.
    • 브라우저당 여러 개 연결이 가능하기에 여러 Emitter가 존재할 수 있다.
  • findAllEventCacheStartWithByMemberId - 해당 회원과 관련된 모든 이벤트를 찾는다.
  • deleteById - Emitter를 지운다.
  • deleteAllEmitterStartWithId - 해당 회원과 관련된 모든 Emitter를 지운다.
  • deleteAllEventCacheStartWithId - 해당 회원과 관련된 모든 이벤트를 지운다.

이를 구현한 구현체는 다음과 같다.

동시성을 고려하여 ConcurrentHashMap을 이용해 구현해주고 이를 저장하고 꺼내는 식의 방식을 진행한다.

Emitter와 이벤트를 찾는 부분에 있어 startsWith을 사용하는 이유는 현재 저장하는데 있어 뒤에 구분자로 회원의 ID를 사용하기 때문에 해당 회원과 관련된 Emitter와 이벤트들을 찾아오는 것이다.

@Repository
@NoArgsConstructor
public class EmitterRepositoryImpl implements EmitterRepository {
    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    @Override
    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
        return eventCache.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public void deleteById(String id) {
        emitters.remove(id);
    }

    @Override
    public void deleteAllEmitterStartWithId(String memberId) {
        emitters.forEach(
                (key, emitter) -> {
                    if (key.startsWith(memberId)) {
                        emitters.remove(key);
                    }
                }
        );
    }

    @Override
    public void deleteAllEventCacheStartWithId(String memberId) {
        eventCache.forEach(
                (key, emitter) -> {
                    if (key.startsWith(memberId)) {
                        eventCache.remove(key);
                    }
                }
        );
    }
}

실제 EmitterRepository가 잘 작동하는지에 대해서 테스트해보자.

각각에 대한 부분의 테스트 케이스를 작성하여 이를 실행해보았다.

class EmitterRepositoryImplTest {

    private EmitterRepository emitterRepository = new EmitterRepositoryImpl();
    private Long DEFAULT_TIMEOUT = 60L * 1000L * 60L;

    @Test
    @DisplayName("새로운 Emitter를 추가한다.")
    public void save() throws Exception {
        //given
        Long memberId = 1L;
        String emitterId =  memberId + "_" + System.currentTimeMillis();
        SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);

        //when, then
        Assertions.assertDoesNotThrow(() -> emitterRepository.save(emitterId, sseEmitter));
    }

    @Test
    @DisplayName("수신한 이벤트를 캐시에 저장한다.")
    public void saveEventCache() throws Exception {
        //given
        Long memberId = 1L;
        String eventCacheId =  memberId + "_" + System.currentTimeMillis();
        Notification notification = new Notification(new Member(1L), NotificationType.APPLY, "스터디 신청이 왔습니다.", "url", false);

        //when, then
        Assertions.assertDoesNotThrow(() -> emitterRepository.saveEventCache(eventCacheId, notification));
    }

    @Test
    @DisplayName("어떤 회원이 접속한 모든 Emitter를 찾는다")
    public void findAllEmitterStartWithByMemberId() throws Exception {
        //given
        Long memberId = 1L;
        String emitterId1 = memberId + "_" + System.currentTimeMillis();
        emitterRepository.save(emitterId1, new SseEmitter(DEFAULT_TIMEOUT));

        Thread.sleep(100);
        String emitterId2 = memberId + "_" + System.currentTimeMillis();
        emitterRepository.save(emitterId2, new SseEmitter(DEFAULT_TIMEOUT));

        Thread.sleep(100);
        String emitterId3 = memberId + "_" + System.currentTimeMillis();
        emitterRepository.save(emitterId3, new SseEmitter(DEFAULT_TIMEOUT));


        //when
        Map<String, SseEmitter> ActualResult = emitterRepository.findAllEmitterStartWithByMemberId(String.valueOf(memberId));

        //then
        Assertions.assertEquals(3, ActualResult.size());
    }

    @Test
    @DisplayName("어떤 회원에게 수신된 이벤트를 캐시에서 모두 찾는다.")
    public void findAllEventCacheStartWithByMemberId() throws Exception {
        //given
        Long memberId = 1L;
        String eventCacheId1 =  memberId + "_" + System.currentTimeMillis();
        Notification notification1 = new Notification(new Member(1L), NotificationType.APPLY, "스터디 신청이 왔습니다.", "url", false);
        emitterRepository.saveEventCache(eventCacheId1, notification1);

        Thread.sleep(100);
        String eventCacheId2 =  memberId + "_" + System.currentTimeMillis();
        Notification notification2 = new Notification(new Member(1L), NotificationType.ACCEPT, "스터디 신청이 승인되었습니다.", "url", false);
        emitterRepository.saveEventCache(eventCacheId2, notification2);

        Thread.sleep(100);
        String eventCacheId3 =  memberId + "_" + System.currentTimeMillis();
        Notification notification3 = new Notification(new Member(1L), NotificationType.REJECT, "스터디 신청이 거절되었습니다.", "url", false);
        emitterRepository.saveEventCache(eventCacheId3, notification3);

        //when
        Map<String, Object> ActualResult = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));

        //then
        Assertions.assertEquals(3, ActualResult.size());
    }

    @Test
    @DisplayName("ID를 통해 Emitter를 Repository에서 제거한다.")
    public void deleteById() throws Exception {
        //given
        Long memberId = 1L;
        String emitterId =  memberId + "_" + System.currentTimeMillis();
        SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);

        //when
        emitterRepository.save(emitterId, sseEmitter);
        emitterRepository.deleteById(emitterId);

        //then
        Assertions.assertEquals(0, emitterRepository.findAllEmitterStartWithByMemberId(emitterId).size());
    }

    @Test
    @DisplayName("저장된 모든 Emitter를 제거한다.")
    public void deleteAllEmitterStartWithId() throws Exception {
        //given
        Long memberId = 1L;
        String emitterId1 = memberId + "_" + System.currentTimeMillis();
        emitterRepository.save(emitterId1, new SseEmitter(DEFAULT_TIMEOUT));

        Thread.sleep(100);
        String emitterId2 = memberId + "_" + System.currentTimeMillis();
        emitterRepository.save(emitterId2, new SseEmitter(DEFAULT_TIMEOUT));

        //when
        emitterRepository.deleteAllEmitterStartWithId(String.valueOf(memberId));

        //then
        Assertions.assertEquals(0, emitterRepository.findAllEmitterStartWithByMemberId(String.valueOf(memberId)).size());
    }

    @Test
    @DisplayName("수신한 이벤트를 캐시에 저장한다.")
    public void deleteAllEventCacheStartWithId() throws Exception {
        //given
        Long memberId = 1L;
        String eventCacheId1 =  memberId + "_" + System.currentTimeMillis();
        Notification notification1 = new Notification(new Member(1L), NotificationType.APPLY, "스터디 신청이 왔습니다.", "url", false);
        emitterRepository.saveEventCache(eventCacheId1, notification1);

        Thread.sleep(100);
        String eventCacheId2 =  memberId + "_" + System.currentTimeMillis();
        Notification notification2 = new Notification(new Member(1L), NotificationType.ACCEPT, "스터디 신청이 승인되었습니다.", "url", false);
        emitterRepository.saveEventCache(eventCacheId2, notification2);

        //when
        emitterRepository.deleteAllEventCacheStartWithId(String.valueOf(memberId));

        //then
        Assertions.assertEquals(0, emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId)).size());
    }
}

잘 작동하는 모습을 확인할 수 있었다.

이제 서비스를 구현하러 가보자.

서비스 구현

어떻게 우리가 구독을 진행하는 것인지에 대해서 살펴보고 어떻게 구독자들에게 이벤트를 전달시켜줄 수 있는지 살펴보자.

먼저, 구독의 과정을 살펴보자.

구독을 하는 것은 Spring에서 제공하는 SseEmitter를 생성해서 이를 저장시켜둔 다음 필요할 때마다 해당 구독자가 생성한 SseEmitter를 불러와 이벤트에 대한 응답을 전송해주면 된다.

그렇기 위해서는 이런 의문점이 생긴다. SseEmitter 자체를 구분해줄 수 있어야하지 않을까?

누가 사용하고 있는지에 대한 SseEmitter인지를 구분해줄 필요성이 존재한다.

그렇기 때문에 다음과 같이 EmitterId를 생성해주자.

String emitterId = makeTimeIncludeId(memberId);

private String makeTimeIncludeId(Long memberId) {
        return memberId + "_" + System.currentTimeMillis();
}

왜 구분자 뒤에 현재 시각을 붙여놓을까?

이전 SSE에 대해 설명할 때, HTTP/1.1에서는 브라우저당 6개, HTTP/2에서는 브라우저당 100개라고 언급한 바 있다.

만약, 브라우저에서 여러 개의 구독을 진행할 때 브라우저 탭마다의 SseEmitter 구분을 위해 시간을 붙여 구분하기 위해 위와 같이 진행했다.

어느 탭으로 접속하든간에 똑같은 동작을 할텐데 굳이 구분을 해야라는 의문을 가질 수 있으나 추후 먼저 접속하거나 나중에 접속한 Emitter에 대해 무슨 작업을 진행할 수 있는 여지가 존재해 이렇게 구분을 해놓는 것이 어떨까라는 생각에 이렇게 진행해보았다.

그리고 굳이 메소드로 따로 빼놓은 부분에 대해서 추후 이벤트에 대한 Id에 대해서도 시간으로 구분을 진행하므로 코드 중복을 피하고자 다음과 같이 메소드로 구성했다. (조금 뒤에 다룬다.)

이렇게 생성된 Id를 기반으로 Emitter를 새로 하나 생성하여 저장해주면 된다. (레포지토리에 대해서는 추후 살펴본다.)

SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(timeout));

그리고 시간이 만료된 경우에 대해 자동으로 레포지토리에서 삭제 처리해줄 수 있는 콜백을 등록해놓을 수 있다.

emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

다음과 같이 등록을 진행했다면 저장은 완료된 것이다.

하지만 여기서 문제가 발생한다.

만약 등록을 진행한 뒤, SseEmitter의 유효 시간동안 어느 데이터도 전송되지 않는다면 503 에러를 발생시키므로 이에 대한 방안으로 맨 처음 연결을 진행한다면 Dummy 데이터를 보내 이를 방지해주면 된다.

String eventId = makeTimeIncludeId(memberId);
sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + memberId + "]");

private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
    try {
        emitter.send(SseEmitter.event()
                     .id(eventId)
                     .data(data));
    } catch (IOException exception) {
        emitterRepository.deleteById(emitterId);
    }
}

여기서 이벤트 ID에 대해 시간으로 또 구분을 해줬다.

그 이유는 바로 Last-Event-ID 때문이다.

우리는 Last-Event-ID로 마지막 전송받은 이벤트 ID가 무엇인지 알고 받지 못한 데이터 정보들에 대해 인지할 수 있어야 한다.

하지만 해당 회원 ID로 요청하는 수 많은 이벤트들이 존재할 것이고 만약 그 중에서 가장 마지막에 전송받은 데이터를 구분하라고 하면 어떻게 구분할 수 있을까?

단순 회원으로만 저장되어 있다면 구분이 불가능할 것이다. 다음과 같이 말이다.

Last-Event-Id = 1548

{1548, Notification1}
{1548, Notification2}
{1548, Notification3}

위와 같이 왔을 때, 시간 순서상으로 1, 2, 3이 저장되어있다고는 하나 이를 확실하게 구분할 수 있다고 할 수 있을까?

우리는 Map이라는 구현체에 입력한 순서대로 저장되는 LinkedHashMap이나 다른 방안을 고려해볼 수 있겠으나 동시성 고려를 위해 ConcurrentHashMap을 사용했으므로 잘 동작하지 않을 것이다.

이를 위해 구분자로 시간을 넣어주면 어떨까?

Last-Event-Id = 1548_15198456

{1548_15198456, Notification1}
{1548_16198456, Notification2}
{1548_17198456, Notification3}

다음과 같이 1548이라는 ID를 가진 회원의 이벤트 중 뒤의 시간을 기준으로 구분할 수 있게 된다.

이 때문에 뒤에 시간이라는 구분자를 추가해준 것이다.

이제 다시 다음을 살펴보자.

Dummy 데이터를 보냈으니 이제 실제로 이전에 온 이벤트들 중 받지 못한 데이터가 있는지 확인하고 있다면 이에 대해 다시 수신받을 수 있도록 하자.

// 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
if (hasLostData(lastEventId)) {
    sendLostData(lastEventId, memberId, emitterId, emitter);
}

private boolean hasLostData(String lastEventId) {
    return !lastEventId.isEmpty();
}

private void sendLostData(String lastEventId, Long memberId, String emitterId, SseEmitter emitter) {
    Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
    eventCaches.entrySet().stream()
        .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
        .forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}
  • hasLostData - Last-Event-ID가 존재한다는 것은 받지 못한 데이터가 있다는 것이다. (프론트에서 알아서 보내준다.)
  • sendLostData - 받지 못한 데이터가 있다면 Last-Event-ID를 기준으로 그 뒤의 데이터를 추출해 알림을 보내주면 된다.

이렇게 진행한 다음 생성한 Emitter를 리턴해주면 된다.

return emitter;

전체 코드는 다음과 같다.

@Override
public SseEmitter subscribe(Long memberId, String lastEventId) {
    String emitterId = makeTimeIncludeId(memberId);
    SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(timeout));
    emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
    emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

    // 503 에러를 방지하기 위한 더미 이벤트 전송
    String eventId = makeTimeIncludeId(memberId);
    sendNotification(emitter, eventId, emitterId, "EventStream Created. [userId=" + memberId + "]");

    // 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
    if (hasLostData(lastEventId)) {
        sendLostData(lastEventId, memberId, emitterId, emitter);
    }

    return emitter;
}

private String makeTimeIncludeId(Long memberId) {
    return memberId + "_" + System.currentTimeMillis();
}

private void sendNotification(SseEmitter emitter, String eventId, String emitterId, Object data) {
    try {
        emitter.send(SseEmitter.event()
                     .id(eventId)
                     .data(data));
    } catch (IOException exception) {
        emitterRepository.deleteById(emitterId);
    }
}

private boolean hasLostData(String lastEventId) {
    return !lastEventId.isEmpty();
}

private void sendLostData(String lastEventId, Long memberId, String emitterId, SseEmitter emitter) {
    Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(memberId));
    eventCaches.entrySet().stream()
        .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
        .forEach(entry -> sendNotification(emitter, entry.getKey(), emitterId, entry.getValue()));
}

그리고 실제로 다른 사용자가 알림을 보낼 수 있는 기능이 필요하다.

이를 구현해보자.

이는 위에서 했던 부분과 비슷하다.

알림을 일단 구성하고 해당 알림에 대한 이벤트를 발생시키면 되는데 이벤트를 발생시키기 앞서 어떤 회원에게 알림을 보낼지에 대해 찾고 알림을 받을 회원의 Emitter들을 모두 찾아 해당 Emitter로 Send 시켜주기만 하면 된다.

@Override
public void send(Member receiver, NotificationType notificationType, String content, String url) {
    Notification notification = notificationRepository.save(createNotification(receiver, notificationType, content, url));

    String receiverId = String.valueOf(receiver.getId());
    String eventId = receiverId + "_" + System.currentTimeMillis();
    Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverId);
    emitters.forEach(
        (key, emitter) -> {
            emitterRepository.saveEventCache(key, notification);
            sendNotification(emitter, eventId, key, NotificationResponseDto.create(notification));
        }
    );
}

private Notification createNotification(Member receiver, NotificationType notificationType, String content, String url) {
    return Notification.builder()
        .receiver(receiver)
        .notificationType(notificationType)
        .content(content)
        .url(url)
        .isRead(false)
        .build();
}

이를 실제 테스트해보자.

각각의 메소드에 대한 테스트를 진행해보았다.

@SpringBootTest
@Transactional
@ActiveProfiles("test")
class NotificationServiceImplTest {
    @Autowired
    NotificationService notificationService;
    @Autowired
    TestDB testDB;

    @BeforeEach
    void beforeEach() {
        testDB.init();
    }

    @Test
    @DisplayName("알림 구독을 진행한다.")
    public void subscribe() throws Exception {
        //given
        Member member = testDB.findGeneralMember();
        String lastEventId = "";

        //when, then
        Assertions.assertDoesNotThrow(() -> notificationService.subscribe(member.getId(), lastEventId));
    }

    @Test
    @DisplayName("알림 메세지를 전송한다.")
    public void send() throws Exception {
        //given
        Member member = testDB.findGeneralMember();
        String lastEventId = "";
        notificationService.subscribe(member.getId(), lastEventId);

        //when, then
        Assertions.assertDoesNotThrow(() -> notificationService.send(member, NotificationType.APPLY, "스터디 신청에 지원하셨습니다.", "localhost:8080/study/1"));
    }
}

결과는 성공적이였다.

Controller 테스트

Controller 계층에 대해서는 이미 설명했기에 테스트를 진행해보자.

구독이 성공하는지를 보자.

@SpringBootTest
@AutoConfigureMockMvc
@ActiveProfiles("test")
@Transactional
class NotificationControllerTest {
    @Autowired
    WebApplicationContext context;
    @Autowired
    MockMvc mockMvc;
    @Autowired
    MemberRepository memberRepository;
    @Autowired
    JwtTokenHelper accessTokenHelper;
    @Autowired
    NotificationController notificationController;
    @Autowired
    TestDB testDB;

    @BeforeEach
    void beforeEach() {
        mockMvc = MockMvcBuilders.webAppContextSetup(context).apply(springSecurity()).build();
        testDB.init();
    }

    @Test
    @DisplayName("SSE에 연결을 진행한다.")
    public void subscribe() throws Exception {
        //given
        Member member = testDB.findGeneralMember();
        String accessToken = accessTokenHelper.createToken(member.getEmail());

        //when, then
        mockMvc.perform(get("/subscribe")
                        .header("X-AUTH-TOKEN", accessToken))
                .andExpect(status().isOk());
    }
}

구독이 성공적으로 완료된다.

전송해보기

이렇게 구현된 API를 기반으로 JS에서 EventSource를 만들어 요청을 진행하면 알림에 대한 구독을 진행할 수 있다.

그리고 실제 알림을 보내는 부분에 대해서 살펴보자.

이제 알림을 보내고 싶은 곳에서 알림을 실제로 보내면 된다.

간단하게 NotificationService를 주입받아 이를 적용시켜줄 수도 있겠지만 이렇게 된다면 서비스간의 의존성이 추가되고 결합도가 높아질 것이다.

이에 대한 사이드 이펙트가 발생할 가능성이 크므로 서비스에 대한 결합을 끊는 방법으로 이벤트를 활용해보려고 했다.

private final ApplicationEventPublisher eventPublisher;

EventPublisher를 주입받아 이 이벤트를 발행하면 EventListener는 리스닝하고 있다가 발행된 이벤트를 처리해주게 된다.

이를 실제로 구현해보자.

간단하게 스터디 신청을 승인하는 예를 살펴보자.

@Transactional
public void acceptJoin(Long studyId, Long memberId) {
    validateJoinCondition(studyId, memberId);
    StudyJoin studyJoin = findMemberJoinInfoAboutSpecificStudy(studyId, memberId);
    studyJoin.acceptMember();
    notifyJoinInfo(studyJoin, NotificationType.ACCEPT);
}

private void notifyJoinInfo(StudyJoin studyJoin, NotificationType accept) {
    studyJoin.publishEvent(eventPublisher, accept);
}

위와 같이 단순 승인을 진행하고 이벤트를 발행시킨다. (비지니스 로직 실행의 주체는 도메인이고 서비스 계층은 단순 트랜잭션, 도메인 간의 순서 보장이므로 StudyJoin에서 승인과 발행을 처리한다.)

이로써 발행된 이벤트는 리스닝하고 있는 리스너에게로 흘러들어간다.

@Component
@RequiredArgsConstructor
public class NotificationListener {

    private final NotificationService notificationService;

    @TransactionalEventListener
    @Async
    public void handleNotification(NotificationRequestDto requestDto) {
        notificationService.send(requestDto.getReceiver(), requestDto.getNotificationType(),
                requestDto.getContent(), requestDto.getUrl());
    }
}

리스너는 단순 해당 발행에 대한 이벤트를 리스닝하고 있다가 처리해준다. (알림 전송)

여기서 @EventListener는 동기적으로 처리를 진행한다.

위와 같이 모든 승인 요청이 완료된 후에 알림을 보내는 식이라면 승인이 끝나서야 알림을 보내기 시작한다는 뜻이다.

사실 이러한 부분에 있어 엄청 오래걸리는 작업이 존재한다면 그만큼 알림도 늦게 간다는 뜻이다.

그러면 순서를 반대로 바꿔 맨 앞으로 보내면 또 문제가 생긴다.

도중에 예외가 발생하는 작업이 있다면? 위의 서비스 레이어의 경우 @Transactional으로 인해 롤백처리로 간단하게 되돌아갈 수 있겠지만 이미 알림은 발송된 상태가 되버리게 된다.

그렇기 때문에 이를 해결해야한다.

이를 위해 @TransactionalEventListener를 활용해 트랜잭션의 흐름에 따라 이벤트를 제어하면 된다.

그리고 동기적으로 실행하는 부분에 대해서는 @Async 어노테이션을 통해 비동기적으로 처리해줄 수 있게 한다.

비동기에 대한 설정은 이전에 이메일 전송에서 사용한 구성과 똑같이 이용하였다.

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(3);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setThreadNamePrefix("async-thread-");
        taskExecutor.initialize();
        return taskExecutor;
    }
}

이렇게 되면 모든 알림 전송에 대한 부분은 마무리가 된다.

다른 부분(스터디 거절, 스터디 신청, 댓글)도 마찬가지로 이에 대한 부분들을 구현해주면 된다.