Работа c Talend Open Studio на примере парсинга CSV файла

Сбор данных из различных источников, преобразование с целью унифицирования либо удобства довольно распространенная задача. Конечно, в большинстве случаев можно обойтись собственным решением, но чтобы оно было гибким и легко расширяемым придется потратить немало времени. В таком случае разумным будет воспользоваться готовым решением. Talend Open Studio (TOS) одно из таких решений.

Меня несколько удивило отсутствие статей про работу с TOS на Хабре. Возможно, тому есть причины, мне непонятные. Как бы то ни было, постараюсь восполнить этот пробел.

Вероятно, при написании этой статьи я был излишне подробен в некоторых вопросах, поэтому некоторые инструкции я спрятал под спойлер.

Итак, TOS это Open Source решение для интеграции данных. Основным средством настройки процесса преобразования данных в TOS является специальный визуальный редактор, позволяющий добавлять и настраивать отдельные узлы преобразования данных и связи между ними.

Интересной особенностью и существенным плюсом TOS, на мой взгляд, является тот факт, что TOS преобразует наши компоненты и связи в код Java. По сути мы получаем библиотеку на Java с возможностью генерировать код на основе графа преобразования данных. Плюс мы можем собрать пакет и запустить его на любой машине, где есть Java и где может отсутствовать TOS.
Генерация кода дает нам еще один плюс — можно расширять возможности TOS путем написания собственного кода (для этого даже есть специальные инструменты).

Отдельное целостное преобразование данных в Talend называется задачей(job). Задача состоит из подзадач, которые, в свою очередь, состоят из компонент и связей. Компоненты непосредственно преобразуют данные либо занимаются вводом/выводом. Связи бывают нескольких типов. Основным средством обмена данными между компонентами являются связи типа “поток”(flow). Поток очень похож на таблицу в БД. У потока есть схема(названия, типы и атрибуты полей) и данные(значения полей). Как сами данные так и схема потока могут быть изменены в процессе обработки. Потоки в TOS не синхронизируются между собой. Они работают независимо друг от друга.

Далее, попробую на примере показать, как настраивается процесс обработки данных.

Предположим у есть нас CSV файл вида:

id,event_name,event_datetime,tag
1,"Hello, world!",2017-01-10T18:00:00Z,
2,"Event2",2017-01-10T19:00:00Z,tag1=q
3,Event3,2017-01-10T20:00:00Z,
4,"Hello, world!",2017-01-10T21:00:00Z,tag2=a
5,Event2,2017-01-10T22:00:00Z,
...

И мы хотим разделить данные по разным событиям(поле event).

Перед началом работы с данными нам потребуется создать задачу. Процесс создания не описываю по причине его тривиальности.

Итак, первое что нам надо сделать это прочитать и распарсить CSV. Для начала создадим запись метаданных для нашего входного CSV файла — это упростит дальнейшую работу (Metadata -> File delimited). Создание File delimited происходит более или менее интуитивно, поэтому подробное описание спрятано под спойлером.

Единственное, что заслуживает упоминания, это расстановка кавычек, при подстановке значений в поля форм. Это относится не только к созданию File delimited но и к большинству других полей в формах. Дело в том, что большинство значений во всевозможных полях будут подставлены в Java код “как есть”, т.е. должны являться выражением Java определенного типа. Если мы задаем строковую константу ее придется написать в кавычках. Это дает нам дополнительную гибкость. Получается что везде, где требуется значение, можно подставить значение параметра или выражения.

Создание File delimited


Далее необходимо задать имя и выбрать файл. Выбираем наш CSV файл с входными данными.

На следующем шаге необходимо настроить парсинг файла.



Интересные поля:

