Pull to refresh

SQL Server Integration Services (SSIS) для начинающих – часть 3

Reading time 10 min
Views 38K

Часть 1
Часть 2

В этой части я расскажу о работе с параметрами и переменными внутри SSIS-пакета. Узнаем, как можно задавать и отслеживать значения переменных во время выполнения пакета.

Также рассмотрим вызов одного пакета из другого при помощи «Execute Package Task» и некоторые дополнительные компоненты и решения.

Здесь тоже будет много картинок.

Продолжим знакомство с SSIS


Создадим в трех демонстрационных базах новую таблицу ProductResidues, которая будет содержать информацию об остатках на каждый день:

USE DemoSSIS_SourceA
GO

CREATE TABLE ProductResidues(
  ResidueDate date NOT NULL,
  ProductID int NOT NULL,
  ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO

USE DemoSSIS_SourceB
GO

CREATE TABLE ProductResidues(
  ResidueDate date NOT NULL,
  ProductID int NOT NULL,
  ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO

USE DemoSSIS_Target
GO

CREATE TABLE ProductResidues(
  ResidueDate date NOT NULL,
  ProductID int NOT NULL,
  ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO

И наполним таблицы в источниках тестовыми данными, поочередно выполнив на базах DemoSSIS_SourceA и DemoSSIS_SourceB следующий скрипт:

USE DemoSSIS_SourceA
--USE DemoSSIS_SourceB
GO

DECLARE @MinDate date=DATEADD(MONTH,-2,GETDATE())
DECLARE @MaxDate date=GETDATE()

;WITH dayCTE AS(
  SELECT CAST(@MinDate AS date) ResidueDate,10000 ResidueAmount

  UNION ALL

  SELECT DATEADD(DAY,1,ResidueDate),ResidueAmount-1
  FROM dayCTE
  WHERE ResidueDate<@MaxDate
)
INSERT ProductResidues(ResidueDate,ProductID,ResidueAmount)
SELECT
  d.ResidueDate,
  p.ID,
  d.ResidueAmount
FROM dayCTE d
CROSS JOIN Products p
OPTION(MAXRECURSION 0)

Допустим, что в таблице ProductResidues будет очень много строк, и чтобы каждый раз не перезагружать всю информацию и упростить процедуру интеграции, логику загрузки в таблицу ProductResidues в базе DemoSSIS_Target реализуем следующую:

  1. Если в принимающей БД еще нет записей, то загрузим все данные;
  2. Если в принимающей БД есть данные, то будем удалять данные за неделю (последние 7 дней) от последней загруженной даты и загружать данные из источника начиная от этой даты снова.

Неудобство интеграции таблиц типа ProductResidues в том, что в ней нет полей, по которым можно было бы однозначно определить, когда появилась запись, в ней нет никакого идентификатора типа ID, нет ни поля типа UpdatedOn, которое содержало бы дату/время последнего обновления записи (т.е. зацепиться особо не за что), иначе бы мы, например, могли вычислить для каждого источника, по данным базы DemoSSIS_Target, последний ID или последнюю UpdatedOn и уже загружать данные из источников начиная от этих стартовых значений. К тому же не всегда есть возможность внести изменения в структуру источников, подстроив их под себя, т.к. это могут быть вообще чужие базы, к которым у нас нет полного доступа.

В нашем же случае еще допустим, что пользователи могут менять данные задним числом и могут вообще удалить некоторые ранее загруженные в базу DemoSSIS_Target строки из источников. Поэтому здесь обновление делается как-бы внахлёст, данные последней недели полностью перезаписываются. Здесь неделя берется условно, об этом минимальном сроке, например, мы могли бы условиться с заказчиком (он мог подтвердить, что данные обычно меняются максимум в течении недели). Конечно это не самый надежный способ, и иногда могут возникнуть расхождения, например, в том случае, когда пользователь поменял данные месячной давности и здесь стоит предусмотреть возможность перезагрузки данных начиная с более ранней даты, мы сделаем это при помощи параметра, в котором будем указывать нужное нам количество дней назад.

Создадим новый SSIS-пакет и назовем его «LoadResidues.dtsx».

При помощи контекстного меню, отобразим область ввода переменных пакета (это можно сделать также при помощи меню «SSIS → Variables»):



В данном пакете, нажав на кнопку «Add Variable», создадим переменную LoadFromDate типа DateTime:



По умолчанию переменной присваивается текущее значение даты/времени, т.к. мы будем переопределять значение внутри пакета, нам это не важно.
Так же стоит обратить внимание на Expression – если прописать в этом поле выражение, то переменная будет работать как формула и мы не сможем изменять ее значение при помощи присваивания. При каждом обращении к такой переменной ее значение будет рассчитываться согласно указанному выражению.

Значения переменных можно задавать при помощи компонента «Expression Task». Давайте рассмотрим, как это делается. Создадим элемент «Expression Task»:



Двойным щелчком откроем редактор данного элемента:



Пропишем следующее выражение:

@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) GETDATE()

Здесь так же была применена двойная конвертация типа, чтобы избавиться от составляющей времени и оставить только дату.

Давайте так же посмотрим как можно отследить значение переменной во время выполнения пакета.

Создадим точку останова на элементе «Expression Task»:



Укажем, что точка должна срабатывать по окончанию выполнения данного блока:



Запустим пакет на выполнение (F5) и после остановке в нашей точке, перейдем на вкладку «Locals»:



Раскроем список Variables и найдем в нем свою переменную:



Для интереса можем поменять выражение элементе «Expression Task» на следующее:

@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) DATEADD("DAY", -7, GETDATE())

и также поэкспериментировать:



Точку останова убирается таким же образом каким и была установлена. Либо можно удалить сразу все точки останова, если их было несколько:



Создадим параметр, который будет отвечать за количество дней назад.

Параметр можно создать, как глобальный для всего проекта:



Так и локальный, внутри конкретного пакета:



Параметр может быть обязательный для задания – за это отвечает флаг Required. Если этот флаг установлен, то при создании задачи или при вызове пакета из другого пакета нужно будет определить входящее значение параметра (мы рассмотрим это далее).
В отличие от переменных значение параметров при помощи «Expression Task» менять нельзя.

Сохраним параметры и снова зайдем в редактор «Expression Task»:



Для примера я поменял выражение на следующее:

@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) DATEADD("DAY", - @[$Package::DateOffset] , GETDATE())

