null

Реактивное программирование на Kotlin и Spring WebFlux

Что такое реактивное программирование?

Большинство людей, которые так или иначе знакомы со сферой IT, вероятно слышали или даже имеют представление о таких видах программирования, как: функциональное и объектно-ориентированное программирование. Однако термин "реактивное программирование" не так известен и популярен, и поэтому многим непонятен. 
Давайте дадим определение тому, что такое реактивное программирование. 
Возможно, Вы уже смогли догадаться из названия, что этот тип программирования связан с какой-то реакцией (откликом, отзывом) на что-то (на произошедшее действие/событие). Действительно, это так.
Реактивное программирование - это декларативная парадигма (архетип, паттерн, стандарт) программирования, связанная с потоками данных (data streams) и распространением изменений (propagation of change). С помощью этой парадигмы можно с легкостью управлять статическими (например, массивы) или динамическими (например, источники событий [event emitters]) потоками данных. Если сказать кратко, это программирование, предназначение которого - эффективно, с хорошим быстродействием, справляться с асинхронными потоками данных. Приложения, построенные на основе этого архетипа, являются неблокирующими и управляются событиями. Еще можно сказать так: в реактивном программировании события, уведомления и HTTP-запросы представляются в виде потоков данных, и приложения создаются для обработки таких потоков.

Особенности реактивного программирования

1) Реактивное программирование - довольно молодой стандарт программирования

2) Ключевые принципы, заложенные в него - это асинхронность и неблокирующая работа

- Когда программа работает синхронно, то для того, чтобы она могла перейти к выполнению следующей строки своего кода, ей необходимо дождаться выполнения предыдущих строк (а это может быть долгое обращение к базе данных или к api). Таким образом, при синхронном выполнении, долгие операции блокируют основной поток и приложение сильно замедляется и становится неотзывчивым. 
- При асинхронном способе, программа не будет ждать завершения предыдущих строк своего кода (тяжелые операции, такие как получение ответа от api или от базы данных будут происходить в отдельных потоках, так сказать, в фоновом режиме) и продолжит своё выполнение. Всякий раз, когда будет приходить ответ от api или базы данных, программа будет его обрабатывать, при этом, не блокируя и не мешая основному потоку.

Рассмотрим пример, как на практике выглядит связка асинхронность + неблокирующая работа:

Предположим, у нас есть веб-приложение, например, социальная сеть. В нём, на главной странице, пользователь вводит свой логин и пароль, и далее в фоновом режиме приложение отправляет get-запрос на сервер для проверки данных пользователя, и в случае успеха, пользователю отображается его домашняя страница.
Представим, что на сервере, в пуле потоков имеется ограниченное количество потоков, скажем, 20-ть. 
Давайте посмотрим и сравним, как будет обрабатывать этот пользовательский запрос API, построенное классическим способом, и как его будет обрабатывать API с реализованными принципами реактивного программирования.

Случай № 1. Классическое REST-API без реактивного программирования

В случае классического API, когда пользователь (клиент) делает запрос, на сервере этому запросу назначается отдельный поток из пула потоков. Этот поток обращается в базу данных и извлекает данные. Поток будет ждать пока полностью не получит ответ/данные из базы данных. Когда он дождется, он отправит результат обратно клиенту. После завершения своей работы, поток освобождается и вновь становится доступным в пуле потоков.
Если в одно и тоже время несколько клиентов сделают запросы к серверу, каждому из запросов, будет назначен свой отдельный поток из пула потоков. Таким образом, если одновременно обратятся 20-ть пользователей, то все наши 20-ть потоков из пула будут задействованы (заняты). И если поступит 21-ый запрос, ему придется ожидать, пока какой-либо из потоков не освободится.

Случай № 2. REST-API с применением реактивного программирования

В АПИ с применением реактивного программирования, когда клиент делает запрос, этому запросу также назначается отдельный поток из пула потоков и этот поток обращается в базу данных, чтобы извлечь данные. Однако, поток не ждет получения данных. Он вместо этого посылает определенное событие в базу данных (делегирует), затем освобождается и становится готовым обработать другой запрос от клиента. Когда приходит ответ из базы данных, то он обрабатывается одним или несколькими доступными потоками из пула. С помощью подобного делегирования операций ввода/вывода, приложение, даже имея минимальное количество доступных потоков, остается способным обрабатывать множество одновременных запросов. Таким образом, реактивное программирование действительно является асинхронным и неблокирующим.