Разделитель полей — у нас запятая (comma).
В секции “Escape Char Settings” нас интересует поле “Text Enclosure”. Зададим значение “\”” — т.е. “двойная кавычка”. Теперь весь текст внутри двойных кавычек будет интерпретироваться как единое целое, даже если внутри есть разделитель(запятая).
В правой части можно настроить пропуск строк и ограничения. Нас это не интересует.
Поставим галочку “Установить строку заголовка в качестве имен столбцов” т.к. у нас в первой строчке находятся имена столбцов. Эти значения станут именами полей.

Кнопка “Refresh Preview” позволит обновить область предпросмотра. Убеждаемся, что все в порядке и идем дальше.

Далее от нас требуется настроить схему для выходного потока. Схема это набор типизированных полей. Каждому полю также могут быть назначены некоторые атрибуты.



Заголовки из CSV файла стали именами полей. Тип каждого поля определяется автоматически исходя из данных в файле. Здесь нас все устраивает, кроме формата даты. Дата в нашем файле выглядит примерно следующим образом 2017-01-10T22:00:00Z и для ее парсинга нужен шаблон «yyyy-MM-dd'T'HH:mm:ss'Z'». Обратите внимание на кавычки. Дело в том, что большинство значений во всевозможных полях будут подставлены в java код “как есть”, т.е. должны являться выражением Java определенного типа. Если мы задаем строковую константу ее придется написать в кавычках.

Теперь у нас есть шаблон парсера CSV файлов заданного формата.

Далее добавим компонент, который будет заниматься парсингом. Нужный нам компонент называется tFileInputDelimited.

О добавлении компонентов
Компоненты можно найти в меню компонентов (обычно справа) в разделе (tFileInputDelimited в разделе “Файл->Вход”) и перетащить в рабочую область, но можно поступить проще: кликнуть в любой точке рабочей области и начать набирать название компонента. Появится подсказка со списком компонент.



О соединении компонентов
Компонент можно выделить нажав на его иконку. При этом около иконки появится “язычок” “O” и в окне просмотра настроек компонента появится информация и текущем состоянии. “Язычок” “O” (output) это выходные данные. Потянув за него мы можем соединить компонент с другим компонентом.

Далее, настроим наш парсер. Для компонента tFileInputDelimited в настройках установим “Property type” в значение “Repository” и выберем ранее созданный шаблон.

Теперь парсер настроен на парсинг файлов нужного нам формата и если запустить работу то в логах мы увидим содержимое нашего исходного CSV файла. Проблема в том, что если парсер связан с шаблоном он жестко настроен на файл в шаблоне. Мы не можем указать другой файл такого же формата, что может быть не очень удобно, когда мы заранее не знаем что за файл собираемся обрабатывать.

Из этой ситуации есть два выхода. Первый — все время подменять файл из шаблона на нужный файл. Второй — развязать компонент парсера и шаблон. В этом случае настройки парсинга можно сохранить, но появляется возможность задать входной файл. Недостатки первого способа очевидны, к недостаткам второго относится отсутствие синхронизации между шаблоном и парсером. Если мы изменим шаблон то синхронизировать настройки парсера нужно будет вручную. Мы пойдем вторым путем и отвяжем парсер от шаблона. Для этого вернем значение “Build-In” в поле “Property type”. Настройки сохранились, но появилась возможность их менять.

Изменим имя входного файла на выражение context.INPUT_CSV. Обратите внимание, имя было в кавычках (строковая константа), а наше выражение без кавычек. Оно является параметром контекста. Также нужно создать этот параметр на вкладке контекста. Для отладки можно задать значение по умолчанию. Параметры контекста можно задавать как параметры командной строки (примерно так --context_param INPUT_CSV=path). Это относится к запуску собранного пакета Java.

Далее. Мы хотим разделить данные по именам событий.

Для этого потребуется компонент tMap и несколько tFilterRow. Давайте пока ограничимся двумя tFilterRow т.е. будем выделять только два разных события. Соединим их как показано на рисунке:



При соединении tMap и tFilterRow потребуется ввести имя для связи. Имя должно быть уникально. Далее нужно настроить компонент tMap. Для этого войдем в меню Map Editor либо дважды кликнув по иконке tMap, либо вызвав редактор из панели свойств компонента.

В нашем случае нам нужно только “скопировать” поток, поэтому просто перетащим все поля входящих данных (слева) в каждый из потоков вывода (справа).



В секции в середине можно задать внутреннюю переменную (можно использовать для генерации новых значений, например номеров строк, или подстановки параметров). В каждой ячейке редактора можно написать выражение. По сути то что мы сделали это и есть подстановка значений. row1 на скриншоте это имя входного потока. Теперь наш маппер будет делить входные данные на два потока.

Настройка фильтров tFilterRow не представляет из себя ничего особенного.

О настройке tFilterRow
Добавляем входную колонку, выбираем тип условия и вводим значение. Мы установим фильтры на поле event_name. Один фильтр будет проверять на равенство (==) «Hello, world!» (в кавычках), а второй «Event2».

Параметр “Функция” в настройках компонента задает преобразование входных данных и задает функцию преобразования F. Тогда условия отбора будет: F(input_column) {comparator} value. У нас нет функции F,{comparator} это равенство, а value это «Hello, world!». Получаем в нашем случае input_column == «Hello, world!».

Добавим после фильтров пару tLogRow, запустим и увидим что данные делятся. Единственно, лучше установить Mode для tLogRow в что-то отличное от “Основной”, иначе данные перемешаются.

Вместо tLogRow можно добавить какой угодно другой компонент вывода данных, например tFileOutputDelimited для записи в CSV файл, или компонент базы данных для записи в таблицу.

Для работы с базой данных есть множество компонент. Многие из них имеют в настройках поля для настройки доступа к базе. Однако если предполагается много обращаться к базе из разных компонент лучше всего использовать следующую схему:



Компонент Connection задает параметры доступа к базе и устанавливает соединение. Компонент Close отключается от базы. В среднем блоке, где на рисунке находится единственный компонент Commit, можно использовать базу данных не устанавливая новых соединений. Для этого в настройках компонентов нужно выбрать опцию “Использовать существующее соединение” и выбрать нужный компонент Connection.

Здесь также используется еще один механизм TOS — подзадачи (subjob). Путем создания подзадач можно добиться, чтобы некоторые части задачи завершались прежде, чем начнутся другие. В данной примере компонент Commit не начнет работу пока не установится соединение. Между подзадачами проводится связь OnSubjobOk (в контекстном меню компонента доступен пункт Trigger, внутри которого есть эта связь). Существуют и другие связи, например, ObSubjobError для обработки ошибок.

Вернемся к нашему примеру с CSV файлом.

Поле tag у нас не очень подходит для записи в базу данных — tag2=a. Наверняка мы захотим разделить пару ключ-значение по разным полям в базе. Это можно сделать разными способами, но мы сделаем это при помощи компонента tJavaFlex. tJavaFlex это компонент, поведение которого можно описать на языке Java. В его настройках присутствуют три секции — первая выполняется до начала обработки данных (инициализация), вторая занимается обработкой данных и третья выполняется после обработки всех данных. Также, как и у остальных компонент есть редактор схемы. Удалим из схемы данных на выходе поле tag и добавить пару новых — tag_name и tag_value (типа String).



Далее, в средней секции компонента напишем
row4.tag_name = "";
row4.tag_value = "";

if(row2.tag.contains("="))
{
String[] parts = row2.tag.split("=");
row4.tag_name = parts[0];
row4.tag_value = parts[1];
}

Код тривиальный, и, пожалуй единственной что стоит пояснить, это конструкции вида row4.tag_value. tag_value это имя поля, созданного нами. Таким же образом можно обращаться к другим полям. row4 это имя исходящего потока (row2 входящего). Их можно поменять.



