Sunday, 10 November 2013

Execute multiple child packages in parallel with loop

Case
I used a Foreach Loop to execute all my staging packages, but my server isn't using all its resources. Is there a way to execute multiple child packages at once, but still with a loop?

Executing packages sequentially

Solution
The trick is to use a queue of packages and to have multiple 'processes' taking packages from the queue to execute them. The number of packages that can be executed at a time depends on the complexity of your packages. Staging packages for a single source are not complex, so a good guideline/starting point is to execute one package per processor core.


4 cores => execute 4 child packages at the same time

1) The Queue
For this example I will use a FIFO (first in, first out) queue that is stored in a database table. Alternatives could be Microsoft Message Queuing (MSMQ) or SQL Server Service Broker.
-- Create QUEUE table
CREATE TABLE [dbo].[FifoPackageQueue](
 [Id] [bigint] IDENTITY(1,1) NOT NULL,
 [Package] [varchar](50) NULL
) ON [PRIMARY]

GO

-- Add Index on Id
CREATE CLUSTERED INDEX cdxFifoPackageQueue on FifoPackageQueue (Id)

GO

-- Log table
CREATE TABLE [dbo].[FifoPackageQueueLog](
 [Id] [bigint] IDENTITY(1,1) NOT NULL,
 [Package] [varchar](50) NULL,
 [StartTime] [datetime] NULL,
 [EndTime] [datetime] NULL
) ON [PRIMARY]

GO


2) Variables and Parameter
For each 'execution line' in my control flow I need one SSIS string variable to store the package name. I also use one Package Parameter to indicate how many execution lines will be active. You can use an integer variable instead if your SSIS version doesn't have parameters.

String variables, one per execution line

Integer Package Parameter.

3) Fill Queue
In this example I will use an INSERT INTO SELECT query to get all staging packages from SSISDB. You could also use regular INSERT queries or even a Foreach Loop or Script Task that loops through a folder to add packages from the file system to the queue.
INSERT INTO CONV_LOG..FifoPackageQueue
(
     [Package]
)
SELECT      Packages.[name]
FROM        [SSISDB].[internal].[packages] as Packages
INNER JOIN  [SSISDB].[internal].[projects] as Projects
            on Packages.project_version_lsn = Projects.object_version_lsn
WHERE       Projects.name = 'MyProject'
AND         Packages.name like 'STG%';
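
If you prefer regular INSERT queries, filling the queue could look roughly like the sketch below. The three package names are hypothetical placeholders, and the optional DELETE assumes that no other run is still reading from the queue. Add the database prefix (CONV_LOG.. in my case) if the task connects to a different database.

```sql
-- Optional: remove leftovers from an earlier, interrupted run
-- (assumption: no other execution is using the queue right now)
DELETE FROM FifoPackageQueue;

-- Add packages in the order in which they should be picked up (FIFO on Id).
-- The package names below are hypothetical examples.
-- A multi-row VALUES list requires SQL Server 2008 or later.
INSERT INTO FifoPackageQueue ([Package])
VALUES ('STG_Customers.dtsx'),
       ('STG_Orders.dtsx'),
       ('STG_Products.dtsx');
```

Because the queue is read in Id order, putting the longest-running packages first gives the other execution lines time to work through the short ones.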


4) Execution Lines
Each Execution line starts with a Sequence Container connected to the Fill queue task. The Precedence Constraint Expression is @[$Package::ParallelProcesses] >= 1 for the first and @[$Package::ParallelProcesses] >= 2 for the second and so on.
Expression to limit the number of parallel executions

5) Get first package from queue
The Execute SQL Task gets the first package name from the queue and stores it in the SSIS string variable. If the variable is empty, the flow doesn't continue to the For Loop that follows.
The OUTPUT clause is available from SQL Server 2005 onwards

SET NOCOUNT ON;

-- Declare table variable to store the result of the delete
DECLARE @TmpTable TABLE (Package varchar(50));
-- Declare integer variable for counting the delete result
DECLARE @PackageCount int;

-- Select first record, skipping rows locked by other execution lines,
-- delete it and store its name in the table variable
WITH cte AS (
   SELECT   TOP(1) Package
   FROM     FifoPackageQueue WITH (ROWLOCK, READPAST)
   ORDER BY Id
   )
DELETE FROM cte
OUTPUT deleted.Package INTO @TmpTable;

-- Check if there is 1 record in the table variable
SELECT @PackageCount = COUNT(*) FROM @TmpTable;
IF @PackageCount = 1
BEGIN
 -- Return package name
 SELECT Package FROM @TmpTable;
END
ELSE
BEGIN
 -- Table variable was empty so queue was empty
 -- Return empty string to stop next precedence constraint
 SELECT '' AS Package;
END


Store package name in SSIS variable

6) For Loop
The For Loop loops until the package name is empty.
Loop until empty

7) Log starttime
The first Execute SQL Task in the loop inserts the package name and a GETDATE() for the start time in the log table with an INSERT query. The variable containing the package name is a parameter for this task. This is a very basic log mechanism. Adjust it to your needs or remove it if you have another log mechanism.
INSERT INTO  FifoPackageQueueLog (Package, StartTime)
VALUES   (?, GETDATE())


8) Execute Package Task

Add an expression on the package name so that it gets replaced with the value of the variable. In the properties of the task set DelayValidation = True. This will prevent validation errors if your variable is empty.

9) Log endtime
The second Execute SQL Task in the loop logs the end time with an UPDATE query. The variable containing the package name is a parameter for this task. The start and end times will help you choose the optimal number of parallel tasks.
UPDATE  FifoPackageQueueLog
SET   EndTime = GETDATE()
WHERE  Package = ?
AND   EndTime is null
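
To put the logged start and end times to use, a query along these lines could show how long each package took. It is a minimal sketch that only assumes the FifoPackageQueueLog table from step 1; the DurationSeconds alias is my own naming.

```sql
-- Duration per package, slowest first
-- Packages that are still running (EndTime is null) are skipped
SELECT   Package,
         StartTime,
         EndTime,
         DATEDIFF(second, StartTime, EndTime) AS DurationSeconds
FROM     FifoPackageQueueLog
WHERE    EndTime IS NOT NULL
ORDER BY DurationSeconds DESC;
```

The packages at the top of this list are the ones that keep an execution line busy the longest.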

10)  Get next package from queue
This is the exact same task/query as for getting the first package from the queue. If it can't find a package in the queue then it will fill the variable with an empty string and the For Loop will stop.

11) Multiple Execution lines
Repeat steps 4 to 10 x times, probably a couple more than you have processor cores in your server. Then start testing to find out the optimal number of parallel tasks.
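
When testing different values, you could compare runs with a summary query like the one below. This is only a sketch: the log table has no run identifier, so it assumes you filter on the start time of the test run, and the @RunStart value shown is a hypothetical placeholder.

```sql
-- Hypothetical start of the test run you want to analyse
DECLARE @RunStart datetime = '2013-11-10T00:00:00';

SELECT  COUNT(*)                                       AS PackagesExecuted,
        MIN(StartTime)                                 AS FirstStart,
        MAX(EndTime)                                   AS LastEnd,
        -- Wall-clock time of the whole run
        DATEDIFF(second, MIN(StartTime), MAX(EndTime)) AS ElapsedSeconds,
        -- Total time spent in packages, as if they ran one after another
        SUM(DATEDIFF(second, StartTime, EndTime))      AS SummedPackageSeconds,
        -- Summed time divided by elapsed time: average number of packages running at once
        CAST(SUM(DATEDIFF(second, StartTime, EndTime)) AS decimal(18,2))
          / NULLIF(DATEDIFF(second, MIN(StartTime), MAX(EndTime)), 0) AS AvgParallelism
FROM    FifoPackageQueueLog
WHERE   StartTime >= @RunStart
AND     EndTime IS NOT NULL;
```

If ElapsedSeconds stops dropping when you add another execution line, you have probably passed the optimal number for your server.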

12) Download example package
For SSIS 2012 I have added an example package for download. It contains 5 execution lines. Add more if you have more cores available. The example is provided for educational purposes only. This example package is not intended to be used in a production environment and has not been tested in a production environment. Test it thoroughly before using it.


Note: you will find a similar solution in the 10 Tips and Tricks for Better SSIS Performance presentation by David Peter Hansen and also in the SQL Rally Amsterdam presentation by Davide Mauri about Automating DWH Patterns Through Metadata. This example package is inspired by their solutions.

8 comments:

  1. Great post. Quick question: how would you handle multilevel dependencies with this framework?

    Replies
    1. This solution/framework is for independent packages only (for example staging). You can change the order of execution because it's a FIFO construction, but package 3 can still be busy while packages 4, 5 and 6 have already finished.
      The only thing you could do is to have multiple versions of this package, each executing its own set of packages.

  2. Joost - Immense piece of work. Thanks for sharing.

  3. Nice post, Joost. I'm working on something similar to this. I have very limited working knowledge of SSIS, so I have a few questions. Please advise.

    1. Why are you deleting the records from the CTE when you can delete the record from the actual FIFO queue table?

    2. I have a similar requirement, but I keep the stored procedure commands in the queue table instead of package details. There is only a single package that polls the queue to execute the stored procedure and extracts the results into a file.

    3. Let's say I do not use parallel Foreach Loop containers. Instead, I create a master package and use four parallel Execute Package Tasks. Now I have 8 records in the queue and each has a priority. The package begins executing the first four according to priority. What if three of them complete and the fourth one is still processing? It would not allow the other four in the queue to start until the fourth one is complete. So I would need to execute multiple copies of the master package using SQL jobs so that all the records in the queue get executed in parallel until the queue is empty.

    4. Finally, I'm scheduling four jobs at the same time. If I use the rowlock and readpast hints on the queue table, will that ensure the same record is not picked by more than one job?

    I'm using the code below. Here, I'm updating the processing status instead of deleting the record from the queue. Please advise.

    DECLARE @ReportExtractID int;
    SELECT @ReportExtractID = (SELECT TOP 1 ReportExtractID
    FROM ReportExtractQueue WITH (rowlock, readpast)
    WHERE ProcessingStatus = 'Q' -- InQueue
    ORDER BY Priority);
    IF @ReportExtractID IS NOT NULL
    BEGIN
    UPDATE ReportExtractQueue WITH (rowlock)
    SET ProcessingStatus = 'P' --InProgress
    WHERE ReportExtractID = @ReportExtractID
    SELECT @ReportExtractID AS ReportExtractID
    END;

    Replies
    1. This is because I want to delete the first item/package from the queue, but also get its name so I can use it to execute the package. But there are probably more solutions to accomplish this (all roads lead to Rome). My queue table (only containing a packagename) slowly empties and the parallel loops stop when it's completely empty.

      In the Transact-SQL forum on MSDN you can ask the T-SQL gurus your T-SQL related questions.

  4. Very good approach. But what would you use instead of the package caller task if the packages that should be executed have different lists of parameters?
    I am now working on an SSIS package to implement this approach for a DW load, but since the packages have different lists of parameters, I can't use the same package caller task. I am thinking of using a Script Task component and calling the packages using .NET code. Do you have a better approach?

    Replies
    1. If each package has a different set of parameters that are filled by the parent package, then this trick doesn't work. You could try the Script Task for that.

Please use the SSIS MSDN forum for general SSIS questions that are not about this post. I'm a regular reader of that forum and will gladly answer those questions over there.