A blog about SQL Server, SSIS, C# and whatever else I happen to be dealing with in my professional life.

Find ramblings

Friday, March 23, 2012

Replicate tables with SSIS EzAPI

This post is out of order in the recipe series, but there was a need on dba.stackexchange.com for this solution. Add a new class named ReplicateOMatic to the solution. This class creates a new package with a pair of connection managers and configures a data flow to copy all of the data between paired tables. Modify the query in the GenerateTableList method to produce the correct list of tables.
namespace EzAPIRecipies
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Microsoft.SqlServer.SSIS.EzAPI;
    using Microsoft.SqlServer.Dts.Runtime;

    /// <summary>
    /// EzAPI package that copies the contents of one table from a source catalog into the
    /// same-named table in a destination catalog, using an OLE DB source and an OLE DB
    /// fast-load destination inside a single data flow.
    /// </summary>
    public class ReplicateOMatic : EzSrcDestPackage<EzOleDbSource, EzSqlOleDbCM, EzOleDbDestination, EzSqlOleDbCM>
    {
        /// <summary>
        /// OLE DB connection string template: {0} is the server name, {1} is the catalog name.
        /// Uses the SQLNCLI10.1 provider with Windows (SSPI) authentication.
        /// </summary>
        public static string connectionStringTemplate = @"Data Source={0};Initial Catalog={1};Provider=SQLNCLI10.1;Integrated Security=SSPI;Packet Size=32767;";

        /// <summary>
        /// Wrap an existing SSIS package.
        /// </summary>
        /// <param name="p">Package to wrap.</param>
        public ReplicateOMatic(Package p) : base(p) { }

        /// <summary>
        /// Allow a plain SSIS package to be used where a ReplicateOMatic is expected.
        /// </summary>
        /// <param name="p">Package to wrap.</param>
        public static implicit operator ReplicateOMatic(Package p) { return new ReplicateOMatic(p); }

        /// <summary>
        /// Create a package with a single data flow and 2 variables (sourceQuery, tableName).
        /// The data flow contains an OLE DB source and an OLE DB destination component.
        /// The destination uses fast load with identity and null preservation,
        /// table locking and constraint checking.
        /// </summary>
        /// <param name="sourceServer">Source computer name (UTUMNO)</param>
        /// <param name="sourceDatabase">Source catalog name (DocumentsOld)</param>
        /// <param name="table">Table to be replicated [schema].[tablename]</param>
        /// <param name="destinationServer">Destination computer name (WESTMARCH)</param>
        /// <param name="destinationDatabase">Destination catalog name (Documents)</param>
        public ReplicateOMatic(string sourceServer, string sourceDatabase, string table, string destinationServer, string destinationDatabase)
            : base()
        {
            // Use this class's own sanitizer so it does not depend on classes from other recipes.
            string saniName = ReplicateOMatic.SanitizeName(table);
            string sourceQuery = string.Format("SELECT D.* FROM {0} D", table);

            ////////////////////////////////////////////////////
            // Package settings
            ////////////////////////////////////////////////////
            this.Name = string.Format("Replicate{0}", saniName);

            /////////////////////////////////////////////////////
            // Define package variables
            // sourceQuery records the query text; the source component below
            // receives the same query directly through SqlCommand.
            // tableName drives the destination's DataSourceVariable.
            /////////////////////////////////////////////////////
            this.Variables.Add("sourceQuery", false, "User", sourceQuery);
            this.Variables.Add("tableName", false, "User", table);

            /////////////////////////////////////////////////////
            // Configure DataFlow properties
            /////////////////////////////////////////////////////
            this.DataFlow.Name = string.Format("DFT_{0}", saniName);
            this.DataFlow.Description = "Scripted replication";

            /////////////////////////////////////////////////////
            // Connection manager configuration
            /////////////////////////////////////////////////////
            this.SrcConn.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, sourceServer, sourceDatabase);
            this.SrcConn.Name = string.Format("SRC_{0}", sourceDatabase);
            this.SrcConn.Description = string.Empty;

            this.DestConn.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, destinationServer, destinationDatabase);
            this.DestConn.Name = string.Format("DST_{0}", destinationDatabase);
            this.DestConn.Description = string.Empty;

            /////////////////////////////////////////////////////
            // Configure Dataflow's Source properties
            /////////////////////////////////////////////////////
            this.Source.Name = "Src " + saniName;
            this.Source.Description = string.Empty;
            this.Source.AccessMode = AccessMode.AM_SQLCOMMAND;
            this.Source.SqlCommand = sourceQuery;

            /////////////////////////////////////////////////////
            // Configure Dataflow's Destination properties
            /////////////////////////////////////////////////////
            this.Dest.Name = "Dest " + saniName;
            this.Dest.Description = string.Empty;
            this.Dest.Table = table;
            this.Dest.FastLoadKeepIdentity = true;
            this.Dest.FastLoadKeepNulls = true;
            this.Dest.FastLoadOptions = "TABLOCK,CHECK_CONSTRAINTS";
            this.Dest.AccessMode = AccessMode.AM_OPENROWSET_FASTLOAD_VARIABLE;
            this.Dest.DataSourceVariable = this.Variables["tableName"].QualifiedName;

            this.Dest.LinkAllInputsToOutputs();
        }

        /// <summary>
        /// Generate a list of all the tables in schema foo or bar.
        /// Edit the query to select the tables that should be replicated.
        /// </summary>
        /// <param name="server">Server hosting the catalog to inspect.</param>
        /// <param name="catalog">Catalog (database) whose tables are listed.</param>
        /// <returns>Returns a list of all tables matching the query criteria, ordered by name. Format is [schema].[table]</returns>
        public static List<string> GenerateTableList(string server, string catalog)
        {
            List<string> tables = new List<string>();
            string connectionString = string.Format("server={0};integrated security=sspi;database={1};", server, catalog);
            string sql = @"
                    SELECT
                        quotename(schema_name(T.schema_id)) + '.' + quotename(T.name)
                    FROM
                        sys.tables T
                    WHERE
                        T.schema_id = schema_id('foo')
                        OR T.schema_id = schema_id('bar')
                    ORDER BY
                    1";

            using (System.Data.SqlClient.SqlConnection connection = new System.Data.SqlClient.SqlConnection(connectionString))
            using (System.Data.SqlClient.SqlCommand command = new System.Data.SqlClient.SqlCommand(sql, connection))
            {
                command.CommandType = System.Data.CommandType.Text;
                connection.Open();
                using (System.Data.SqlClient.SqlDataReader reader = command.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        tables.Add(reader[0].ToString());
                    }
                }
            }

            return tables;
        }

        /// <summary>
        /// Sanitize a name so that it is valid for SSIS objects.
        /// Strips [ ] / \ : =
        /// Replaces . with _
        /// </summary>
        /// <param name="name">Name to sanitize, typically [schema].[table].</param>
        /// <returns>The sanitized name.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
        public static string SanitizeName(string name)
        {
            if (name == null)
            {
                throw new ArgumentNullException("name");
            }

            return name
                .Replace("[", string.Empty)
                .Replace("]", string.Empty)
                .Replace(".", "_")
                .Replace("/", string.Empty)
                .Replace("\\", string.Empty)
                .Replace(":", string.Empty)
                .Replace("=", string.Empty);
        }
    }
}

