Что такое реактивное программирование?
Большинство людей, которые так или иначе знакомы со сферой IT, вероятно слышали или даже имеют представление о таких видах программирования, как: функциональное и объектно-ориентированное программирование. Однако термин "реактивное программирование" не так известен и популярен, и поэтому многим непонятен.
Давайте дадим определение тому, что такое реактивное программирование.
Возможно, Вы уже смогли догадаться из названия, что этот тип программирования связан с какой-то реакцией (откликом, отзывом) на что-то (на произошедшее действие/событие). Действительно, это так.
Реактивное программирование - это декларативная парадигма (архетип, паттерн, стандарт) программирования, связанная с потоками данных (data streams) и распространением изменений (propagation of change). С помощью этой парадигмы можно с легкостью управлять статическими (например, массивы) или динамическими (например, источники событий [event emitters]) потоками данных. Если сказать кратко, это программирование, предназначение которого - эффективно, с хорошим быстродействием, справляться с асинхронными потоками данных. Приложения, построенные на основе этого архетипа, являются неблокирующими и управляются событиями. Еще можно сказать так: в реактивном программировании события, уведомления и HTTP-запросы представляются в виде потоков данных, и приложения создаются для обработки таких потоков.
Особенности реактивного программирования
1) Реактивное программирование - довольно молодой стандарт программирования
2) Ключевые принципы, заложенные в него - это асинхронность и неблокирующая работа
- Когда программа работает синхронно, то для того, чтобы она могла перейти к выполнению следующей строки своего кода, ей необходимо дождаться выполнения предыдущих строк (а это может быть долгое обращение к базе данных или к api). Таким образом, при синхронном выполнении, долгие операции блокируют основной поток и приложение сильно замедляется и становится неотзывчивым.
- При асинхронном способе, программа не будет ждать завершения предыдущих строк своего кода (тяжелые операции, такие как получение ответа от api или от базы данных будут происходить в отдельных потоках, так сказать, в фоновом режиме) и продолжит своё выполнение. Всякий раз, когда будет приходить ответ от api или базы данных, программа будет его обрабатывать, при этом, не блокируя и не мешая основному потоку.
Рассмотрим пример, как на практике выглядит связка асинхронность + неблокирующая работа:
Предположим, у нас есть веб-приложение, например, социальная сеть. В нём, на главной странице, пользователь вводит свой логин и пароль, и далее в фоновом режиме приложение отправляет get-запрос на сервер для проверки данных пользователя, и в случае успеха, пользователю отображается его домашняя страница.
Представим, что на сервере, в пуле потоков имеется ограниченное количество потоков, скажем, 20-ть.
Давайте посмотрим и сравним, как будет обрабатывать этот пользовательский запрос API, построенное классическим способом, и как его будет обрабатывать API с реализованными принципами реактивного программирования.
Случай № 1. Классическое REST-API без реактивного программирования
В случае классического API, когда пользователь (клиент) делает запрос, на сервере этому запросу назначается отдельный поток из пула потоков. Этот поток обращается в базу данных и извлекает данные. Поток будет ждать пока полностью не получит ответ/данные из базы данных. Когда он дождется, он отправит результат обратно клиенту. После завершения своей работы, поток освобождается и вновь становится доступным в пуле потоков.
Если в одно и тоже время несколько клиентов сделают запросы к серверу, каждому из запросов, будет назначен свой отдельный поток из пула потоков. Таким образом, если одновременно обратятся 20-ть пользователей, то все наши 20-ть потоков из пула будут задействованы (заняты). И если поступит 21-ый запрос, ему придется ожидать, пока какой-либо из потоков не освободится.
Случай № 2. REST-API с применением реактивного программирования
В АПИ с применением реактивного программирования, когда клиент делает запрос, этому запросу также назначается отдельный поток из пула потоков и этот поток обращается в базу данных, чтобы извлечь данные. Однако, поток не ждет получения данных. Он вместо этого посылает определенное событие в базу данных (делегирует), затем освобождается и становится готовым обработать другой запрос от клиента. Когда приходит ответ из базы данных, то он обрабатывается одним или несколькими доступными потоками из пула. С помощью подобного делегирования операций ввода/вывода, приложение, даже имея минимальное количество доступных потоков, остается способным обрабатывать множество одновременных запросов. Таким образом, реактивное программирование действительно является асинхронным и неблокирующим.
3) Оно следует функциональному стилю разработки
Реактивное программирование поддерживает функциональный стиль кода, который очень схож со стилем Java 8 Stream API.
Функциональное программирование - это стандарт программирования, в котором программы создаются путем применения и компоновки функций. Преимущество функционального программирования состоит в том, что код, написанный с его использованием, получается более кратким, читабельным и предсказуемым, его легко тестировать и поддерживать.
Рассмотрим пример. Предположим мы разрабатываем веб-приложение "музыкальный проигрыватель (плеер)". И нам необходимо получить список песен - Playlist, по его идентификатору (id).
Вот как может выглядеть реализующий нашу задачу код, выполненный в стиле функционального программирования:
fun getPlaylistById(id: Int): Mono<Playlist> {
return playlistRepository
.findById(id)
.switchIfEmpty(
Mono.error(NotFoundException("Playlist with playlist id : $id not found"))
)
}
Сначала мы обращаемся в репозиторий playlistRepository и пытаемся найти плейлист по его идентификатору (функция findById). Если результат получен, возвращаем его как Mono<Playlist>. Если результат не найден, выполняется функция switchIfEmpty, которая вызывает ошибку NotFoundException с соответствующим сообщением.
4) Данные представлены в виде потока, управляемого событиями
Это особенно удобно, когда необходимо получать/отображать данные в режиме реального времени или выполнять потоковую передачу данных (streaming). Взять к примеру, данные о погоде или слежение за счётом какого-либо важного спортивного события. Обычно, такие данные непрерывно обновляются. Можно сказать, они поступают непрерывным потоком. И всякий раз, когда в базу данных добавляются новые данные, запускается определенное событие, и эти данные отображаются / отправляются в качестве ответа. Это достигается с помощью метода onNext() из Reactive Stream API. Мы рассмотрим его далее в этой статье.
5) Оно поддерживает Противодавление [Backpressure] в потоке данных (Обратное давление на поток данных)
В нереактивном приложении иногда может произойти следующая ситуация: оно может запросить данные из базы данных и если последняя вернёт слишком большой объём, то приложение упадёт из-за нехватки памяти. Однако с помощью реактивного программирования, подобной ситуации можно избежать. Можно добавить ограничение на количество получаемых данных. Также, из-за неблокирующей природы (не блокирующей потоки) реактивного программирования, сервер не отправляет весь поток данных сразу. Он может передавать данные по мере того, как они станут доступны.
Давайте обобщим всё вышесказанное. С помощью реактивного программирования можно добиться быстрой, плавной и отзывчивой работы приложения, так как все данные в нём являются потоками, управляемыми событиями, причем потоками - асинхронными и неблокирующими основной поток. Реактивное программирование обеспечивает поддержку обратного давления, что хорошо сказывается на стабильности и производительности. А также благодаря применению стиля функционального программирования, код получается более чистым и читаемым.
Spring, Spring WebFlux и Project Reactor
Spring - этот самый популярный фреймворк для разработки приложений на Java.
В состав его web-части входит spring web-mvc framework, который основан на пулах потоков.
Что касается реактивного стека, то туда входит фреймворк Spring WebFlux, который основан на механизме цикла событий и является неблокирующим, асинхронным, обеспечивает поддержку обратного давления.
Начиная с 5-ой версии, Spring поддерживает разработку реактивных приложений с помощью встроенной в него библиотеки Reactor, и имеет инструменты для создания реактивных REST-серверов. Spring Webflux как раз использует именно Reactor в качестве библиотеки для реализации реактивного программирования. Reactor - это реактивная библиотека четвертого поколения, основанная на спецификации Reactive Streams. Она предназначена для создания неблокирующих приложений на JVM. Reactor уже из коробки поддерживает неблокирующее противодавление (backpressure). Для работы с потоками данных, библиотека предлагает два реактивных API: Flux и Mono.
API Реактивных потоков (Reactive Stream API)
API Реактивных потоков появилось в Java с 9-ой версии языка.
Реактивный поток (Reactive Stream) - это спецификация стандарта обработки асинхронных потоков данных с поддержкой неблокирующего противодавления. Reactive Stream основан на модели Издатель-Подписчик (Publisher-Subscriber) и оперирует 4-мя интерфейсами:
- Publisher
- Subscriber
- Subscription
- Processor
Рассмотрим эти интерфейсы немного подробнее.
Publisher (Издатель)
Издатель - это поставщик данных в виде событий (например, сервер или база данных). Издатель обрабатывает и публикует события своим подписчикам, когда те подписываются на него.
Интерфейс Publisher имеет только один метод - подписаться (subscribe):
interface Publisher<T> {
fun subscribe(s: Subscriber<in T>)
}
Subscriber (Подписчик)
Подписчик получает/обрабатывает события от Издателя. У Подписчика есть 4 метода для работы с полученными событиями:
onSubscribe(s: Subscription): этот метод вызывается автоматически, когда Издатель регистрируется после того, как Подписчик вызывает метод subscribe. Издатель отправляет событие подписки в этом методе, сообщающее, что подписка выполнена успешно.
onNext(t: T): этот метод вызывается при каждой передаче данных от Издателя к Подписчику. Если Издатель опубликует 5 событий, onNext будет вызван 5 раз.
onError(t: Throwable): если при обработке события возникает какая-либо ошибка, вызывается этот метод.
onComplete(): если при обработке события не было ошибки и все события успешно завершены, вызывается этот метод.
interface Subscriber<T> {
fun onSubscribe(s: Subscription)
fun onNext(t: T)
fun onError(t: Throwable)
fun onComplete()
}
Subscription (Подписка)
Подписка определяет однозначную связь между Подписчиком и Издателем. У неё есть 2 метода — запросить (request) и отменить (cancel). Подписчик вызывает этим методы, чтобы соответственно получить данные от Издателя и отменить получение данных.
interface Subscription {
fun request(n: Long)
fun cancel()
}
Processor (Обработчик)
Обработчик представляет собой стадию (операцию) обработки, состоящую из обоих Издателя и Подписчика.
interface Processor<T, R> : Subscriber<T>, Publisher<R>
Ниже приведена диаграмма, описывающая жизненный цикл (рабочий процесс) реактивного потока Reactive Stream.
1) Сначала Подписчику необходимо зарегистрироваться у Издателя, вызвав метод subscribe().
2) После этого, Издатель посылает событие подписки Подписчику и вызывается метод onSubscribe() Подписчика.
3) Далее Подписчик запрашивает данные у Издателя (он может запросить n-ое кол-во данных), используя метод request(n);
4) Затем Издатель, по мере того, как данные будут доступны, будет их публиковать в форме потоков данных (data streams), вызывая метод события onNext(). При n-ном количестве событий/записей метод onNext() будет вызван n раз.
5) В итоге, по завершению, если не было никаких ошибок, Издатель запустит событие onComplete(). Если в ходе какого-либо этапа произошла ошибка, будет вызвано событие onError().
Mono и Flux
Mono и Flux - это реактивные типы, реализующие Издателя (Publisher). Объект flux представляет собой реактивную последовательность от 0 до N элементов, в то время как объект Mono представляет 0 или 1 элемент.
Flux
Flux<T> (где T - тип Flux, т.е. данные, это может быть целое число, класс или что угодно) - это стандартный Издатель, который представляет собой асинхронную последовательность от 0 до N элементов. Для него действуют все методы спецификации Reactive Stream, применимые к Publisher или вызываемые им (subscribe, onNext, onComplete, onError и др.). Flux используется тогда, когда ожидается, что сервер возвратит количество данных от 0 до N.
Например, возвращаясь к приложению "музыкальный плеер", если мы делаем запрос к серверу для получения всех песен, мы ожидаем что он вернет нам от 0 до N объектов типа "песня" в качестве ответа. В этом случае мы будем использовать именно Flux.
Пример создания Flux:
//создаем переменную allSongs (тип String), отожествляющую Издателя типа Flux. Издатель является источником названий песен.
val allSongs = Flux.just("lullaby", "sugar and brownies", "brightest light")
//нам необходимо подписаться на Издателя allSongs, чтобы получать потоки данных от него.
allSongs.subscribe()
//С помощью лямбда-функции мы можем, например, вывести на экран название песен
allSongs.subscribe({ song -> println(song)}) //lullaby, sugar and brownies, brightest light
//В случае если где-то в процессе произойдет ошибка, мы можем обработать ее следующим образом
allSongs.subscribe({ song -> println(song)},
{ error -> println("error is $error")})
Существует метод .log(), который мы можем использовать для отслеживания и наблюдения всех сигналов потока. События записываются в консоль.
Mono
Mono<T> (где T - тип Mono, это может быть целое число, класс или что угодно) - это специализированный Издатель, который выдает максимум один элемент. Для него также действуют все методы спецификации Reactive Stream, применимые к Publisher или вызываемые им (subscribe, onNext, onComplete, onError и др.). Мы применяем Mono, когда ожидаем от сервера данные в размере максимум одного. Например, если мы делаем запрос к серверу, чтобы получить песню с каким-либо определенным id, то в качестве ответа ожидаем получить либо 0 данных (песня отсутствует), либо 1 (один) объект типа "песня". В этом случае мы используем именно Mono.
Пример создания Mono:
//Создаем Издателя типа Mono (источник названия песни)
val song = Mono.just("on my way")
//нам нужно подписаться на поток данных, чтобы Издатель начал выдавать (emit) элементы
song.subscribe()
//С помощью лямбда-функции мы можем вывести на экран название песни
song.subscribe({ songName -> println(songName)}) //on my way
//В случае если где-то в процессе произойдет ошибка, мы можем обработать ее следующим образом
song.subscribe({ songName -> println(songName)},
{ error -> println("error is $error")})
Операции с реактивными потоками
Библиотека Reactor содержит в себе операторы для работы с объектами Mono и Flux.
Рассмотрим некоторые из них.
filter: используется для фильтрации последовательности. Например, если у нас есть Flux из списка чисел [1,2,3,4,5,6] и мы хотим отфильтровать все нечетные числа, оставив только четные, то мы можем это сделать с помощью filter.
Пример кода:
val numbers = Flux.just(1,2,3,4,5,6)
val evenNumbers = numbers.filter({ num -> num % 2 == 0 })
evenNumbers.subscribe({ num -> println(num) }) // 2,4,6
//или эти 3 линии кода можно объединить, следуя функциональному стилю программирования:
Flux.fromIterable(listOf(1,2,3,4,5,6))
.log() //для логирования (необязательно)
.filter { value -> value % 2 == 0 }
.subscribe({value -> println(value) }) // output -> 2,4,6
map: используется для преобразования существующей последовательности по принципу 1 к 1. Например, если у нас есть Flux из списка чисел [1,2,3,4,5,6] и мы хотим возвести все числа в квадрат, то мы можем использовать map.
Пример кода:
val numbers = Flux.just(1,2,3,4,5,6)
val squaredNumbers = numbers.map { num -> num*num }
squaredNumbers.subscribe({ num -> println(num) }) //1,4,9,16,25,36
//все 3 линии кода вместе:
Flux.fromIterable(listOf(1,2,3,4,5,6))
.log() //для логирования (необязательно)
.map{ num -> num*num }
.subscribe({ value -> println(value) }) //1,4,9,16,25,36
flatMap: используется для преобразования существующей последовательности по принципу 1 к n. Оператор flatMap асинхронно преобразует элементы, испускаемые Flux/Mono, в Publisher, а затем объединяет (flatten) эти внутренние publishers в один Flux путем слияния (merging). Таким образом, в основном для асинхронной работы над испускаемыми (выдаваемыми, emitted) элементами используется flatMap. Например, если у нас есть Flux из списка чисел, который является ничем иным, как списком id песен, и мы хотим получить песни, соответствующие каждому из этих id (из базы данных, делая к ней асинхронные вызовы), то мы используем flatMap.
Пример кода:
fun fluxFlatMap() {
Flux.fromIterable(listOf(1,2,3,4,5,6))
.log()
.flatMap{ id -> getSongName(id) }
.subscribe({ song -> println(song) })
}
//Функция имитирует асинхронное обращение к базе данных:
private fun getSongName(id: Int?): Mono<String> {
val songs = mapOf(
1 to "pink venom",
2 to "nobody",
3 to "passionfruit",
4 to "nevermind",
5 to "this time"
)
Thread.sleep(3000);
return songs.getOrDefault(id, "not found").toMono()
}
результат:
merge: используется для объединения нескольких реактивных потоков (издателей) без сохранения их последовательности. Если мы оперируем двумя Flux из списков элементов: [1,2,3,4,5] и [11,12,13,14,15], и при их получении происходит задержка в получении отдельных элементов, то при merge(list1,list2) последовательность не обязательно будет строго упорядочена: [1,2,3,4,5,11,12,13,14,15]. Мы можем получить и другой порядок последовательности, например: [1,2,3,11,4,5,13,12,14,15]
val list1 = Flux.just(1, 2, 3, 4, 5, 6)
val list2 = Flux.just(11, 12, 13, 14, 15, 16)
val mergedList = Flux.merge(list1, list2)
mergedList.subscribe { num -> println(num) }
concat: используется для объединения издателей с сохранением их последовательности. Если мы оперируем двумя Flux из списков элементов: [1,2,3,4,5] и [11,12,13,14,15], и при их получении происходит задержка в получении отдельных элементов, то при concat(list1,list2) последовательность будет строго упорядочена: [1,2,3,4,5,11,12,13,14,15].
val list1 = Flux.just(1, 2, 3, 4, 5, 6)
val list2 = Flux.just(11, 12, 13, 14, 15, 16)
val mergedList = Flux.concat(list1, list2)
mergedList.subscribe { num -> println(num) }
zip: zip используется для объединения нескольких издателей путём ожидания, пока все источники выдадут один-единственный (единичный, один, one-one) элемент, с последующим объединением на выходе этих элементов в связку элементов (tuple). Например, если у нас есть два Flux [A,B, C,D] и [1,2,3,4], то объединенный c помощью zip, Flux будет иметь следующий вид: [[A,1], [B,2], [C,3], [D,4]].
Пример кода:
val list1 = Flux.just("hello" , "hi", "hola")
val list2 = Flux.just("reactor", "kt", "java")
val result = Flux.zip(list1, list2)
result.subscribe { tuple -> println(tuple) }
/*
результат:
[hello,reactor]
[hi,kt]
[hola,java]
*/
Spring WebClient
Spring WebClient - это реактивный веб-клиент, представленный в Spring 5. Он является неблокирующим и используется для выполнения http-запросов. Одним из его применений является взаимодействие одного приложения с другим, посредством http вызовов, для получения или отправки данных (обмена данными).
Например, у нас есть приложение для прослушивания музыки, где можно проиграть определенную песню, добавить ее в список воспроизведения или удалить из него. Сами песни наше приложение получает из какого-то другого api. Для того чтобы реактивным способом выполнить http-запрос (вызов) к этому стороннему api из нашего приложения, нам необходимо использовать Spring WebClient. Таким образом, WebClient - это отправная точка для выполнения веб-запросов в реактивных приложениях. WebClient поддерживает как синхронные, так и асинхронные операции.
В этой статье мы рассмотрели, что такое реактивное программирование, каковы его особенности и преимущества, что такое Project reactor, что такое Reactive Stream api, что такое Mono и Flux и как их создать, а также как выполнять различные операции над реактивными потоками, такие как: filter, map, flatmap, merge, concat, zip.
В конце мы рассмотрели, что такое Spring WebClient.
Большое спасибо, что прочитали эту статью :). Надеюсь, Вы получили краткое представление о том, что такое реактивное программирование.
Статья основана на труде "Reactive Programming with Kotlin and Spring WebFlux" автора Jyoti.