Таким образом, теги будут разделены на два поля. Однако нужно убедиться, что в настройках tJavaFlex стоит галочка “Авто-распространение данных”, иначе все остальные данные исчезнут. По сути мы просто дописали дополнительное преобразование. Прочие поля совпадают по названию и будут скопированы автоматически.

Далее я расскажу о двух чуть более сложных и конкретных вещах.

Предположим, что мы все же хотим сложить наши данные в базу данных. Соответственно, у нас есть табличка Event с полями: имя события, идентификатор события, дата события и ссылка на запись в таблице тегов. В таблице тегов два поля — ключ и значение. Мы хотим добавлять нашу пару ключ-значение в таблицу тегов только в том случае, если ее там нет. И хотим также добавлять связь между таблицами тегов и событий.
Т.е. мы хотим:

  • проверить есть ли такая пара ключ-значение в таблице тегов и если нет добавить
  • получить id записи в базе данных, соответствующей нашему тегу
  • записать данные о событии, включая id записи в таблице тегов, в таблицу событий

Для того чтобы добавить запись только в том случае если ее нет в Postgres можно использовать конструкцию вида
INSERT
WHERE NOT EXISTS
Сделать это можно при помощи компонента tPostgresqlRow. Это компонент позволяет выполнить произвольный SQL запрос. Но нам придется подставить в наш запрос реальные данные. Это можно сделать, например, так

String.format("
INSERT INTO tag(tag_name, tag_value)
	SELECT \'%s\', \'%s\'
	WHERE NOT EXISTS
	(SELECT * FROM tag
	WHERE tag_name = \'%s\'
	AND tag_value = \'%s\');",
input_row.tag_name, input_row.tag_value,
input_row.tag_name, input_row.tag_value)

Да, параметры перечисляются два раза одни и те же (возможно я слишком плохо знаю Java). Обратите внимание, что точка с запятой в конце кода на Java не нужна.

После этого нужно сделать тривиальный запрос, чтобы получить id записи в таблице.
Но в случае с Postgres можно пойти более простым путем, и использовать RETURNING id. Однако этот механизм вернет значение только если данные будут добавлены. Но при помощи подзапроса можно обойти это ограничение. Тогда наш запрос преобразуется во что-то подобное:

String.format("
WITH T1 AS (
	SELECT * 
	FROM tag
	WHERE tag_name = \'%s\'
	AND tag_value = \'%s\'
), T2 AS (
	INSERT INTO tag(tag_name, tag_value)
	SELECT \'%s\', \'%s\'
	WHERE NOT EXISTS (SELECT * FROM T1)
	RETURNING tag_id
)
SELECT tag_id FROM T1
UNION ALL
SELECT tag_id FROM T2;",
input_row.tag_name, input_row.tag_value,
input_row.tag_name, input_row.tag_value)

Как получить значение из запроса
В случае если запрос должен возвращать значения в компоненте tPostgresqlRow нужно включить опцию “Propagate QUERY’s recordset” (на вкладке “Advanced settings”), а также в исходящем потоке нам потребуется поле типа Object, которое нужно указать как поле для распространения данных. Чтобы извлечь данные из recordset нам потребуется компонент tParseRecordSet. В настройках в поле “Prev. Comp. Column list” нужно выбрать наше поле, через которое распространяются данные. Далее в таблице атрибутов для полей прописать имена полей, возвращенных запросом.
Должно получиться примерно следующее:



Т.е. все наши поля автоматически будут установлены в нужные значения, а новое поле dbtag_id типа int будет взят из результатов запроса по ключу “tag_id”. Сложить все в таблицу event можно при помощи того же tPostgresqlRow или tProstgresqlOutput.

В итоге получится примерно следующая схема:




Отдельного рассмотрения заслуживает случай, когда у нас возникает необходимость в создании замкнутой структуры. TOS не позволяет делать замкнутых структур, даже если они не являются циклическими. Дело в том, что потоки данных живут каждый сами по себе, не синхронизируются, и могут нести разное количество записей. Наверняка, почти всегда можно обойтись без образования замкнутых контуров. Для этого придется не делить потоки и делать все в одном. Но если очень хочется, то при желании можно обойти ограничение на создание замкнутых структур. Нам потребуется компоненты tHashInput и tHashOutput.

Как найти tHashInput и tHashOutput
По умолчанию они не отображаются в панели компонент и их придется сначала добавить туда. Для этого нужно перейти в меню Файл -> Edit project properties -> Дизайнер -> Palette Settings далее в закладке технические найти наши компоненты и добавить в рабочий набор.

Эти компоненты позволяют сохранять потоки в памяти и после осуществлять к ним доступ. Можно, конечно, воспользоваться временным файлом, но хэш наверняка быстрее (если у вас хватит оперативной памяти).

Добавляем компонент tHashOutput и входящий поток, который хотим сохранить. Компоненту можно настроить на самостоятельную работу, либо на дописывание данных в другой tHashOutput компонент. В этом случае компонент будет работать подобно union из sql, т.е. данные будут записываться в один общий поток.Придется создать новую подзадачу, в которой будет осуществляться слияние. Не забудьте добавить связь OnSubjobOk.

Для каждого потока, работающего индивидуально, нужно создать компоненту tHashInput. У этой компоненты есть недостаток — даже после указания компоненты tHashOutput, из которой будут браться данные, схема не подгрузится автоматически. Далее все tHashInput нужно объединить при помощи tMap. Только один поток будет помечен как Main, по нему будут синхронизироваться остальные входящие потоки, а также исходящий, остальные входящие потоки будут Lookup. Кроме того нам надо задать связь между потоками, иначе мы получим Cross Join.



Теперь, несмотря на то что в левой части маппера много потоков, можно считать что у нас один входной поток и использовать для маппинга любые поля любых потоков.

На этом все, спасибо всем кто дочитал до конца.
Поделиться публикацией
Похожие публикации
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама
Комментарии 5
  • –1

    Может я чего не понимаю, но такие задачи я делал в своей практике десятки раз. С фиксированным набором полей, с неизвестным набором полей, в базу, в очередь, insert или merge — куда угодно и как угодно.


    Такая задачка обычно укладывается минимум строк в 30 кода, максимум — в зависимости от сложности обновления базы. В объем вашего поста решение на Camel по опыту уложится целиком, без всяких проблем, с большим запасом.


    И ни разу никакая IDE для этого не понадобилась, и жалания такого не возникло. А самое главное — особой гибкости в вашем решении пока не видно. А вот недостатки записи решений в виде диаграмм как раз хорошо известны, и именно гибкость решений как раз от них и страдает в первую очередь.


    Может, поэтому никто и не пользуется/посты не пишет, что есть более простые (и при этом адекватные) Camel или Spring Integration?

    • 0
      Заявление в стиле «зачем вам MS если есть Linux» или vice versa.
      Для интеграции кроме упомянутых вами Camel и Spring есть еще куча проприетарных (MS, IBM, Oracle etc), опенсоурсных и промежуточных (типа MulrSoft) решений. Зачем они все казалось бы?
      • 0

        Заметьте, я ни слова вообще не сказал насчет проприетарности (да и всего остального, что вы перечислили). Мое основное возражение касается ровно одной вещи — что заявленная большая гибкость описанного средства в этом посте не раскрыта.

    • 0

      Кстати, оно умеет не только на Java, а и на Perl генерировать код. Но насколько всё проще получается на чистом перле...

      • 0
        Ковырял таленд. Простые вещи я и сам налабаю, как написали выше, всё и так легко. А вот сложные таленд сам не сделает, надо делать свои компоненты — а это уже сильно нетривиальная задача.
        Опять же генерация кода, компиляция его потом — как-то это всё негибко. В итоге у меня свой велосипед, чем-то похожий на nifi…

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.