Usage

This code would be added to the original program. Call it from Main and modify the code to reflect the correct server and catalog names.
        /// <summary>
        /// Build a set of SSIS packages that take the form of ReplicateSchema_Table.dtsx,
        /// one per table returned by ReplicateOMatic.GenerateTableList.
        /// When tweak is true, each package also gets a logging connection manager and the
        /// SQL Server log provider before it is saved; otherwise the EzAPI package is saved as-is.
        /// </summary>
        public static void ReplicateTables()
        {
            string outputFolder = @"C:\sandbox\SSISHackAndSlash\SSISHackAndSlash";
            string sourceServer = @"ANGBAND";
            string sourceDatabase = "OldDocuments";
            string destinationServer = @"UTUMNO";
            string destinationDatabase = "Documents";
            bool tweak = true;

            // Logging target: the name of the connection manager and the catalog it points at
            // are kept separate so the connection string gets a real database name.
            // For an SSISDB logging database, set both values to "SSISDB".
            string loggingCMName = "SRC_msdb";
            string loggingCatalog = "msdb";

            foreach (string table in ReplicateOMatic.GenerateTableList(sourceServer, sourceDatabase))
            {
                ReplicateOMatic rom = new ReplicateOMatic(sourceServer, sourceDatabase, table, destinationServer, destinationDatabase);
                string outputFile = System.IO.Path.Combine(outputFolder, string.Format("{0}.dtsx", rom.Name));

                // This section is optional but I like to tweak things
                if (!tweak)
                {
                    rom.SaveToFile(outputFile);
                    continue;
                }

                Application app = new Application();
                Package p = new Package();

                // Serialize the EzAPI package to XML and load our package from it.
                // Do this before adding anything to p: loading replaces the package's
                // contents, so objects added beforehand (such as the logging connection
                // manager) would be discarded and the log provider would reference a
                // connection manager that no longer exists.
                p.LoadFromXML(rom.SaveToXML(), null);

                // Connection managers are easier through EzAPI
                EzSqlOleDbCM loggingCM = new EzSqlOleDbCM(p);
                loggingCM.Name = loggingCMName;
                loggingCM.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, sourceServer, loggingCatalog);

                ///////////////////////////////////////////////////////////////
                // Enable SQL Server configuration, assumes 
                // an OLE DB connection manager named SSISDB exists
                // uses a table called dbo.sysdtsconfig
                // Configuration key is Default.2008.SalesDB
                ///////////////////////////////////////////////////////////////
                //p.EnableConfigurations = true;
                //Configuration c = null;
                //c = p.Configurations.Add();
                //c.Name = "SalesDB";
                //c.ConfigurationType = DTSConfigurationType.SqlServer;
                //c.ConfigurationString = @"""SSISDB"";""[dbo].[sysdtsconfig]"";""Default.2008.SalesDB""";

                ///////////////////////////////////////////////////////////////
                // Enable logging
                // Add log provider and such
                // http://msdn.microsoft.com/en-us/library/microsoft.sqlserver.dts.runtime.logproviders.add.aspx
                // ProgID: DTS.LogProviderSQLServer.2
                ///////////////////////////////////////////////////////////////
                DTSEventColumnFilter eventFilter = new DTSEventColumnFilter();
                eventFilter.Computer = true;
                eventFilter.DataBytes = true;
                eventFilter.ExecutionID = true;
                eventFilter.MessageText = true;
                eventFilter.Operator = true;
                eventFilter.SourceID = true;
                eventFilter.SourceName = true;

                p.LoggingMode = DTSLoggingMode.Enabled;

                // Define the events we care about
                string[] notableEvents = new string[] { "OnError", "OnInformation", "OnPostExecute", "OnPreExecute", "OnWarning", "OnTaskFailed" };

                // Add and configure the sql log provider; its ConfigString is the
                // name of the connection manager it writes through.
                LogProvider provider = p.LogProviders.Add("DTS.LogProviderSQLServer.2");
                provider.ConfigString = loggingCM.Name;
                provider.Name = "SSIS log provider for SQL Server";
                provider.Description = "Writes log entries for events to a SQL Server database";
                p.LoggingOptions.SelectedLogProviders.Add(provider);

                LoggingOptions options = p.LoggingOptions;
                // load up the events we care about
                options.EventFilter = notableEvents;
                options.EventFilterKind = DTSEventFilterKind.Inclusion;

                // configure the specifics of how an event should be logged
                foreach (string item in notableEvents)
                {
                    options.SetColumnFilter(item, eventFilter);
                }

                app.SaveToXml(outputFile, p, null);
            }
        }

No comments: