Saturday, 3 December 2011

Redirect duplicate rows

Case
I have duplicate records in my source which I want to redirect to a separate destination in my data flow task. Most solutions, like the Aggregate Transformation or the Sort Transformation with "Remove rows with duplicate sort values" selected, remove the duplicate records instead of redirecting them.

Example: If there are three identical records (two columns), then two of them should be redirected as duplicate so that I can capture them in an error destination.

Solution
This solution uses a T-SQL ORDER BY or an SSIS Sort Transformation in combination with a Script Component to detect and redirect all duplicate records.

If your source is a SQL Server table, you could probably come up with a fancy T-SQL deduplication script that does the same in less processing time, but this solution is easy and also works for other kinds of sources, even if you only want to compare, for example, two out of five columns.
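
To make the idea concrete outside of SSIS, here is a minimal stand-alone sketch of the same technique: sort on the comparison columns, then compare each row with the one before it. The Row class, its column names and the sample data are hypothetical and only serve as illustration; only Name and City are compared, Amount is ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SortedDedupSketch
{
    // Hypothetical row type: only Name and City take part in the comparison.
    public sealed class Row
    {
        public string Name;
        public string City;
        public int Amount;
    }

    // Sorts the rows on the comparison columns and compares each row's key
    // with the key of the row before it. The first row of each group goes
    // to 'unique', every following identical row goes to 'duplicate'.
    public static void Split(IEnumerable<Row> rows, List<Row> unique, List<Row> duplicate)
    {
        string previousKey = null;
        foreach (Row row in rows.OrderBy(r => r.Name, StringComparer.Ordinal)
                                .ThenBy(r => r.City, StringComparer.Ordinal))
        {
            string key = row.Name + "|" + row.City + "|";
            if (key == previousKey)
            {
                duplicate.Add(row);
            }
            else
            {
                unique.Add(row);
            }
            previousKey = key;
        }
    }

    public static void Main()
    {
        var rows = new List<Row>
        {
            new Row { Name = "A", City = "X", Amount = 1 },
            new Row { Name = "B", City = "Y", Amount = 2 },
            new Row { Name = "A", City = "X", Amount = 3 },
            new Row { Name = "A", City = "X", Amount = 4 }
        };
        var unique = new List<Row>();
        var duplicate = new List<Row>();
        Split(rows, unique, duplicate);
        Console.WriteLine("Unique: " + unique.Count + ", Duplicate: " + duplicate.Count);
        // prints "Unique: 2, Duplicate: 2"
    }
}
```

Without the sort, the second and third "A, X" rows would not sit next to the first one, and a previous-row comparison would miss a duplicate. That is why the sort step in this solution is not optional.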

1) Source and Sort
Add your source to the Data Flow Task. You need to sort on the columns you want to use for deduplication. If your source is a database table, add an ORDER BY clause to the source query; otherwise add a Sort Transformation right after your source.
[Image: Add a sorted source]

2) Add Script Component
Add a Script Component (type Transformation) after your sorted source and give it a suitable name.
[Image: Add Script Component]

3) Edit Script Component: Input Columns
Edit the Script Component and go to the Input Columns tab and select all the columns you want to use for deduplication as ReadOnly (the same fields as you used for sorting).
[Image: Select columns. Type doesn't matter.]

4) Edit Script Component: Inputs and Outputs, Edit Output
Go to the Inputs and Outputs tab and change the name of the Output port from "Output 0" to "Unique". Also change the ExclusionGroup to 1.
[Image: Rename default output port]

5) Edit Script Component: Inputs and Outputs, New Output
Add a new Output, name it Duplicate and change its ExclusionGroup to 1. To connect this new output port to the input port, change the SynchronousInputID property and select the input port.
[Image: New Output port]

6) The Script
Edit the Script and copy the following code. The script uses reflection to read all selected input columns, so the code itself doesn't need to change when you change the input columns. You do still have to open and close the script again after such a change; see the comments in the code.
// C# code
// This script automatically compares the selected columns, but there is one 'bug':
// You have to edit and close this script again if you change input columns.
using System;
using System.Data;
using System.Reflection;                                // Added
using System.Text;                                      // Added
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    // Create a variable to store the concatenated values for the previous row
    string previousRow = "";

    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        // Create a variable to store the concatenated values of current row
        StringBuilder currentRow = new StringBuilder();

        foreach (PropertyInfo p in Row.GetType().GetProperties())
        {
            // We can't use the _IsNull columns, so ignore them (case-insensitive match). Also ignore the new output column
            if (!p.Name.EndsWith("_IsNull", StringComparison.OrdinalIgnoreCase) && !p.Name.Equals("Duplicate"))
            {
                try
                {
                    // Concatenate value as string to variable
                    currentRow.Append(p.GetValue(Row, null).ToString() + "|");
                }
                catch (ArgumentException)
                {
                    // If the value is NULL (empty) then you can't get the value of it
                    currentRow.Append("NULL|");
                }
                catch (Exception ex)
                {
                    // Raise error because something unexpected went wrong
                    bool pbCancel = false;
                    this.ComponentMetaData.FireError(0, "MarkDuplicates", p.Name + ": " + ex.Message, string.Empty, 0, out pbCancel);
                }
            }
        }

        // Check if the current row and previous row are the same
        if (currentRow.ToString().Equals(previousRow))
        {
            // Redirect to duplicate output
            Row.DirectRowToDuplicate();
        }
        else
        {
            // Redirect to unique output
            Row.DirectRowToUnique();
        }

        // Fill previous row with current value for next check
        previousRow = currentRow.ToString();
    }
}

or VB.Net
' VB.Net code
' This script automatically compares the selected columns, but there is one 'bug':
' You have to edit and close this script again if you change input columns.

Imports System
Imports System.Data
Imports System.Math
Imports System.Reflection                           ' Added
Imports System.Text                                 ' Added
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper

<Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute> _
<CLSCompliant(False)> _
Public Class ScriptMain
    Inherits UserComponent

    ' Create a variable to store the concatenated values for the previous row
    Dim previousRow As String = ""

    Public Overrides Sub Input0_ProcessInputRow(ByVal Row As Input0Buffer)
        ' Create a variable to store the concatenated values of current row
        Dim currentRow As StringBuilder = New StringBuilder()

        Dim p As PropertyInfo
        For Each p In Row.GetType().GetProperties()
            ' We can't use the _IsNull columns, so ignore them (case-insensitive match). Also ignore the new output column
            If Not p.Name.EndsWith("_IsNull", StringComparison.OrdinalIgnoreCase) AndAlso Not p.Name.Equals("Duplicate") Then
                Try
                    ' Concatenate value as string to variable
                    currentRow.Append(p.GetValue(Row, Nothing).ToString() + "|")
                Catch ex As ArgumentException
                    ' If the value is NULL (empty) then you can't get the value of it
                    currentRow.Append("NULL|")
                Catch ex As Exception
                    ' Raise error because something unexpected went wrong
                    Dim pbCancel As Boolean
                    Me.ComponentMetaData.FireError(0, "MarkDuplicates", p.Name + ": " + ex.Message, String.Empty, 0, pbCancel)
                End Try
            End If
        Next

        ' Check if the current row and previous row are the same
        If (currentRow.ToString().Equals(previousRow)) Then
            ' Redirect to duplicate output
            Row.DirectRowToDuplicate()
        Else
            ' Redirect to unique output
            Row.DirectRowToUnique()
        End If

        ' Fill previous row with current value for next check
        previousRow = currentRow.ToString()
    End Sub
End Class
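
The reflection part of the script can be tried outside of SSIS with a small console sketch. The FakeRow class below is a hypothetical stand-in for the generated Input0Buffer class (it is not the real SSIS type), and BuildKey is an illustrative helper name. One difference to keep in mind: this stand-in simply returns null for an empty column, whereas in the real buffer a NULL column cannot be read, which is why the script above wraps the read in a try/catch.

```csharp
using System;
using System.Reflection;
using System.Text;

public static class ReflectionKeySketch
{
    // Hypothetical stand-in for the generated buffer class: one property
    // per column plus a matching <column>_IsNull property.
    public sealed class FakeRow
    {
        public string Name { get; set; }
        public string City { get; set; }
        public bool Name_IsNull { get { return Name == null; } }
        public bool City_IsNull { get { return City == null; } }
    }

    // Concatenates all column values of a row into one comparison key,
    // skipping the _IsNull helper properties.
    public static string BuildKey(object row)
    {
        StringBuilder key = new StringBuilder();
        foreach (PropertyInfo p in row.GetType().GetProperties())
        {
            if (p.Name.EndsWith("_IsNull", StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }
            object value = p.GetValue(row, null);
            key.Append(value == null ? "NULL" : value.ToString()).Append('|');
        }
        return key.ToString();
    }

    public static void Main()
    {
        var a = new FakeRow { Name = "Ann", City = null };
        var b = new FakeRow { Name = "Ann", City = null };
        var c = new FakeRow { Name = "Ann", City = "Rome" };
        Console.WriteLine(BuildKey(a) == BuildKey(b)); // prints "True"
        Console.WriteLine(BuildKey(a) == BuildKey(c)); // prints "False"
    }
}
```

A limitation of any concatenated key, including the one in the script above: a value that itself contains the "|" delimiter, or the literal text NULL, can make two different rows produce the same key. If your data can contain those, consider a delimiter that cannot occur in the data.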


7) Destinations
Add two destinations and connect the Data Flow Paths to the destinations. When connecting you will have to select the output port.
[Image: Choose right output]

8) Testing
I added some dataviewers for testing purposes and selected the first three columns for deduplication.
[Image: The Result]

All roads lead to Rome, so let me know what your solution is!
Update: here is another cool solution that doesn't need sorting.
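
The linked solution is not reproduced here. As a rough illustration of how duplicates can be detected without sorting, the sketch below remembers every key it has seen in a HashSet. This is an assumption about one possible approach, not necessarily what the linked post does; the class name, method name and sample keys are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class UnsortedDedupSketch
{
    // Splits keys into first occurrences and repeats without sorting.
    public static void Split(IEnumerable<string> keys, List<string> unique, List<string> duplicate)
    {
        var seen = new HashSet<string>(StringComparer.Ordinal);
        foreach (string key in keys)
        {
            // HashSet<T>.Add returns false when the key is already present.
            if (seen.Add(key))
            {
                unique.Add(key);
            }
            else
            {
                duplicate.Add(key);
            }
        }
    }

    public static void Main()
    {
        var keys = new[] { "A|X|", "B|Y|", "A|X|", "A|X|" };
        var unique = new List<string>();
        var duplicate = new List<string>();
        Split(keys, unique, duplicate);
        Console.WriteLine("Unique: " + unique.Count + ", Duplicate: " + duplicate.Count);
        // prints "Unique: 2, Duplicate: 2"
    }
}
```

The trade-off is memory: the sorted approach only keeps the previous row, while this one keeps every distinct key for the whole run.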

15 comments:

  1. Very useful and helpful article.
    The only one that give me the way to properly manage duplicates from flat files.

    Key words for french search : SSIS et gestion des doublons.

  2. Very useful article. It can even be used for redirecting error rows as well

    Thanks for the explanations

  3. Excellent article... but the same can be achieved using the Aggregate transform, which is more efficient and takes less time.

    Replies
    1. @Mohd: All roads lead to Rome, but please show me how you would accomplish the same result with the Aggregate Transformation. I'm curious about your solution.

    2. MATE, you can't use Aggregate, you'll lose one column

  4. Hi Joost,

    the source used was XML source. I followed all step unique records coming properly but duplicate records are not getting displayed. If I put debugger, the script is just running not able to stop to execute and check why the duplicate key is not working. Can you please help?

    Replies
    1. Hi Kavitha,

      Steps 4 and 5 are the most important. Make sure you followed each step thoroughly. Contact me via the contact form.

      1) Did you have any errors?
      2) Did you add input columns without re-editing the script?
      3) What are the data types?
      4) Which version of SSIS do you use and are you using VB or C#?

  5. Hi Joost,

    :) It is working fine now, I am able to get Duplicate values. The mistake I did was in sort transformation I have selected check box remove duplicated values, so in sort transformation it was filtering all duplicate values. I am using sql server 2008.. Thanks for the script. I am totally new to this. Thank you very much for helping.

  6. This is a great article. I needed to get the row count of the duplicates found so we could fail the entire load and reject the source file. This provided that row count. Does anyone know if the sort transformation will have a redirect option in the next release of SSIS?

    Replies
    1. You can add a request for that at Connect and then promote it...

  7. Thank you so much it helped me a lot in removing the duplicates from table as well as from the flat file.

  8. That can be achieved by changing the data source from a table to a query. Add a column to the query with ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...). Use a CTE to select all rows with the new row number column, then use a Conditional Split on the row number value.

  9. I am new to SSIS - and that was pretty cool. I understand what is going on but need to review code to understand how you make it work.. super post.. Thanks

  10. Appreciate for posting this article with steps in detail, Thank you so much

  11. Awesome...Saved my day....~~ Thanks a bunch