3) Оно следует функциональному стилю разработки

Реактивное программирование поддерживает функциональный стиль кода, который очень схож со стилем Java 8 Stream API.
Функциональное программирование - это стандарт программирования, в котором программы создаются путем применения и компоновки функций. Преимущество функционального программирования состоит в том, что код, написанный с его использованием, получается более кратким, читабельным и предсказуемым, его легко тестировать и поддерживать.  

Рассмотрим пример. Предположим мы разрабатываем веб-приложение "музыкальный проигрыватель (плеер)". И нам необходимо получить список песен - Playlist, по его идентификатору (id).
Вот как может выглядеть реализующий нашу задачу код, выполненный в стиле функционального программирования:

fun getPlaylistById(id: Int): Mono<Playlist> {
 return playlistRepository
 .findById(id)
 .switchIfEmpty( 
  Mono.error(NotFoundException("Playlist with playlist id : $id not found"))
  )
 }

Сначала мы обращаемся в репозиторий playlistRepository и пытаемся найти плейлист по его идентификатору (функция findById). Если результат получен, возвращаем его как Mono<Playlist>. Если результат не найден, выполняется функция switchIfEmpty, которая вызывает ошибку NotFoundException с соответствующим сообщением.

4) Данные представлены в виде потока, управляемого событиями

Это особенно удобно, когда необходимо получать/отображать данные в режиме реального времени или выполнять потоковую передачу данных (streaming). Взять к примеру, данные о погоде или слежение за счётом какого-либо важного спортивного события. Обычно, такие данные непрерывно обновляются. Можно сказать, они поступают непрерывным потоком. И всякий раз, когда в базу данных добавляются новые данные, запускается определенное событие, и эти данные отображаются / отправляются в качестве ответа. Это достигается с помощью метода onNext() из Reactive Stream API. Мы рассмотрим его далее в этой статье.

5) Оно поддерживает Противодавление [Backpressure] в потоке данных (Обратное давление на поток данных)

В нереактивном приложении иногда может произойти следующая ситуация: оно может запросить данные из базы данных и если последняя вернёт слишком большой объём, то приложение упадёт из-за нехватки памяти. Однако с помощью реактивного программирования, подобной ситуации можно избежать. Можно добавить ограничение на количество получаемых данных. Также, из-за неблокирующей природы (не блокирующей потоки) реактивного программирования, сервер не отправляет весь поток данных сразу. Он может передавать данные по мере того, как они станут доступны.

Давайте обобщим всё вышесказанное. С помощью реактивного программирования можно добиться быстрой, плавной и отзывчивой работы приложения, так как все данные в нём являются потоками, управляемыми событиями, причем потоками - асинхронными и неблокирующими основной поток. Реактивное программирование обеспечивает поддержку обратного давления, что хорошо сказывается на стабильности и производительности. А также благодаря применению стиля функционального программирования, код получается более чистым и читаемым.

Spring, Spring WebFlux и Project Reactor

Spring - этот самый популярный фреймворк для разработки приложений на Java. 
В состав его web-части входит spring web-mvc framework, который основан на пулах потоков.
Что касается реактивного стека, то туда входит фреймворк Spring WebFlux, который основан на механизме цикла событий и является неблокирующим, асинхронным, обеспечивает поддержку обратного давления.
Начиная с 5-ой версии, Spring поддерживает разработку реактивных приложений с помощью встроенной в него библиотеки Reactor, и имеет инструменты для создания реактивных REST-серверов. Spring Webflux как раз использует именно Reactor в качестве библиотеки для реализации реактивного программирования. Reactor - это реактивная библиотека четвертого поколения, основанная на спецификации Reactive Streams. Она предназначена для создания неблокирующих приложений на JVM. Reactor уже из коробки поддерживает неблокирующее противодавление (backpressure). Для работы с потоками данных, библиотека предлагает два реактивных API: Flux и Mono.

