Web/Kotlin & Spring

[ Kotlin & Spring ] - 리액티브 프로그래밍 (옵저버 패턴, 이터레이터 패턴, 리액티브 스트림)

Hyunseo😊 2023. 7. 12. 20:40

1. 비동기 프로그래밍?

비동기 프로그래밍은 그냥 실행 중인 작업이 끝나는 것을 기다리지 않고 바로 반환하는 것을 말합니다. 만약 작업이 끝나면 콜백등으로 다시 쓰레드가 해당 작업을 처리하는 것처럼 말이죠.

 

사실 저는 프론트앤드 기발을 많이 해봐서 비동기(Asynchronous) 프로그래밍에 매우 익숙합니다. 왜냐하면 FE에서는 UI 애플리케이션의 경우 특정 이벤트가 발생할 경우에 반응하는 동작을 구현해야 하는데 이럴 떄 필수적으로 비동기 프로그래밍을 사용하게 되기 때문입니다. 

 

const button = document.querySelector('button');

button.addEventListener('click', event => {
    button.innerHTML = '~'
});

이런식으로 말이죠. 대부분의 프로그래밍 언어들은 각 언어의 철학에 맞는 다양한 비동기 처리 방법들을 지원합니다. 대표적으로 Callback, Promise, Future, Async-await, Coroutine 등이 있고 각각의 방법들은 장점과 단점이 존재합니다.

 

1-1. Thread

 

즉 이러한 비동기 프로그래밍을 구현하는 가장 기본적인 방법은 Thread를 사용하는 방법입니다. 쓰레드는 프로세스마다 무조건 하나이상씩은 가지고 있고, 이는 프로세스를 생성하고 컨텍스트 스위칭 하는데에 비용이 더 적게 들죠

 

하지만 쓰레드가 무한정 많아지면 메모리 사용량이 높아져 OOME(OutOfMemoryError)가 발생할 수 있고 높은 동시 처리량을 요구하는 시스템에서는 스레드를 생성하면서 발생하면서 발생하는 대기 시간 때문에 응답 지연이 발생할 수 있습니다. 그래서 실무에서는 이러한 문제들을 해결하기 위해 스레드 풀(Thread Pool)을 사용합니다. 스레드 풀을 사용하면 애플리케이션 내에서 사용할 총 스레드 수를 제한할 수 있고 기존에 생성된 스레드를 재사용하므로 빠른 응답이 가능한 것이죠. JPA의 구현체인 Hibernate도 사실 JDBC를 내부적으로 사용하는데, 이 또한 그 안에 HikariCP라는 쓰레드 풀 관리 라이브러리를 사용합니다.

 

이는 직접 만드는 것보단 java.util.concurrent 패키지의 ExecutorService를 사용하면 쉽고 안전하게 스레드 풀을 사용할 수 있게 됩니다. 

 

1-2. Future

퓨처(Future)는 제가 Swift를 할 때 정말 많이 다루었었는데요, 이 또한 비동기 작업에 대한 결과를 얻고 싶은 경우에 사용합니다. 이의 중요한 특징으로는 스레드는 Runnable를 사용해 비동기 처리를 하지만 퓨처를 사용해 처리 결과를 얻기 위해선 Callable을 사용해야 합니다.

 

package com.hyunseo.springwebflux

import java.util.concurrent.Callable
import java.util.concurrent.Executors

/**
 * @author ihyeonseo
 */

fun sum(a: Int, b: Int) = a + b

fun main() {
    // Executor Service
    val pool = Executors.newSingleThreadExecutor()

    val future = pool.submit(Callable {
        sum(100, 200)
    })

    println("계산 시작")

    val futureResult = future.get()
    println(futureResult)
    println("계산 종료")
}

이렇게 퓨처를 사용하면 수행 시간이 오래 걸리는 작업이나 작업에 대한 결과를 기다리면서 다른 작업을 병행하고 수행하고 싶은 경우에 매우 유용합니다. 하지만 퓨터를 사용하면 몇가지 단점이 존재하는데, get 함수는 비동기 작업의 처리가 완료될 때까지 다음 코드로 넘어가지 않고 무한정 대기하거나 지정해준 타임아웃 되기까지 블로킹됩니다. 

 

1-3. Completable Future

JDK8 부터 퓨처의 단점을 극복하기 위해 컴플리터블 퓨처(Completable Future)를 제공합니다. 이는 팩토리 함수인 supplyAsnc를 사용해 비동기 작업을 수행할 수 있습니다.

 

package com.hyunseo.springwebflux

import java.util.concurrent.CompletableFuture

/**
 * @author ihyeonseo
 */

fun sum(a: Int, b: Int) = a + b

fun main() {
    val completableFuture = CompletableFuture.supplyAsync {
            Thread.sleep(2000)
            sum(100, 200)
    }

    println("계산 시작")

    completableFuture.thenApplyAsync(::println) // 논블로킹으로 동작한다.

    while(!completableFuture.isDone) {
        Thread.sleep(500)
        println("계산 결과를 집계 중입니다.")
    }

    println("계산 종료")
}

 

thenApplyAsnc 함수를 사용해 논블로킹으로 동작하고 뒤에 Async가 붙은 함수들은 supplyAsync와 별도의 스레드 풀을 지정할 수 있습니다. 여기서 CompletableFuture를 쓰더라도 get 함수를 그대로 사용하면 블로킹 코드가 된다는 점에 유의해야 합니다.

 

이 CompletableFuture는 대다수의 비동기 처리 시나리오에서 유용하게 사용될 수 있습니다. 예를 들면 우리가 개발한 서버에서 외부의 여러 API 서버를 호출하여 응답을 받아서 결과를 결합하고 처리해야 하는 시나리오라면 이는 매우 유용하겠죠??

 

2. 옵저버 패턴

옵저버 패턴(Observer Pattern)이란 GoF가 소개한 디자인 패턴 중에 하나로 관찰 대상이 되는 객체가 변경되면 대상 객체를 관찰하고 있는 옵저버(Observer)에게 변경사항을 통지(notify)하는 디자인 패턴을 말합니다. 

 

 

이 또한 사실 React-Redux에서 Flux를 공부할 때 정말 많이 활용한 디자인 패턴입니다. 옵저버 패턴은 관찰 대상인 Subject와 Subject를 관찰하는 Observer로 이루어져 있습니다. 하나의 Subject에는 1개 또는 여러개의 옵저버를 등록할 수 있습니다. 그리고 Subject의 상태가 변경되면 자신을 관찰하는 Observer들에게 변경사항을 통지합니다. 마지막으로 Subject로 변경사항을 받은 Observer는 부가적인 처리를 진행합니다.

 

 

그리고 각각에 구체화 클래스가 존재하는데, 각각에 맞는 상세 구현을 작성해야 합니다.

 

package com.hyunseo.springwebflux

import io.micrometer.observation.Observation
import java.util.Observable
import java.util.Observer

class Coffee(val name: String)

// Subject
class Barista : Observable() {
    private lateinit var coffeeName: String

    fun orderCoffee(name: String) {
        this.coffeeName = name
    }

    fun makeCoffee() {
        setChanged()
        notifyObservers(Coffee(this.coffeeName))
    }
}

// Observer
class Customer(val name: String): Observer {
    override fun update(
        o: Observable?,
        arg: Any?
    ) {
        val coffee = arg as Coffee
        println("${name}이 ${coffee.name}을 받았습니다.")
    }
}

fun main() {
    val barista = Barista()
    barista.orderCoffee("아이스 아메리카노")

    val customer1 = Customer("고객1")
    val customer2 = Customer("고객2")

    barista.addObserver(customer1)
    barista.addObserver(customer2)

    barista.makeCoffee()
}

 

간단히 구현한 예시입니다. Customer 클래스는 Observer 인터페이스를 구현하여 Barista 클래스가 커피를 완성하면 통지를 받아서 update 함수에서 처리합니다. Barista 클래스는 Observable 클래스를 상속하여 고객이 주문한 커피가 만들어지면 notifyObservers로 고객에게 만들어진 Coffee 객체를 전달합니다. 이 때 setChanged를 먼저 호출하여 변경 여부를 내부에 저장해야 합니다.

 

이러한 옵저버 패턴의 장점은 아래와 같습니다.

  • 고객이 Polling 방식으로 커피가 완성됐는지 바리스타에게 확인하는 처리가 없어도 됩니다.
    • 이는 불필요한 호출로 성능상 문제가 될 수 있습니다.
    • 그리고 Polling의 간격은 실시간성에 영향을 줄 수도 있습니다.
  • 이는 일종에 데이터를 제공하는 측에서 데이터를 소비하는 측에 통지하는 푸시(Push-Based) 방식입니다.

 

옵저버 패턴에서는 서브젝트와 옵저버는 관심사에 따라 역할과 책임이 분리되어 있습니다. (SRP)

서브젝트는 옵저버가 어떤 작업을 하는지 옵저버의 상태가 어떠한지에 대한 관심을 가질 필요가 없고 오직 변경사항을 통지하는 역할만 수행하고 하나 혹은 다수의 옵저버는 각각 맡은 작업을 스스로 하기 때문에 옵저버가 하는 일이 서브젝트에 영향을 끼치지 않고 옵저버는 단순한 데이터의 소비자로서 존재할 수 있게 됩니다.

 

3. 이터레이터 패턴

이는 말 그대로 데이터의 집합에서 데이터를 순차적으로 꺼내기 위해 만들어진 디자인 패턴입니다. 이를 사용하면 컬렉션이 변경하더라도 동일한 인터페리스를 사용해 데이터를 꺼내올 수 있기 때문에 변경사항 없이 사용할 수 있습니다. 데이터의 집합이 얼만큼의 크기를 가진지 알 수 없는 경우 이터레이터 패턴을 사용하면 순차적으로 데이터를 꺼내올 수 있겠죠??

 

 

여기서 Aggregate는 요소들의 집합체를 말합니다. 이터레이터는 집합체 내부에 구현된 iterator를 사용해 생성합니다. 이터레이터를 사용하는 클라이언트는 생성된 이터레이터의 hasNext 함수를 사용해 데이터가 존재하는지 검사하고 next 함수를 사용해 데이터를 꺼냅니다.

 

package com.hyunseo.springwebflux


data class Car2(val brand: String)

class CarIterable(private val cars: List<Car2> = listOf()): Iterable<Car2> {
    override fun iterator() = CarIterator(cars)
}

class CarIterator(
    private val cars: List<Car2> = listOf(),
    private var index: Int = 0): Iterator<Car2> {

    override fun hasNext() = cars.size > index
    override fun next() = cars[index++]
}

fun main() {
    val carIterable = CarIterable(
        listOf(
            Car2("람보르기니"),
            Car2("페라리")
        )
    )

    val iterator = carIterable.iterator()

    while(iterator.hasNext()) {
        println("브랜드: ${iterator.next()}")
    }
}

 

여기서 CarIterable 클래스는 Iterable 인터페이스를 구현하여 CarIterator를 생성하는 iterator 함수를 오버라이드 했습니다. 그리고 while 내부에서는 hasNext를 사용하여 데이터를 모두 가져올때까지 반복하고 데이터를 출력합니다.

 

이와 옵저버 패턴과의 관계를 생각해보면, 데이터를 제공한다는 관점에서 이터레이터 패턴과 옵저버 패턴은 유사합니다. 하지만 이터레이터 패턴은 Aggregate이 내부에 데이터를 저장하고 이터레이터를 사용해 데이터를 순차적으로 당겨오는 방식이기 때문에 풀 기반(Pull-based)라고 할 수 있습니다. 

 

4. 리액티브 프로그래밍이란?

리액티브 프로그래밍이란 별거 없습니다. Rx라고도 하는데, 데이터 또는 이벤트의 변경이 발생하면 이에 반응해 처리하는 프로그래밍 기법을 말합니다. 이는 비동기 프로그래밍을 처리하는 새로운 접근 방식이라고 할 수 있죠. 2010년도에 에릭 마이어에 의해 마이크로소프트 .NET 에코 시스템으로 정의되었습니다. 이는 데이터의 통지, 완료, 에러에 대한 처리를 옵저버 패턴에 영감을 받아 설계되었고 데이터의 손쉬운 비동기 처리를 위해 함수형 언어의 접근 방식을 사용하였습니다.

 

fetch("/api/users/me") { user->
      fetch("/api/users/${user.id}/followers") { followers ->
          fetch("/api/users/${user.id}/likes") { likes ->
              fetch("/api/users/${user.id}/contacts") { contacts ->
                  // 콜백 헬
                  }
              }
          }
      }
 }

 

기존의 리액트리브 프로그래밍이 나오기 전 비동기 프로그래밍은 대부분 위와같이 콜백 기반의 비동기 처리 방식을 사용했습니다. 간단한 콜백은 이해하기 쉬울 수 있지만 콜백이 많아져서 발생하는 콜백 헬(Callback Hell)로 인해 코드의 복잡도가 늘어날 수 있습니다.

 

fetchReactive("/api/users/me")
    .zip { user -> fetchReactive("/api/users/${user.id}/followers") }
    .zip { user -> fetchReactive("/api/users/${user.id}/likes") }
    .zip { user -> fetchReactive("/api/users/${user.id}/contacts") }
    .flatMap { followers, likes, contacts ->
       // 로직 구현
    }

 

하지만 리액티브 프로그래밍을 사용하면 콜백 헬 없이 (함수형 프로그래밍의 관점으로) 비동기 코드를 쉽게 작성할 수 있기 때문에 서버나 UI 애플리케이션 개발시 리액티브 프로그래밍이 유용하게 사용되고 있습니다.

 

4-1. 리액티브 스트림

리액티브 스트림(=Reactive Stream)은 리액티브 프로그래밍의 표준 API 사양을 말합니다.  이는 비동기 데이터 스트림과 논-블로킹 백프레셔(Back Pressure)에 대한 사양을 제공합니다. 기존의 방식은 처리할 데이터가 무한정 많아져서 시스템의 한계를 넘어서는 경우 애플리케이션은 병목 현상(bottleneck)이 발생하거나 심각한 경우 애플리케이션이 정지되는 경우도 발생할 수 있습니다. 

 

리액티브 스트림은 TCK(Technology Compatibility Kit)을 지원하기 때문에 라이브러리가 정해진 사양에 맞게 구현되었는지 보장할 수 있게 됩니다. 자바 진영ㅇ서는 Java SE표준을 따른 JDK(Java Development Kit인지 검증하기 위해 TCK를 사용합니다.

 

이에 리액티브 스트림을 표준 사양으로 채택한 대표적인 구현체로는, Project Reactor, RxJava, RDK9 Flow, Akka Streams, Vert.x등이 있습니다. 이제 이에 대한 사양을 구체적으로 보겠습니다.

 

4-2. 리액티브 스트림 사양

리액티브 스트림 사양(specification)은 핵심 인터페이스와 프로토콜로 구성됩니다.

 

발행자(Publisher)는 데이터를 생성하고 구독자(Subscriber)에게 데이터를 통지하고 구독자는 자신이 처리할 수 있는 만큼의 데이터를 요청하고 처리합니다. 이때 발행자가 제공할 수 있는 데이터의 양은 무한(unbounded)하고 순차적(sequential)처리를 보장합니다. 그리고 서브스크립션(Subscription)은 발행자와 구독자를 연결하는 매개체이며 구독자가 데이터를 요청하거나 구독을 해지하는 등 데이터 조절에 관련된 역할을 담당합니다. 마지막으로 프로세서(Processor)는 발행자와 구독자의 기능을 모두 포함하는 인터페이스이며 데이터를 가공하는 중간 단계에서 사용합니다.

 

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

그리고 구독자는 4개의 추상 메소드 프로토콜을 가지고 있습니다. 이처럼 리액티브 스트림은 발행자 구독자간의 데이터 전달에 사용하는 규칙(Protocol)로 정의하고 있습니다. 이제 가시적으로 리액티브 스트림 데이터 처리 프로토콜의 흐름을 보겠습니다.

 

각 메서드의 호출을 시그널(Signal)이라고 부르고 시그널은 호출되는 순서가 다릅니다. onSubscribe는 최초 구독에 대한 초기화를 담당하므로 구독 시 최초 한 번만 호출되기 때문에 onSubscribe 내부에서 초기화 로직을 구현할 수 있습니다. onNext는 발행자로부터 통지받을 데이터가 있는 경우 구독자가 요청하는 만큼 계속 호출됩니다. 만약 발행자 측에 처리 중 에러가 발생하면 onError를 구독자에게 통지하고 onError 시그널이 발생하면 더 이상 데이터를 통지받지 않습니다. onCpmplete는 모든 데이터를 통지한 시점에 마지막에 호출되어 데이터 통지가 성공적으로 완료되었음을 통지합니다. onError와 onComplete는 반드시 둘중 하나만 호출되어야 하며 이후에는 어떠한 시그널도 발생해서는 안됩니다.