В прошлой части мы кратко рассмотрели основные возможности Apache NiFi для организации трансфера данных из одной информационной системы в другую, а также описали типовой сценарий использования. Теперь же предлагаю рассмотреть, как начать этот трансфер и построить взаимодействие с внешним миром.
Получение запросов к NiFi, Parameter Context, Controller Service
Чтобы начать передачу данных, необходимо получить запрос, инициирующий процесс отправки. Для этого необходимо, как минимум, задать хост и порт. Для хранения данных о них лучше всего подходит контекст параметров - глобальный набор переменных, характерный для всего потока, к которому можно обращаться из любого процессора. При необходимости изменить какое-либо значение, заданное в параметрах, будет достаточно отредактировать его лишь в одном месте, освобождаясь от проблемы поиска параметра по всем процессорам. Для создания контекста параметров необходимо нажать на меню в правом верхнем углу и выбрать “Parameter Contexts”:
Дадим название новому контексту (Здесь и далее не забываем нажимать кнопку “Apply”, чтобы изменения сохранились!):
Теперь необходимо связать группу процессоров с только что созданным контекстом параметров. Для этого нажмем на группу правой кнопкой мыши, перейдем в “Configure”, затем в поле “Process Group Parameter Context” выберем контекст:
После этих шагов можно перейти непосредственно к созданию параметров. Добавим параметры nifi_host и nifi_port и зададим им определенные значения:
Вот мы и создали первые параметры. Теперь же мы можем добавить процессор для получения запросов - HandleHttpRequest. В его настройках следует указать в качестве значений Listening Port и Hostname недавно созданные nifi_port и nifi_host. Решетка и фигурные скобки обозначают, что введено названия параметра, значение которого должно быть использовано. Однако этих полей недостаточно, среди обязательных есть и свойство “HTTP Context Map”, требующее создать новый сервис:
Если мы нажмем на “Create new service”, то откроется такое окно:
Сервисы - это службы, которые после добавления и настройки в пользовательском интерфейсе, запускаются одновременно с NiFi и предоставляют информацию для использования другими компонентами, в первую очередь, процессорами. Идея заключается в том, что вместо того, чтобы настраивать эту информацию в каждом процессоре, которому она может понадобиться, служба контроллера предоставляет ее любому процессору для использования по мере необходимости. В данном случае мы имеем дело со StandardHttpContextMap, предоставляющим возможность хранить и извлекать HTTP-запросы и ответы, внешние по отношению к процессору. С его помощью несколько процессоров могут взаимодействовать с одним и тем же HTTP-запросом.
После создания необходимо включить сервис, нажав “Enable”:
Вернувшись к процессору для приема запросов, мы видим, что все основные свойства, без которых процессор работать не будет, указаны. При необходимости можно также изменить набор поддерживаемых HTTP-методов (например, ниже, мы разрешаем только GET и POST запросы). Кроме того, можно указать допустимые пути для запросов, модифицируя свойство “Allowed Paths”. В указанном случае NiFi будет принимать только запросы, содержащие /sys1/ и /sys2/
Чтобы NiFi смог в конце передачи отправить ответ на полученный запрос, необходимо также добавить процессор HandleHttpResponse, при этом выставить тот же самый HTTP Context Map в свойствах процессора. Можно по нажатию на “плюс” также добавлять заголовки ответа, как например, Content-Type:
Таким образом, поток из процессоров, который мы сформировали, будет выглядеть примерно так, как ниже. Пока это только процессоры для получения HTTP-запроса и отдачи ответа. Между ними первая группа процессоров, которая будет делать “полезную работу”. Устройство этой группы мы прямо сейчас и рассмотрим.
Управление потоком и установка атрибутов данных
Внутри упомянутой группы мы расставили процессоры, показанные ниже. Рассмотрим, что делает каждый из них.
Основным компонентом является процессор RouteOnAttribute. С его помощью можно задавать условные переходы, возможные пути, по одному из которых продолжится обработка данных в зависимости от выполняющегося условия. В свойствах процессора ниже через кнопку “плюс” мы добавили три условия, проверяющих соответствие URL полученного запроса одному из трех путей. Таким образом мы фиксируем, что есть три типа трансфера, которые в данный момент нуждаются в разных методах обработки.
Каждое из созданных нами свойств становится отдельным отношением, которое передает файл потока в нужный процессор. Подобным образом мы сможем и в дальнейшем разделять поток на разные ветки обработки данных, когда это требуется. Пока же в примере ниже мы хотим задать атрибуты - метаинформацию о передаваемых данных, которая будет разной в зависимости от трансфера. В случае же, если URL не соответствует ни одному запросу, поток переходит по отношению “unmatched” в процессор LogAttribute, логирующий информацию об ошибке.
Зайдем же внутрь одного из процессоров UpdateAttribute. Как говорилось ранее, атрибуты - это метаинформация файла потока, вспомогательные данные, к которым можно обращаться через ${} (аналогично параметрам, но вместо # ставим $). Внутри данного процессора свойства, которые мы добавляем, становятся такими атрибутами. Конечно, набор атрибутов для каждого конкретного случая свой, но здесь мы среди прочего опишем какую информацию полезно добавлять всегда.
Итак, что же это за атрибуты?
- object.display.name - текстовое название трансфера, представленное в “человекочитаемом” виде. Бывает полезно всегда, поскольку часто возникает необходимость отображать в письмах, в HTTP-ответах или в иных выходных данных, предназначенных для пользователя, что собственно передается.
- object.log.prefix - префикс, хранящий значение типа данных для удобного отображения в логах. С помощью этого атрибута удобнее будет осуществлять поиск по объектам в логах. Обычно атрибут используется в процессорах LogAttribute.
- object.type - тип трансфера. В этом месте мы окончательно закрепляем за файлом потока, что за трансфер он собой представляет. Данный атрибут можно будет использовать для дальнейшего ветвления потока обработки. С его помощью можно будет осуществлять переходы в дальнейших процессорах RouteOnAttribute, разделяя обработку для разных объектов только тогда, когда это нужно.
- object.id и object.add_field - это параметры HTTP- запроса, отправленного в NiFi. Здесь мы в качестве значений атрибутов задаем значения других атрибутов, формируемых процессором HandleHttpRequest в начале обработки. Через указанные переменные можно будет в любой момент обращаться к изначальными параметрам из URL.
- object.data.url - атрибут, хранящий адрес, откуда брать вспомогательную информацию и данные для объекта. В нашем примере он нужен, поскольку NiFi мы передаем только идентификатор объекта, а сами данные хранятся в другом месте. В зависимости от конкретного случая этот атрибут можно опустить.
Что в итоге и что дальше
Таким образом, в этой части мы кратко рассмотрели начало построения потока обработки, сделали его «рамку», добавив процессоры для получения HTTP-запросов и отправки ответов. Кроме того, на данном простом примере мы изучили основы использования параметров контекста и сервисов, а также методы ветвления потока и выставления атрибутов. Впереди еще много полезных возможностей NiFi, включая практики взаимодействия с JSON и XML