API Реактивных потоков (Reactive Stream API)

API Реактивных потоков появилось в Java с 9-ой версии языка.
Реактивный поток (Reactive Stream) - это спецификация стандарта обработки асинхронных потоков данных с поддержкой неблокирующего противодавления. Reactive Stream основан на модели Издатель-Подписчик (Publisher-Subscriber) и оперирует 4-мя интерфейсами:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Рассмотрим эти интерфейсы немного подробнее.

Publisher (Издатель)

Издатель - это поставщик данных в виде событий (например, сервер или база данных). Издатель обрабатывает и публикует события своим подписчикам, когда те подписываются на него. 
Интерфейс Publisher имеет только один метод - подписаться (subscribe):

interface Publisher<T> {
    fun subscribe(s: Subscriber<in T>)
}

 

Subscriber (Подписчик)

Подписчик получает/обрабатывает события от Издателя. У Подписчика есть 4 метода для работы с полученными событиями:

onSubscribe(s: Subscription): этот метод вызывается автоматически, когда Издатель регистрируется после того, как Подписчик вызывает метод subscribe. Издатель отправляет событие подписки в этом методе, сообщающее, что подписка выполнена успешно.

onNext(t: T): этот метод вызывается при каждой передаче данных от Издателя к Подписчику. Если Издатель опубликует 5 событий, onNext будет вызван 5 раз.

onError(t: Throwable): если при обработке события возникает какая-либо ошибка, вызывается этот метод.

onComplete(): если при обработке события не было ошибки и все события успешно завершены, вызывается этот метод.

interface Subscriber<T> {
    fun onSubscribe(s: Subscription)
    fun onNext(t: T)
    fun onError(t: Throwable)
    fun onComplete()
}

 

Subscription (Подписка)

Подписка определяет однозначную связь между Подписчиком и Издателем. У неё есть 2 метода — запросить (request) и отменить (cancel). Подписчик вызывает этим методы, чтобы соответственно получить данные от Издателя и отменить получение данных.

interface Subscription {
    fun request(n: Long)
    fun cancel()
}

 

Processor (Обработчик)

Обработчик представляет собой стадию (операцию) обработки, состоящую из обоих Издателя и Подписчика.

interface Processor<T, R> : Subscriber<T>, Publisher<R>

Ниже приведена диаграмма, описывающая жизненный цикл (рабочий процесс) реактивного потока Reactive Stream.

1) Сначала Подписчику необходимо зарегистрироваться у Издателя, вызвав метод subscribe().
2) После этого, Издатель посылает событие подписки Подписчику и вызывается метод onSubscribe() Подписчика.
3) Далее Подписчик запрашивает данные у Издателя (он может запросить n-ое кол-во данных), используя метод request(n);
4) Затем Издатель, по мере того, как данные будут доступны, будет их публиковать в форме потоков данных (data streams), вызывая метод события onNext(). При n-ном количестве событий/записей метод onNext() будет вызван n раз. 
5) В итоге, по завершению, если не было никаких ошибок, Издатель запустит событие onComplete(). Если в ходе какого-либо этапа произошла ошибка, будет вызвано событие onError().

Mono и Flux

Mono и Flux - это реактивные типы, реализующие Издателя (Publisher). Объект flux представляет собой реактивную последовательность от 0 до N элементов, в то время как объект Mono представляет 0 или 1 элемент.

Flux

Flux<T> (где T - тип Flux, т.е. данные, это может быть целое число, класс или что угодно) - это стандартный Издатель, который представляет собой асинхронную последовательность от 0 до N элементов. Для него действуют все методы спецификации Reactive Stream, применимые к Publisher или вызываемые им (subscribe, onNext, onComplete, onError и др.). Flux используется тогда, когда ожидается, что сервер возвратит количество данных от 0 до N. 
Например, возвращаясь к приложению "музыкальный плеер", если мы делаем запрос к серверу для получения всех песен, мы ожидаем что он вернет нам от 0 до N объектов типа "песня" в качестве ответа. В этом случае мы будем использовать именно Flux.

Пример создания Flux:

//создаем переменную allSongs (тип String), отожествляющую Издателя типа Flux. Издатель является источником названий песен. 
val allSongs = Flux.just("lullaby", "sugar and brownies", "brightest light")

//нам необходимо подписаться на Издателя allSongs, чтобы получать потоки данных от него. 
allSongs.subscribe()

//С помощью лямбда-функции мы можем, например, вывести на экран название песен
allSongs.subscribe({ song -> println(song)}) //lullaby, sugar and brownies, brightest light

//В случае если где-то в процессе произойдет ошибка, мы можем обработать ее следующим образом
allSongs.subscribe({ song -> println(song)},
                   { error -> println("error is $error")}) 

 

Существует метод .log(), который мы можем использовать для отслеживания и наблюдения всех сигналов потока. События записываются в консоль.

 Mono

Mono<T> (где T - тип Mono, это может быть целое число, класс или что угодно) - это специализированный Издатель, который выдает максимум один элемент. Для него также действуют все методы спецификации Reactive Stream, применимые к Publisher или вызываемые им (subscribe, onNext, onComplete, onError и др.). Мы применяем Mono, когда ожидаем от сервера данные в размере максимум одного. Например, если мы делаем запрос к серверу, чтобы получить песню с каким-либо определенным id, то в качестве ответа ожидаем получить либо 0 данных (песня отсутствует), либо 1 (один) объект типа "песня". В этом случае мы используем именно Mono.

Пример создания Mono:

//Создаем Издателя типа Mono (источник названия песни)
val song = Mono.just("on my way")

//нам нужно подписаться на поток данных, чтобы Издатель начал выдавать (emit) элементы
song.subscribe()

//С помощью лямбда-функции мы можем вывести на экран название песни
song.subscribe({ songName -> println(songName)}) //on my way

//В случае если где-то в процессе произойдет ошибка, мы можем обработать ее следующим образом
song.subscribe({ songName -> println(songName)},
                   { error -> println("error is $error")})

 

Операции с реактивными потоками

Библиотека Reactor содержит в себе операторы для работы с объектами Mono и Flux.
Рассмотрим некоторые из них.

filter: используется для фильтрации последовательности. Например, если у нас есть Flux из списка чисел [1,2,3,4,5,6] и мы хотим отфильтровать все нечетные числа, оставив только четные, то мы можем это сделать с помощью filter.

Пример кода:

val numbers = Flux.just(1,2,3,4,5,6)

val evenNumbers = numbers.filter({ num -> num % 2 == 0 })

evenNumbers.subscribe({ num -> println(num) }) // 2,4,6

//или эти 3 линии кода можно объединить, следуя функциональному стилю программирования:

Flux.fromIterable(listOf(1,2,3,4,5,6))
    .log() //для логирования (необязательно)
    .filter { value -> value % 2 == 0 }
    .subscribe({value -> println(value) }) // output -> 2,4,6

map: используется для преобразования существующей последовательности по принципу 1 к 1. Например, если у нас есть Flux из списка чисел [1,2,3,4,5,6] и мы хотим возвести все числа в квадрат, то мы можем использовать map.

Пример кода:

val numbers = Flux.just(1,2,3,4,5,6)

val squaredNumbers = numbers.map { num -> num*num }

squaredNumbers.subscribe({ num -> println(num) }) //1,4,9,16,25,36

//все 3 линии кода вместе:

Flux.fromIterable(listOf(1,2,3,4,5,6))
    .log() //для логирования (необязательно)
    .map{ num -> num*num }
    .subscribe({ value -> println(value) }) //1,4,9,16,25,36

flatMap: используется для преобразования существующей последовательности по принципу 1 к n. Оператор flatMap асинхронно преобразует элементы, испускаемые Flux/Mono, в Publisher, а затем объединяет (flatten) эти внутренние publishers в один Flux путем слияния (merging). Таким образом, в основном для асинхронной работы над испускаемыми (выдаваемыми, emitted) элементами используется flatMap. Например, если у нас есть Flux из списка чисел, который является ничем иным, как списком id песен, и мы хотим получить песни, соответствующие каждому из этих id (из базы данных, делая к ней асинхронные вызовы), то мы используем flatMap. 