Думаю, на этом суть параметров и переменных ясна, и мы можем продолжить.

После того как мы поигрались с «Expression Task» мы его удалим.

Создадим «Execute SQL Task»:



Настроим его следующим образом:



Пропишем в SQLStatement следующий запрос:

SELECT ISNULL(DATEADD(DAY,-?,MAX(ResidueDate)),'19000101') FromDate
FROM ProductResidues

Т.к. данный запрос возвращает одну строку, установим ResultSet = «Single Row» и ниже на вкладке «Result Set» сохраним результат в значение переменной LoadFromDate.

На вкладке «Parameter Mapping» зададим значения параметров, которые в запросе обозначены знаком вопроса (?):



Параметры нумеруются, начиная с нуля.
Стоит отметить, что если при создании соединения воспользоваться другим видом провайдера, например, «ADO» или «ADO.Net», то вместо вопросов мы сможем использовать именованные параметры типа @ParamName и в качестве «Parameter Name» тоже могли бы указывать @ParamName, а не его номер. Но увы типом соединения с другим провайдером мы сможем воспользоваться не во всех случаях.

Теперь на вкладке «Result Set» укажем в какую переменную нужно записать результат выполнения запроса:



Здесь так же можем продебажить установив у этого компонента точку останова на «Break when the container receives the OnPostExecute event» и запустив пакет на выполнение:



Здесь я для удобства мониторинга значения переменной прописал название переменной в «Watch», чтобы не искать ее в блоке «Locals».

Как видим все верно в переменную LoadFromDate записалась дата «01.01.1900», т.к. строк в таблице ProductResidues на Target еще нет.

Переименую для наглядности «Execute SQL Task» в «Set LoadFromDate».

Создадим еще один элемент «Execute SQL Task» и назовем его «Delete Old Rows»:



Настроим его следующим образом:



SQLStatement содержит следующий запрос:

DELETE ProductResidues
WHERE ResidueDate>=?

И зададим значение параметра на вкладке «Parameters Mapping»:



Все, удаление старых данных за указанный конечный период у нас реализовано.

Теперь сделаем часть отвечающую загрузку свежих данных. Для этого воспользуемся компонентом «Data Flow Task»:



Зайдем в область данного компонента и создадим «Source Assistant»:



Настроим его следующим образом:



Нажав на кнопку «Parameters…» зададим значение параметра:



Для записи новых данных воспользуемся уже знакомым компонентом «Destination Assistant»:



Протянем стрелку от «Source Assistant» и настроим его:







Все, пакет для переноса данных с источника SourceA у нас готов, можем запустить его на выполнение:



Запустим еще раз:



Как видим при повторном запуске удалились и залились только данные за указанный период. Ну вроде все работает так как мы и хотели. На всякий случай сверимся, что данные получателя по количеству равны данным источника:



Дойдя до сюда, я понял, что я допустил ошибку. Кто понял в чем дело, молодец!

Но может это и хорошо, т.к. пример получился не таким перегруженным.

Ошибка в том, что я забыл учесть, что при интеграции данных таблицы Products у нас в Target формируются свои идентификаторы (поле ID с флагом IDENTITY)!

Давайте переделаем, чтобы все было правильно. Ничего страшного повторим, зато лучше запомним.

Забежим чуть вперед и добавим в пакет еще один параметр, который назовем «SourceID»:


Перенастроим «Set LoadFromDate»:



В SQLStatement пропишем новый запрос с учетом SourceID:

SELECT ISNULL(DATEADD(DAY,-?,MAX(res.ResidueDate)),'19000101') FromDate
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
WHERE prod.SourceID=?

Настроим второй параметр:



Теперь перенастроим «Delete Old Rows» аналогичным образом, чтобы учитывался SourceID:



В SQLStatement пропишем новый запрос с учетов SourceID:

DELETE res
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
WHERE ResidueDate>=?
  AND prod.SourceID=?

Настроим второй параметр:



Теперь зайдем в «Data Flow Task», удалим цепочку и добавим «Derived Column»:



Настроим его следующим образом:



Здесь я намеренно оставил тип «Unicode string», а не сделал преобразование как в первой части. Давайте за одно рассмотрим компонент «Data Conversion»:



Настроим его:



Теперь при помощи Lookup сделаем сопоставление и получим нужные нам идентификаторы продуктов:



Настроим его:







Теперь протянем синюю стрелку от Lookup к «OLE DB Destination»:



Выберем поток «Lookup Match Output»:



Настроим «OLE DB Destination», нужно перестроить Mappings:



Все, сделаем очистку таблицы от неправильно загруженных данных:

TRUNCATE TABLE DemoSSIS_Target.dbo.ProductResidues

И запустим пакет на выполнение:



И еще раз:



Похоже на правду. Можете самостоятельно проверить правильно ли разнеслись идентификаторы продуктов.

Так как структура DemoSSIS_SourceA и DemoSSIS_SourceB одинакова и нам нужно сделать для DemoSSIS_SourceB, то же самое, то мы можем при создании задачи создать два шага для пакета «LoadResidues.dtsx», в первом шаге настроить подключение к базе DemoSSIS_SourceA, а на втором шаге DemoSSIS_SourceB.

Откомпилируем и передеплоим SSIS проект:







Давайте теперь создадим новое задание в SQL Agent:



На вкладке Steps создадим шаг 1 для загрузки продуктов:



Создадим шаг 2 для загрузки остатков из SourceA:



На вкладке Configuration можем увидеть наши параметры:



Здесь от нас требуют ввести SourceID, т.к. мы указали его как Required. Зададим его:



Данные вкладки «Connection Managers» изменять для этого шаге не будем.

Создадим шаг 3 для загрузки остатков из SourceB:



Зададим параметр SourceID:



И изменим данные соединения SourceA таким образом, чтобы оно ссылалось на базу DemoSSIS_SourceB:



В данном случае мне достаточно было изменить ConnectionString и InitialCatalog, теперь они указывают на DemoSSIS_SourceB.

В итоге мы должны получить следующее – три шага:



Запустим эту задачу на выполнение:



Выполним запрос:

USE DemoSSIS_Target
GO

SELECT prod.SourceID,COUNT(*)
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
GROUP BY prod.SourceID

И убедимся, что все отработало как надо:



Теперь допустим, что базы DemoSSIS_SourceA и DemoSSIS_SourceB расположены на одном экземпляре SQL Server. Давайте переделаем «OLE DB Source»:





Текст команды:

DECLARE @SourceID char(1)=?

IF(@SourceID='A') USE DemoSSIS_SourceA
ELSE  USE DemoSSIS_SourceB

SELECT
  ResidueDate,
  ProductID,
  ResidueAmount
FROM ProductResidues
WHERE ResidueDate>=?

Зададим параметры:



Теперь наш пакет в зависимости от значения параметра SourceID будет брать данные либо из SourceA, либо из SourceB.

Для теста можете изменить значение параметра SourceB на «B» и запустить проект на выполнение:





Давайте теперь создадим новый пакет «LoadAll.dtsx» и создадим в нем «Execute Package Task» (переименуем его в «Load Products»):



Настроим «Load Products»:



Создадим в новом пакете 2 параметра:



В области «Control Flow» создадим еще 2 компонента «Execute Package Task» которые назовем «Load Resudues A» и «Load Resudues B»:



Настроим их задав у обоих название пакета «LoadResidues.dtsx»:



Зададим обязательный параметр SourceID для «Load Resudues A»:



Зададим обязательный параметр SourceID для «Load Resudues B»:



Обратите внимание, что у стрелок тоже есть свои свойства, например, мы можем поменять свойство Value на Completion, что будет означать, что следующий шаг будет выполнен даже в том случае если на шаге «Load Resudues A» произойдет ошибка:



Все, можем запустить пакет на выполнение:



Думаю, объяснять, что здесь произошло нет смысла.

Иногда параметры для конкретного пакета удобно хранить в вспомогательной таблице и считывать их оттуда в переменные пакета, например, используя для поиска глобальную переменную «System::PackageName». Для демонстрации, давайте переделаем наш пакет таким образом.

Создадим таблицу с параметрами:

USE DemoSSIS_Target
GO

CREATE TABLE IntegrationPackageParams(
  PackageName nvarchar(128) NOT NULL,
  DateOffset int NOT NULL,
CONSTRAINT PK_IntegrationPackageParams PRIMARY KEY(PackageName)
)
GO

INSERT IntegrationPackageParams(PackageName,DateOffset)VALUES
(N'LoadResidues',7)
GO

Удалим параметр DateOffset из пакета «LoadResidues.dtsx»:



Создадим в пакете переменную DateOffset:



В область «Control Flow» добавим еще один элемент «Execute SQL Task» и переименуем его «Load Params»:



Настроим его:



Запрос в SQLStatement пропишем следующий:

SELECT DateOffset
FROM IntegrationPackageParams
WHERE PackageName=?

Настроим параметр запроса используя системную переменную «System::PackageName»:



Осталось сбросить результат выполнения запроса в переменную:



Теперь осталось перенастроить «Set LoadFromDate», чтобы в нем использовалась теперь переменная:





Все, можем тестировать новую версию пакета.

Вот мы и добрались до финиша. Мои поздравления!

Заключение по третьей части


Уважаемые читатели, эта часть будет заключительной.

В данном цикле статей, я постарался продумать примеры таким образом, чтобы сделать их как можно короче и в свою очередь охватить как можно больше полезных и важных деталей.

Думаю, освоив это, далее вы уже без особого труда сможете освоить работу с остальными компонентами SSIS. В данных статьях я рассмотрел только самые важные компоненты (наиболее часто применяемые на моей практике), но зная только это вы уже можно сделать очень многое. По мере надобности изучайте самостоятельно другие компоненты, в первую очередь порекомендовал бы посмотреть следующее:

  • компоненты-контейнеры (Sequence Container, For Loop Container, Foreach Loop Container);
  • «Merge Join», который позволяет сделать операции JOIN, LEFT JOIN, FULL JOIN на стороне SSIS (это требует предварительно отсортировать два набора при помощи «Sort»);
  • «Conditional Split», который позволяет разбить один поток на несколько в зависимости от условий;
  • так же рассмотрите вкладку «Event Handlers», которая позволяет создать дополнительные области для определенного вида события пакета.

Доступного материала на эту тему очень много, используйте MSDN, Youtube и прочие источники.

Я не старался сделать подробный учебник (думаю, все это уже есть), а старался сделать такой материал, который позволит начинающим шаг за шагом создать все с самого нуля и делая все своими руками увидеть всю картину в целом, а после уже имея основу идти дальше самостоятельно. Очень надеюсь, что у меня это получилось и материал окажется полезен именно в таком ключе.

Я очень рад, что мне хватило сил осуществить задуманное и описать все так как я это хотел, даже получилось сделать большее, так как из-за допущенной в этой части ошибки возникли неожиданные повороты сюжета, но так я думаю, стало даже интересней. ;)

Спасибо за внимание! Удачи!

И возможно, до встреч в новых статьях…
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+8
Comments 2
Comments Comments 2

Articles