null

Разработка интерфейса обмена данными между информационными системами с использованием NiFi. Получение запросов, выставление параметров и атрибутов

В прошлой части мы кратко рассмотрели основные возможности Apache NiFi для организации трансфера данных из одной информационной системы в другую, а также описали типовой сценарий использования. Теперь же предлагаю рассмотреть, как начать этот трансфер и построить взаимодействие с внешним миром.

Получение запросов к NiFi, Parameter Context, Controller Service

Чтобы начать передачу данных, необходимо получить запрос, инициирующий процесс отправки. Для этого необходимо, как минимум, задать хост и порт. Для хранения данных о них лучше всего подходит контекст параметров - глобальный набор переменных, характерный для всего потока, к которому можно обращаться из любого процессора. При необходимости изменить какое-либо значение, заданное в параметрах, будет достаточно отредактировать его лишь в одном месте, освобождаясь от проблемы поиска параметра по всем процессорам. Для создания контекста параметров необходимо нажать на меню в правом верхнем углу и выбрать “Parameter Contexts”:

Дадим название новому контексту (Здесь и далее не забываем нажимать кнопку “Apply”, чтобы изменения сохранились!):

Теперь необходимо связать группу процессоров с только что созданным контекстом параметров. Для этого нажмем на группу правой кнопкой мыши, перейдем в “Configure”, затем в поле “Process Group Parameter Context” выберем контекст:

После этих шагов можно перейти непосредственно к созданию параметров. Добавим параметры nifi_host и nifi_port и зададим им определенные значения:

Вот мы и создали первые параметры. Теперь же мы можем добавить процессор для получения запросов - HandleHttpRequest. В его настройках следует указать в качестве значений Listening Port и Hostname недавно созданные nifi_port и nifi_host. Решетка и фигурные скобки обозначают, что введено названия параметра, значение которого должно быть использовано. Однако этих полей недостаточно, среди обязательных есть и свойство “HTTP Context Map”, требующее создать новый сервис:

​​​​​​​

Если мы нажмем на “Create new service”, то откроется такое окно:

Сервисы - это службы, которые после добавления и настройки в пользовательском интерфейсе, запускаются одновременно с NiFi и предоставляют информацию для использования другими компонентами, в первую очередь, процессорами. Идея заключается в том, что вместо того, чтобы настраивать эту информацию в каждом процессоре, которому она может понадобиться, служба контроллера предоставляет ее любому процессору для использования по мере необходимости. В данном случае мы имеем дело со StandardHttpContextMap, предоставляющим возможность хранить и извлекать HTTP-запросы и ответы, внешние по отношению к процессору. С его помощью несколько процессоров могут взаимодействовать с одним и тем же HTTP-запросом.

После создания необходимо включить сервис, нажав “Enable”:

Вернувшись к процессору для приема запросов, мы видим, что все основные свойства, без которых процессор работать не будет, указаны. При необходимости можно также изменить набор поддерживаемых HTTP-методов (например, ниже, мы разрешаем только GET и POST запросы). Кроме того, можно указать допустимые пути для запросов, модифицируя свойство “Allowed Paths”. В указанном случае NiFi будет принимать только запросы, содержащие /sys1/ и /sys2/

Чтобы NiFi смог в конце передачи отправить ответ на полученный запрос, необходимо также добавить процессор HandleHttpResponse, при этом выставить тот же самый HTTP Context Map в свойствах процессора. Можно по нажатию на “плюс” также добавлять заголовки ответа, как например, Content-Type:

​​​​​

Таким образом, поток из процессоров, который мы сформировали, будет выглядеть примерно так, как ниже. Пока это только процессоры для получения HTTP-запроса и отдачи ответа. Между ними первая группа процессоров, которая будет делать “полезную работу”. Устройство этой группы мы прямо сейчас и рассмотрим.

Управление потоком и установка атрибутов данных

Внутри упомянутой группы мы расставили процессоры, показанные ниже. Рассмотрим, что делает каждый из них.

Основным компонентом является процессор RouteOnAttribute. С его помощью можно задавать условные переходы, возможные пути, по одному из которых продолжится обработка данных в зависимости от выполняющегося условия. В свойствах процессора ниже через кнопку “плюс” мы добавили три условия, проверяющих соответствие URL полученного запроса одному из трех путей. Таким образом мы фиксируем, что есть три типа трансфера, которые в данный момент нуждаются в разных методах обработки.

Каждое из созданных нами свойств становится отдельным отношением, которое передает файл потока в нужный процессор. Подобным образом мы сможем и в дальнейшем разделять поток на разные ветки обработки данных, когда это требуется. Пока же в примере ниже мы хотим задать атрибуты - метаинформацию о передаваемых данных, которая будет разной в зависимости от трансфера. В случае же, если URL не соответствует ни одному запросу, поток переходит по отношению “unmatched” в процессор LogAttribute, логирующий информацию об ошибке.

Зайдем же внутрь одного из процессоров UpdateAttribute. Как говорилось ранее, атрибуты - это метаинформация файла потока, вспомогательные данные, к которым можно обращаться через ${} (аналогично параметрам, но вместо # ставим $). Внутри данного процессора свойства, которые мы добавляем, становятся такими атрибутами. Конечно, набор атрибутов для каждого конкретного случая свой, но здесь мы среди прочего опишем какую информацию полезно добавлять всегда.

​​​​​​​

Итак, что же это за атрибуты?

  1. object.display.name - текстовое название трансфера, представленное в “человекочитаемом” виде. Бывает полезно всегда, поскольку часто возникает необходимость отображать в письмах, в HTTP-ответах или в иных выходных данных, предназначенных для пользователя, что собственно передается.
  2. object.log.prefix - префикс, хранящий значение типа данных для удобного отображения в логах. С помощью этого атрибута удобнее будет осуществлять поиск по объектам в логах. Обычно атрибут используется в процессорах LogAttribute.
  3. object.type - тип трансфера. В этом месте мы окончательно закрепляем за файлом потока, что за трансфер он собой представляет. Данный атрибут можно будет использовать для дальнейшего ветвления потока обработки. С его помощью можно будет осуществлять переходы в дальнейших процессорах RouteOnAttribute, разделяя обработку для разных объектов только тогда, когда это нужно.
  4. object.id и object.add_field - это параметры HTTP- запроса, отправленного в NiFi. Здесь мы в качестве значений атрибутов задаем значения других атрибутов, формируемых процессором HandleHttpRequest в начале обработки. Через указанные переменные можно будет в любой момент обращаться к изначальными параметрам из URL.
  5. object.data.url - атрибут, хранящий адрес, откуда брать вспомогательную информацию и данные для объекта. В нашем примере он нужен, поскольку NiFi мы передаем только идентификатор объекта, а сами данные хранятся в другом месте. В зависимости от конкретного случая этот атрибут можно опустить.

Что в итоге и что дальше

Таким образом, в этой части мы кратко рассмотрели начало построения потока обработки, сделали его «рамку», добавив процессоры для получения HTTP-запросов и отправки ответов. Кроме того, на данном простом примере мы изучили основы использования параметров контекста и сервисов, а также методы ветвления потока и выставления атрибутов. Впереди еще много полезных возможностей NiFi, включая практики взаимодействия с JSON и XML

Коротко о себе:

 

Работаю Java\Kotlin Backend Developer в компании Tune-it. На работе занимаюсь проектами, связанными с Liferay, NiFi, Spring Framework, а вне работы - философской антропологией