Pages

Friday, March 23, 2012

Replicate tables with SSIS EzAPI

This post is out of order for the recipe series, but there was a need on dba.stackexchange.com for this solution. Add a new class to the solution named ReplicateOMatic. This class will create a new package with a pair of connection managers and configure a data flow to copy all of the data between paired tables. Modify the query in the method GenerateTableList to create the correct list of tables.
namespace EzAPIRecipies
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using Microsoft.SqlServer.SSIS.EzAPI;
    using Microsoft.SqlServer.Dts.Runtime;

    /// <summary>
    /// EzAPI package that replicates a single table from a source database to a
    /// destination database. The package contains one data flow whose OLE DB source
    /// selects every row of the table and whose OLE DB destination fast-loads the
    /// rows into the table of the same name.
    /// </summary>
    public class ReplicateOMatic : EzSrcDestPackage<EzOleDbSource, EzSqlOleDbCM, EzOleDbDestination, EzSqlOleDbCM>
    {
        /// <summary>
        /// Connection string template shared by the source and destination connection managers.
        /// {0} is the server name, {1} is the catalog name.
        /// </summary>
        public static string connectionStringTemplate = @"Data Source={0};Initial Catalog={1};Provider=SQLNCLI10.1;Integrated Security=SSPI;Packet Size=32767;";

        /// <summary>
        /// Build a ReplicateOMatic around an existing package by handing it to the base constructor.
        /// </summary>
        /// <param name="p">Package to pass to the base class</param>
        public ReplicateOMatic(Package p) : base(p) { }

        /// <summary>
        /// Implicit conversion from a Package; equivalent to calling the Package constructor.
        /// </summary>
        public static implicit operator ReplicateOMatic(Package p) { return new ReplicateOMatic(p); }

        /// <summary>
        /// Create a package with a single data flow, 2 variables.
        /// The data flow will contain an OLEDB source and destination component.
        /// We will use fastload options, identity inserts, table locking etc.
        /// </summary>
        /// <param name="sourceServer">Source computer name (UTUMNO)</param>
        /// <param name="sourceDatabase">Source catalog name (DocumentsOld)</param>
        /// <param name="table">Table to be replicated [schema].[tablename]</param>
        /// <param name="destinationServer">Destination computer name (WESTMARCH)</param>
        /// <param name="destinationDatabase">Destination catalog name (Documents)</param>
        public ReplicateOMatic(string sourceServer, string sourceDatabase, string table, string destinationServer, string destinationDatabase)
            : base()
        {
            // Use this class's own sanitizer so the class does not depend on a sibling recipe class
            string saniName = ReplicateOMatic.SanitizeName(table);
            string sourceQuery = string.Format("SELECT D.* FROM {0} D", table);

            ////////////////////////////////////////////////////
            // Package settings
            ////////////////////////////////////////////////////
            this.Name = string.Format("Replicate{0}", saniName);

            /////////////////////////////////////////////////////
            // Define package variables
            /////////////////////////////////////////////////////
            this.Variables.Add("sourceQuery", false, "User", sourceQuery);
            this.Variables.Add("tableName", false, "User", table);

            /////////////////////////////////////////////////////
            // Configure DataFlow properties
            /////////////////////////////////////////////////////
            this.DataFlow.Name = string.Format("DFT_{0}", saniName);
            this.DataFlow.Description = "Scripted replication";

            /////////////////////////////////////////////////////
            // Connection manager configuration
            /////////////////////////////////////////////////////
            this.SrcConn.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, sourceServer, sourceDatabase);
            this.SrcConn.Name = string.Format("SRC_{0}", sourceDatabase);
            this.SrcConn.Description = string.Empty;

            this.DestConn.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, destinationServer, destinationDatabase);
            this.DestConn.Name = string.Format("DST_{0}", destinationDatabase);
            this.DestConn.Description = string.Empty;

            /////////////////////////////////////////////////////
            // Configure Dataflow's Source properties
            /////////////////////////////////////////////////////
            this.Source.Name = "Src " + saniName;
            this.Source.Description = string.Empty;
            this.Source.AccessMode = AccessMode.AM_SQLCOMMAND;
            this.Source.SqlCommand = sourceQuery;

            /////////////////////////////////////////////////////
            // Configure Dataflow's Destination properties
            /////////////////////////////////////////////////////
            this.Dest.Name = "Dest " + saniName;
            this.Dest.Description = string.Empty;
            this.Dest.Table = table;
            this.Dest.FastLoadKeepIdentity = true;
            this.Dest.FastLoadKeepNulls = true;
            this.Dest.FastLoadOptions = "TABLOCK,CHECK_CONSTRAINTS";
            // The destination table name is read from the tableName variable at run time
            this.Dest.AccessMode = AccessMode.AM_OPENROWSET_FASTLOAD_VARIABLE;
            this.Dest.DataSourceVariable = this.Variables["tableName"].QualifiedName;

            this.Dest.LinkAllInputsToOutputs();
        }

        /// <summary>
        /// Generate a list of all the tables in schema foo or bar
        /// </summary>
        /// <param name="server">SQL Server instance to query</param>
        /// <param name="catalog">Database whose sys.tables is queried</param>
        /// <returns>Returns a list of all tables matching the query criteria. Format is [schema].[table]. Never null; empty when nothing matches.</returns>
        public static List<string> GenerateTableList(string server, string catalog)
        {
            List<string> tables = new List<string>();
            string connectionString = string.Format("server={0};integrated security=sspi;database={1};", server, catalog);

            using (System.Data.SqlClient.SqlConnection connection = new System.Data.SqlClient.SqlConnection(connectionString))
            {
                connection.Open();
                string sql = @"
                    SELECT
                        quotename(schema_name(T.schema_id)) + '.' + quotename(T.name)
                    FROM
                        sys.tables T
                    WHERE
                        T.schema_id = schema_id('foo')
                        OR T.schema_id = schema_id('bar')
                    ORDER BY
                    1";
                using (System.Data.SqlClient.SqlCommand command = new System.Data.SqlClient.SqlCommand(sql, connection))
                {
                    command.CommandType = System.Data.CommandType.Text;
                    using (System.Data.SqlClient.SqlDataReader reader = command.ExecuteReader())
                    {
                        while (reader.Read())
                        {
                            tables.Add(reader[0].ToString());
                        }
                    }
                }
            }

            return tables;
        }

        /// <summary>
        /// Sanitize a name so that it is valid for SSIS objects. 
        /// Strips []/\:=
        /// Replaces . with _
        /// </summary>
        /// <param name="name">Raw name, typically [schema].[table]</param>
        /// <returns>The name with the invalid characters removed and dots turned into underscores</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="name"/> is null</exception>
        public static string SanitizeName(string name)
        {
            if (name == null)
            {
                throw new ArgumentNullException("name");
            }

            string saniName = name.Replace(".", "_");

            // '=' is part of the documented strip list but was previously left in place
            string[] stripped = new string[] { "[", "]", "/", "\\", ":", "=" };
            foreach (string invalid in stripped)
            {
                saniName = saniName.Replace(invalid, string.Empty);
            }

            return saniName;
        }
    }
}

