Web/Kotlin & Spring

[ Kotlin & Spring ] - 스프링 WebFlux

Hyunseo😊 2023. 7. 13. 17:20

1. 프로젝트 리액터

프로젝트 리액터(Project Reactor)는 리액티브 스트림의 구현체 중 하나로서 스프링의 에코시스템 범주에 포함된 프레임워크 입니다. 리액티브 스트림 사양을 구현하고 있으므로 리액티브 스트림에서 사용하는 용어와 규칙을 그대로 사용하면 됩니다. 이를 사용해서 애플리케이션에 리액티브 프로그래밍을 적용할 수도, 비동기-논블로킹을 적용할 수 있습니다. 또한, 함수형 프로그래밍의 접근방식을 통해 비동기-논블로킹 코드의 난해함을 어느정도 해결합니다. 뿐만 아니라 백프레셔(Backpressure)를 사용해 시스템의 부하를 효율적으로 조절할 수 있습니다.

 

1-1. 모노와 플럭스

리액터는 리액티브 스트림의 Publisher 인터페이스를 구현하는 모노(Mono)와 플럭스(Flux)라는 두 가지 핵심 타입을 제공합니다. 모노와 플럭스 각각 0..1 개의 단일 요소 스트림을 통지하는 발행자, 0..N 개로 이뤄진 다수 요소 스트림을 통지하는 발행자입니다. 두 타입 모두 리액티브 스트림 데이터 처리 프로토콜대로 onComplete 또는 onError 시그널이 발생할 때 까지 onNext를 사용해 구독자에게 데이터를 통지합니다.

 

package com.hyunseo.springwebflux

import reactor.core.publisher.Mono


fun main() {

    val mono: Mono<String> = Mono.just("Hello Reactive World!")
    mono.subscribe(::println)

}

이는 Mono.just를 사용해 결과를 찍어본 것입니다. Mono.just(data: T)는 객체를 인자로 받은 뒤 모노로 래핑하는 팩토리 함수입니다. 만약 subscribe()를 호출하지 않는 경우에는 그냥 MonoJust라는 타입이 찍히게 됩니다.

 

그리고 모노와 플럭스의 연산자는 모두 Lazy(게으르게) 동작하여 subscribe를 호출하지 않으면 리액티브 스트림 사양대로 코드가 동작하지 않습니다. 즉 subscribe는 Terminal Operator(최종 연산자)입니다. 참고로 여기서 소비자는 ::println이 되며 아래의 코드가 생략된 것이라고 보면 되겠습니다.

 

mono.subscribe(
    ::println, // onNext
    Throwable::printStackTrace, // onError
    { println("Completed!") } // onComplete
)

 

이제 Flux를 사용한 예시도 간단히 작성해보겠습니다.

 

package com.hyunseo.springwebflux

import reactor.core.publisher.Flux

data class Cellphone(
    val name: String,
    val price: Int,
    val currency: Currency,
)

enum class Currency {
    KRW, USD
}

fun main() {
    val iphone =
        Cellphone(name = "Iphone", price = 100, currency = Currency.KRW)
    val galaxy =
        Cellphone(name = "Galaxy", price = 90, currency = Currency.KRW)

    val flux: Flux<Cellphone> =
        Flux.just(iphone, galaxy)

    flux.subscribe(::println)


}

 

Flux는 Mono와 다르게 다수의 요소를 통지할 수 있게 됩니다. 

 

2. 스프링 WebFlux와 스프링 MVC의 비교

스프링 MVC는 동시성 처리를 전통적 웹 방식중 하나인 하나의 스레드가 하나의 요청을 처리하는 Thread per Request Model을 사용합니다. 이는 DB, Network IO등이 발생할 경우 결과를 받기까지 스레드가 블로킹됩니다. 이러한 문제를 해결하기 위해 스레드 풀을 사용해 동시성을 제어하는 것이 스프링 MVC입니다.

 

스프링 WebFlux는 리액티브 기반의 웹 스택 프레임워크입니다. 기본적으로 프로젝트 리액터 기반이며 리액티브 스트림의 다른 구현체인 RxJava나 코틀린의 코루틴으로도 개발이 가능합니다. 스프링 WebFlux는 비동기-논 블로킹으로 동작하므로 적은 수의 스레드로도 대량의 동시성을 제어할 수 있게 됩니다.

스프링 MVC와 스프링 WebFlux간의 차이점과 공통점이 존재합니다. 우선 MVC는 대부분의 스프링 웹 애플리케이션이 MVC기반이므로 풍부한 라이브러리를 지원합니다. 또한 JPA, JDBC와 같은 블로킹 API를 사용하는 경우에는 스프링 MVC를 사용하는 것이 낫습니다. 스프링 WebFlux는 함수형 엔드포인트와 애노테이션 컨트롤러 방식을 모두 지원하며, 이벤트 루프 동시성 모델이고, 전 구간 비동기-논블로킹인 경우에 최적의 성능을 보여주지만 대체적으로 러닝커브가 높은 편입니다.

 

만약 WebFlux에서 어쩔 수 없이 블로킹 API를 쓰는 경우 별도의 스케줄러로 동작시키는게 좋습니다.

// 어쩔 수 없이 블로킹 API를 쓰는 경우 별도의 스케줄러로 동작시키자!
val blockingWrapper = Mono.fromCallable {
  // JPA의 블로킹 코드
  jpaRepository.findById(id)
}.subscribeOn(Schedulers.boundedElastic())

 

뿐만 아니라 스프링 MVC에서도 리액터와 WebFlux 의존성을 추가하여 리액티브 코드와 논블로킹 라이브러리를 사용할 수 있습니다.

 

Router

package com.hyunseo.springwebflux

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.RouterFunctions.route
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.router

/**
 * @author ihyeonseo
 */

@Configuration
class Router {

    @Bean
    fun helloRouter(handler: HelloHandler) : RouterFunction<ServerResponse> =
        route()
            .GET("/", handler::sayHello)
            .POST("/", handler::sayHello)
            .DELETE("/", handler::sayHello)
            .build()

    @Bean
    fun userRouter(handler: UserHandler): RouterFunction<ServerResponse> =
        router {
            "/users".nest {
                GET("/{id}", handler::getUser)
                GET("", handler::getAll)
            }
        }
}

이는 WebFlux의 라우터의 일부분입니다. helloRouter, userRouter를 Configuration에 Bean으로 등록해주어야 하며, 이는 나중에 살펴볼 함수형 앤드포인트를 적용해준 예시이기도 합니다. 

 

HelloHandler

package com.hyunseo.springwebflux

import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

@Component
class HelloHandler {
    fun sayHello(req: ServerRequest): Mono<ServerResponse> {
        return ServerResponse.ok().bodyValue("Hello WebFlux")
    }
}

 

여기서 ServerResponse를 Mono로 만들어서 반환해주었습니다. 그리고 스프링 웹플럭스가 비동기-논블로킹 처리를 위해 리액티브 타입인 'Mono'와 'Flux'를 내부적으로 사용하기 때문에 별도로 핸들러에서 Mono로 변환해주지 않아도 자동으로 래핑해서 반환합니다. 이렇게 하면 "Hello WebFLux" 메시지를 본문으로 가지는 HTTP 200 OK 응답을 나타내는 `ServerResponse`객체를 생성하고, 이 객체를 `Mono`에 래핑하여 반환하는 것입니다. 이렇게 하면 메서드의 호출자는 이 `Mono`를 구독하고, 응답이 준비되면 이를 처리할 수 있게 합니다. 이는 비동기적으로 HTTP응답을 생성하고 처리할 수 있게 해줍니다.

 

$ curl localhost:8080
Hello WebFlux

 

3. 함수형 엔드포인트

스프링 WebFlux는 클라이언트의 요청을 라우팅하고 처리할 수 있는 람다 기반 프로그래밍 모델인 함수형 엔드포인트(Functional Endpoints)를 제공합니다. 이는 요청을 분서갷 라우팅하는 라우팅 함수(RouterFunction)과 요청 객체를 전달 받아 응답을 제공하는 핸들러 함수(HandlerFunction)으로 이루어져 있습니다.

 

3-1. 라우터 함수

이는 클라이언트로부터 전달받은 요청을 해석하고 그에 맞는 핸들러로 전달하는 역할을 합니다. 저도 이걸 Express와 Koa로 백엔드 개발할 때 정말 많이 활용했던 개념입니다. 라우터 함수는 위에서도 말했지만 @Configuration에 RouterFunction을 반환하는 빈(Bean)으로 등록할 수 있으며 빈의 이름을 다르게 하여 여러개의 라우터 함수를 등록할 수도 있습니다. ( 위의 예시 대체 ) 

 

그리고 위에서는 중첩라우터(Nested Router)를 사용할 수도 있습니다. 이를 사용해서 URI를 그룹화하면 코드의 중복을 제거하고 가독성 좋은 라우터 함수를 작성할 수 있게 됩니다.

 

3-1. 핸들러 함수

핸들러 함수는 라우터 함수로부터 전달받은 요청 객체인 ServerRequest를 이용해 로직을 처리한 후 응답 객체인 ServerResponse를 생성한 뒤 반환합니다. 이들은 모두 불면 객체로 설계되었으며, 기본적으로 리액터의 Publisher인 Mono 또는 Flux로 응답 본문을 작성합니다.

 

UserHandler

package com.hyunseo.springwebflux

import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

data class User(val id: Long, val email: String)

@Component
class UserHandler {
    val users = listOf(
        User(id = 1, email = "user1@gmail.com"),
        User(id = 2, email = "user2@gmail.com"),
    )

    fun getUser(req: ServerRequest): Mono<ServerResponse> =
        users.find { req.pathVariable("id").toLong() == it.id}
            ?.let {
                ServerResponse.ok().bodyValue(it)
            } ?: ServerResponse.notFound().build()

    fun getAll(req: ServerRequest): Mono<ServerResponse> =
        ServerResponse.ok().bodyValue(users)
}

 

 

4. 애노테이션 컨트롤러

스프링 웹플럭스에서 제공하는 애노테이션 컨트롤러(Annotation Controller)모델은 전통적인 스프링 MVC에서 사용하던 애노테이션 기반의 요청, 응답을 그대로 사용할 수 있습니다. 이는 실제로 Book 서비스코드를 보면서 WebFLux 애노테이션 컨트롤러 모델에 대해 이해해보겠습니다.

 

BookController

package com.hyunseo.springwebflux.book

import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

/**
 * @author ihyeonseo
 */

@RestController
class BookController(
    private val bookService: BookService,
) {
    @GetMapping("/books")
    fun getAll(): Flux<Book> {
        return bookService.getAll()
    }

    @GetMapping("/books/{id}")
    fun get(@PathVariable id: Int): Mono<Book> {
        return bookService.get(id)
    }

    @PostMapping("/books")
    fun add(@RequestBody request: Map<String, Any>): Mono<Book> {
        return bookService.add(request)
    }

    @PostMapping("/books/{id}")
    fun delete(@PathVariable id: Int): Mono<Void> {
        return bookService.delete(id)
    }
}

이처럼 bookService를 DI받습니다. 또한 컨트롤러 함수의 반환타입이 Mono, Flux인 경우 WebFlux에서 자동으로 subscribe를 호출해줍니다. 

 

BookService

package com.hyunseo.springwebflux.book

import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toFlux
import java.util.concurrent.atomic.AtomicInteger

/**
 * @author ihyeonseo
 */

data class Book(val id: Int, val name: String, val price: Int)

@Service
class BookService {
    private final val nextId = AtomicInteger(0)
    val books: MutableList<Book> = mutableListOf(
        Book(id = nextId.incrementAndGet(), name = "코틀린 인 액션", price = 30000),
        Book(id = nextId.incrementAndGet(), name = "HTTP 완벽 가이드", price = 40000),
    )

    fun getAll(): Flux<Book> {
//        return Flux.fromIterable(books)
        return books.toFlux()
    }

    fun get(id: Int): Mono<Book> {
        return Mono.justOrEmpty(books.find { it.id == id })
    }

    fun add(request: Map<String, Any>): Mono<Book> {
        return Mono.just(request)
            .map { map ->
                val book = Book(
                    id = nextId.incrementAndGet(),
                    name = map["name"].toString(),
                    price = map["price"] as Int
                )
                books.add(book)
                book
            }
    }