Пример кода:

fun fluxFlatMap() {
    
    Flux.fromIterable(listOf(1,2,3,4,5,6))
        .log()
        .flatMap{ id -> getSongName(id) }
        .subscribe({ song -> println(song) })
}
//Функция имитирует асинхронное обращение к базе данных:
private fun getSongName(id: Int?): Mono<String> {
    val songs = mapOf(
        1 to "pink venom",
        2 to "nobody",
        3 to "passionfruit",
        4 to "nevermind",
        5 to "this time"
    )

    Thread.sleep(3000);

    return songs.getOrDefault(id, "not found").toMono()
}

результат:

merge: используется для объединения нескольких реактивных потоков (издателей) без сохранения их последовательности. Если мы оперируем двумя Flux из списков элементов: [1,2,3,4,5] и [11,12,13,14,15], и при их получении происходит задержка в получении отдельных элементов, то при merge(list1,list2) последовательность не обязательно будет строго упорядочена: [1,2,3,4,5,11,12,13,14,15]. Мы можем получить и другой порядок последовательности, например: [1,2,3,11,4,5,13,12,14,15]

   val list1 = Flux.just(1, 2, 3, 4, 5, 6)
   val list2 = Flux.just(11, 12, 13, 14, 15, 16)

   val mergedList = Flux.merge(list1, list2)

   mergedList.subscribe { num -> println(num) }

concat: используется для объединения издателей с сохранением их последовательности. Если мы оперируем двумя Flux из списков элементов: [1,2,3,4,5] и [11,12,13,14,15], и при их получении происходит задержка в получении отдельных элементов, то при concat(list1,list2) последовательность будет строго упорядочена: [1,2,3,4,5,11,12,13,14,15].

val list1 = Flux.just(1, 2, 3, 4, 5, 6)
val list2 = Flux.just(11, 12, 13, 14, 15, 16)

val mergedList = Flux.concat(list1, list2)

mergedList.subscribe { num -> println(num) }

zip: zip используется для объединения нескольких издателей путём ожидания, пока все источники выдадут один-единственный (единичный, один, one-one) элемент, с последующим объединением на выходе этих элементов в связку элементов (tuple). Например, если у нас есть два Flux [A,B, C,D] и [1,2,3,4], то объединенный c помощью zip, Flux будет иметь следующий вид: [[A,1], [B,2], [C,3], [D,4]].

 Пример кода:

val list1 = Flux.just("hello" , "hi", "hola")
val list2 = Flux.just("reactor", "kt", "java")

val result = Flux.zip(list1, list2)

result.subscribe { tuple -> println(tuple) }

/* 
результат: 
[hello,reactor]
[hi,kt]
[hola,java]
*/

 

Spring WebClient

Spring WebClient - это реактивный веб-клиент, представленный в Spring 5. Он является неблокирующим и используется для выполнения http-запросов. Одним из его применений является взаимодействие одного приложения с другим, посредством http вызовов, для получения или отправки данных (обмена данными). 
Например, у нас есть приложение для прослушивания музыки, где можно проиграть определенную песню, добавить ее в список воспроизведения или удалить из него. Сами песни наше приложение получает из какого-то другого api. Для того чтобы реактивным способом выполнить http-запрос (вызов) к этому стороннему api из нашего приложения, нам необходимо использовать Spring WebClient. Таким образом, WebClient - это отправная точка для выполнения веб-запросов в реактивных приложениях. WebClient поддерживает как синхронные, так и асинхронные операции.

 

В этой статье мы рассмотрели, что такое реактивное программирование, каковы его особенности и преимущества, что такое Project reactor, что такое Reactive Stream api, что такое Mono и Flux и как их создать, а также как выполнять различные операции над реактивными потоками, такие как: filter, map, flatmap, merge, concat, zip. 
В конце мы рассмотрели, что такое Spring WebClient.

Большое спасибо, что прочитали эту статью :). Надеюсь, Вы получили краткое представление о том, что такое реактивное программирование.

Статья основана на труде "Reactive Programming with Kotlin and Spring WebFlux" автора Jyoti.