Usage

This code would be added to the original program. Call it from Main and modify the code to reflect the correct server and catalog names.
        /// <summary>
        /// Build a set of SSIS packages that take the form of ReplicateSchema_Table.dtsx
        /// </summary>
        /// <remarks>
        /// One package is written to the output folder for every table returned by
        /// ReplicateOMatic.GenerateTableList for the source server and database.
        /// When the local tweak flag is true, the EzAPI package is serialized to XML,
        /// loaded into a Package, given SQL Server logging settings and then saved;
        /// otherwise the EzAPI package is saved directly.
        /// </remarks>
        public static void ReplicateTables()
        {
            string outputFolder = @"C:\sandbox\SSISHackAndSlash\SSISHackAndSlash";
            string sourceServer = @"ANGBAND";
            string sourceDatabase = "OldDocuments";
            string destinationServer = @"UTUMNO";
            string destinationDatabase = "Documents";
            bool tweak = true;

            // A single Application instance can save every package; no need for one per table
            Application app = new Application();

            // Define the events we care about
            string[] notableEvents = new string[] { "OnError", "OnInformation", "OnPostExecute", "OnPreExecute", "OnWarning", "OnTaskFailed" };

            // Columns to record for each logged event; identical for every package
            DTSEventColumnFilter eventFilter = new DTSEventColumnFilter();
            eventFilter.Computer = true;
            eventFilter.DataBytes = true;
            eventFilter.ExecutionID = true;
            eventFilter.MessageText = true;
            eventFilter.Operator = true;
            eventFilter.SourceID = true;
            eventFilter.SourceName = true;

            foreach (string table in ReplicateOMatic.GenerateTableList(sourceServer, sourceDatabase))
            {
                ReplicateOMatic rom = new ReplicateOMatic(sourceServer, sourceDatabase, table, destinationServer, destinationDatabase);
                string outputFile = System.IO.Path.Combine(outputFolder, string.Format("{0}.dtsx", rom.Name));

                // This section is optional but I like to tweak things
                if (!tweak)
                {
                    rom.SaveToFile(outputFile);
                    continue;
                }

                // Dispose each package once it has been written instead of leaking one per table
                using (Package p = new Package())
                {
                    // Connection managers are easier through EzAPI
                    EzSqlOleDbCM loggingCM = new EzSqlOleDbCM(p);
                    // Alternative catalog: "SSISDB"
                    // NOTE(review): this value is used both as the connection manager name and as the
                    // Initial Catalog in the connection string - confirm a database with this name exists
                    string loggingCatalog = "SRC_msdb";
                    loggingCM.Name = loggingCatalog;
                    loggingCM.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, sourceServer, loggingCatalog);

                    // Serialize the EzAPI to XML and load our package from it
                    // NOTE(review): the logging connection manager is added before this load - confirm
                    // it is still present in the package afterwards
                    p.LoadFromXML(rom.SaveToXML(), null);

                    ///////////////////////////////////////////////////////////////
                    // Enable SQL Server configuration, assumes 
                    // an OLE DB connection manager named SSISDB exists
                    // uses a table called dbo.sysdtsconfig
                    // Configuration key is Default.2008.SalesDB
                    ///////////////////////////////////////////////////////////////
                    //p.EnableConfigurations = true;
                    //Configuration c = null;
                    //c = p.Configurations.Add();
                    //c.Name = "SalesDB";
                    //c.ConfigurationType = DTSConfigurationType.SqlServer;
                    //c.ConfigurationString = @"""SSISDB"";""[dbo].[sysdtsconfig]"";""Default.2008.SalesDB""";

                    ///////////////////////////////////////////////////////////////
                    // Enable logging
                    // Add log provider and such
                    // http://msdn.microsoft.com/en-us/library/microsoft.sqlserver.dts.runtime.logproviders.add.aspx
                    // ProgID: DTS.LogProviderSQLServer.2
                    ///////////////////////////////////////////////////////////////
                    p.LoggingMode = DTSLoggingMode.Enabled;

                    // Add and configure the sql log provider
                    LogProvider provider = p.LogProviders.Add("DTS.LogProviderSQLServer.2");

                    // The provider refers to its connection manager by name
                    provider.ConfigString = loggingCM.Name;
                    provider.Name = "SSIS log provider for SQL Server";
                    provider.Description = "Writes log entries for events to a SQL Server database";
                    p.LoggingOptions.SelectedLogProviders.Add(provider);

                    LoggingOptions options = p.LoggingOptions;
                    // load up the events we care about
                    options.EventFilter = notableEvents;
                    options.EventFilterKind = DTSEventFilterKind.Inclusion;

                    // configure the specifics of how an event should be logged
                    foreach (string item in notableEvents)
                    {
                        options.SetColumnFilter(item, eventFilter);
                    }

                    app.SaveToXml(outputFile, p, null);
                }
            }
        }

No comments:

Post a Comment