    fun delete(id: Int): Mono<Void> {
        return Mono.justOrEmpty(books.find { it.id == id })
            .map { books.remove(it) }
            .then()
    }
}

간단히 Book data class를 만들어서 작성했습니다. 그리고 Mono, Flux를 만들기 위한 다양한 팩토리 함수들이 존재합니다. 또한, 코틀린은 이를 지원하기 위한 다양한 확장함수를 제공하는데, getAll에서는 그냥 books를 toFlux를 통해 flux로 변환해서 넘겨줍니다. 

 

스프링 웹플럭스는 리액티브 스트림을 제공하고 이를 사용하기 위해 서버에서 'Mono'나 'Flux'를 반환하면, 웹플럭스는 이를 구독하고 처리합니다. 그리고 그 결과를 HTTP 응답으로 변환하여 클라이언트에게 보냅니다. 따라서 클라이언트에서 따로 'subscribe'를 호출할 필요가 없습니다.

클라이언트에서는 HTTP 응답을 받아서 처리하면 됩니다. 그러나 클라이언트가 리액티브 프로그래밍을 지원하면, 서버에서 보낸 'Mono'나 'Flux' 스트림을 바로 구독하고 처리할 수 있습니다. 예를 들어, 스프링에서는 'WebClient'라는 리액티브 HTTP 클라이언트를 제공합니다. 이를 사용하면 서버에서 보내는 리액티브 스트림을 바로 구독하고 처리할 수 있습니다.

val client = WebClient.create("http://localhost:8080")
val bookFlux = client.get()
    .uri("/books")
    .retrieve()
    .bodyToFlux(Book::class.java)

bookFlux.subscribe { book -> println(book) }​

위처럼 말이죠 WebClient를 사용해서 서버에서 'Flux<Book>'를 받아와 'subscribe'하여 처리하고 있습니다. 서버에서 리액티브 스트림을 제공하면 이렇게 클라이언트에서도 리액티브하게 처리할 수 있습니다.

 

5. 웹 클라이언트

WebClientExample

package com.hyunseo.springwebflux.webclient

import com.hyunseo.springwebflux.book.Book
import org.slf4j.LoggerFactory
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.HttpMethod
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.client.RestTemplate
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux

/**
 * @author ihyeonseo
 */

@RestController
class WebClientExample {

    val url = "http://localhost:8080/books"

    val log = LoggerFactory.getLogger(javaClass)

    @GetMapping("/books/block")
    fun getBooksBlockingWay() : List<Book> {
        log.info("Start RestTemplate")

        val restTemplate = RestTemplate()
        val response = restTemplate.exchange(url, HttpMethod.GET, null,
            object : ParameterizedTypeReference<List<Book>>() {}
        )

        val result = response.body!!
        log.info("result : {}", result)
        log.info("Finish RestTemplate")
        return result
    }

    @GetMapping("/books/nonblock")
    fun getBooksNonBlockingWay(): Flux<Book> {
        log.info("Start WebClient")

        val flux = WebClient.create()
            .get()
            .uri(url)
            .retrieve()
            .bodyToFlux(Book::class.java)
            .map {
                log.info("result: {}", it)
                it
            }

        log.info("Finish WebClient")
        return flux
    }
}

 

GET /books/block


위처럼 ResTemplate가 기존에 널리 사용되었지만 지금은 Deprecated된 상태입니다. 이의 문제는 요청을 보낸 서버로 부터 응답을 받을 때까지 스레드가 블로킹되어 다른 일을 하지 못한다는 점에 있습니다. 만약 하나의 API에서 여러 서버의 응답을 받아 결합해서 처리하는 기능이 있는 경우라면 하나씩 처리하므로 응답이 느려지는 문제가 발생할 수 있습니다. 이러한 문제점 떄문에 복수개의 응답을 처리하는 경우라면 저희가 위에서 살펴보았던 CompletableFuture와 같은 방식을 사용해야 합니다.

 

GET /books/nonblock

 


WebClient는 위에서 말한것 처럼 스프링에서 제공하는 리액티브 기반의 논블로킹 HttpClient입니다. 스프링 5 이후부터 RestTemplate을 대체하여 논블로킹 방식, 블로킹 방식 모두가 사용이 가능합니다. 이를 사용하면 스레드가 응답을 기다릴 필요없이 처리할 수 있으므로 RestTemplate보다 부하를 줄일 수 있고 여러 서버의 응답을 받아서 처리하는 경우 동시에 여러 서버로 호출이 가능하므로 빠르게 처리가 가능합니다.

 

위 nonblock코드에서 'retrieve()' 메소드를 호출하고 나면, 'bodyToFlux()' 또는 'bodyToMono()' 같은 메소드를 호출하여 HTTP 응답 본문을 Flux나 Mono로 반환할 수 있다고 합니다. 그리고 실제 데이터 처리는 구독자(클라이언트)가 'subscribe()'를 호출할때 시작됩니다. 당연히 이 subscribe는 getBooksNonBlockingway()가 반환됨에 따라 바로 스프링 웹플럭스가 이를 subscribe를 해주는 것일거고요. 그리고 최종적으로 발행이 완료된 값을 웹플럭스가 서버로부터 받아와서 클라이언트에게 이를 반환합니다. (함수 return flux가 된 시점에는 아직 발행이 안된 Flux일 것임)

 

 

5. 스프링 데이터 R2DBC

JDBC

전통적인 방식의 JDBC(Java Database Connectivity) 드라이버는 하나의 커넥션에 하나의 스레드를 사용하는 Thread per Connection 방식이라고 했습니다. 

 

String selectSql = "SELECT * FROM employees";
try (ResultSet resultSet = stmt.executeQuery(selectSql)) {
    List<Employee> employees = new ArrayList<>();
    while (resultSet.next()) {
        Employee emp = new Employee();
        emp.setId(resultSet.getInt("emp_id"));
        emp.setName(resultSet.getString("name"));
        emp.setPosition(resultSet.getString("position"));
        emp.setSalary(resultSet.getDouble("salary"));
        employees.add(emp);
} }

 

Thread per Connection 방식은 데이터베이스로 부터 응답을 받기 전까지 스레드는 블로킹 됩니다. 높은 처리량과 대규모 애플리케이션을 위해 비동기-논블로킹 데이터베이스 API에 대한 요구가 생깁니다. 애플리케이션 로직이 비동기-논블로킹 이더라도 DB 드라이버가 JDBC라면 필연적으로 블로킹이 발생하므로 100% 비동기-논블로킹의 성능을 내기 어려웠습니다.. JPA등도 말이죠

 

 

R2DBC

 

R2DBC(Reactive Relational Database Connectivity) 는 빠르게 성장 중인 리액티브 기반의 비동기-논블로킹 데이터베이스 드라이버입니다. 이또한 Oracle, Postgres, H2, MSSQL, Google Spanner, MariaDB등 다양한 데이터베이스를 지원합니다. 또한 리액티브 스트림 구현체인 Project Reactor, RxJava 등을 지원합니다.

 

connection.createStatement("SELECT * FROM employess")
        .execute()
        .flatMap(r -> r.map((row, metadata) -> {
            Employee emp = new Employee();
            emp.setId(row.get("emp_id", Integer.class));
            emp.setName(row.get("name", String.class));
            emp.setPosition(row.get("position", String.class));
            emp.setSalary(row.get("salary", Double.class));
          return emp;
        }))
        .close()
        .subscribe();

 

스프링 데이터 R2DBC

스프링 데이터 R2DBC는 R2DBC 기반의 스프링 데이터 프로젝트입니다. 이는 스프링 데이터 프로젝트이므로 스프링 애플리케이션에 수비게 통합할 수 있으며 스프링 데이터 JPA, 스프링 데이터 몽고DB 같은 프로젝트처럼 뛰어난 추상화를 제공합니다. 즉 스프링 WebFlux와 스프링 데이터 R2DBC를 같이 사용하면 전 구간 비동기-논블로킹 애플리케이션을 구현할 수 있게 됩니다. 또한 많은 ORM(JPA)에서 제공하는 LazyLoading, Dirt-Checking, Cache 등을 지원하지 않으므로 ORM으로써의 기능은 적지만 오히려 더 심플하게 사용할 수 있습니다.

 

ReactiveCrudRepository

public interface ReactiveCrudRepository<T, ID> extends Repository<T, ID> {
  <S extends T> Mono<S> save(S entity);
  <S extends T> Flux<S> saveAll(Iterable<S> entities);
  <S extends T> Flux<S> saveAll(Publisher<S> entityStream);
  Mono<T> findById(ID id);
  Mono<T> findById(Publisher<ID> id);
  Mono<Boolean> existsById(ID id);
  Mono<Boolean> existsById(Publisher<ID> id);
  Flux<T> findAll();
  Flux<T> findAllById(Iterable<ID> ids);
  Flux<T> findAllById(Publisher<ID> idStream);
  Mono<Long> count();
  Mono<Void> deleteById(ID id);
  Mono<Void> deleteById(Publisher<ID> id);
  Mono<Void> delete(T entity);
  Mono<Void> deleteAllById(Iterable<? extends ID> ids);
  Mono<Void> deleteAll(Iterable<? extends T> entities);
  Mono<Void> deleteAll(Publisher<? extends T> entityStream);
  Mono<Void> deleteAll();
}

이는 리액티브를 지원하는 CRUD 인터페이스 입니다. 위를 보면, 인터페이스의 모든 함수 반환타입이 Mono, Flux 같은 리액터의 Publisher라는 것을 확인할 수 있습니다.

 

간단히 예시를 작성해 보겠습니다.

 

schema.sql

DROP TABLE IF EXISTS car;
CREATE TABLE car
(
    id      bigint NOT NULL AUTO_INCREMENT,
    name    varchar(50),
    price   int,
    primary key (id)
);

 

SpringWebFluxApplication.kt

package com.hyunseo.springwebflux

import io.r2dbc.spi.ConnectionFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.core.io.ClassPathResource
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator

@SpringBootApplication
class SpringWebFluxApplication {

    @Bean
    fun init(connectionFactory: ConnectionFactory) =
        ConnectionFactoryInitializer().apply {
            setConnectionFactory(connectionFactory)
            setDatabasePopulator(
                ResourceDatabasePopulator(ClassPathResource("scripts/schema.sql")))
        }

}

fun main(args: Array<String>) {
    runApplication<SpringWebFluxApplication>(*args)
}

우선 위처럼 간단히 ConnectionFactory를 통해 스키마를 정의해줍니다. 

 

Car.kt

package com.hyunseo.springwebflux

import org.springframework.data.annotation.Id
import org.springframework.data.relational.core.mapping.Column
import org.springframework.data.relational.core.mapping.Table

/**
 * @author ihyeonseo
 */

@Table
data class Car(
    @Id
    val id: Long? = null,

    @Column
    val name: String,

    @Column
    val price: Int,
)

스프링 데이터 JPA와는 다르게 데이터클래스와 val 프로퍼티를 사용할 수 있습니다. 그 이유는 위에서 언급했습니다.

 

CarRepository.kt

package com.hyunseo.springwebflux

import org.springframework.data.repository.reactive.ReactiveCrudRepository
import reactor.core.publisher.Mono

/**
 * @author ihyeonseo
 */


interface CarRepository: ReactiveCrudRepository<Car, Long> {
    fun findByName(name: String) : Mono<Car>
}

 

CarController.kt

package com.hyunseo.springwebflux

import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Mono

/**
 * @author ihyeonseo
 */

@RestController("/cars")
class CarController(
    val carRepository: CarRepository,
) {
   @GetMapping("{name}")
   fun getByName(@PathVariable name: String): Mono<Car> {
       return carRepository.findByName(name)
   }

    @PostMapping
    fun create(@RequestBody map: Map<String, Any>): Mono<Car> {
        val car = Car(
            name = map["name"].toString(),
            price = map["price"] as Int
        )

        return carRepository.save(car)
    }
}

이렇게 애노테이션 컨트롤러를 활용해서 스프링 데이터 R2DBC를 통해 DB로부터 데이터를 넣고 ReactiveCrudRepository를 통해 데이터를 이름으로 뽑아오는 작업을 진행할 수 있게 됩니다.