This post is out of order for the recipe series but there was a need on
dba.stackexchange.com for this solution.
Add a new class to the solution named ReplicateOMatic. This class will create a new package with a pair of connection managers and configure a data flow to copy all of the data between paired tables. Modify the query in the method GenerateTableList to create the correct list of tables.
namespace EzAPIRecipies
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.SqlServer.SSIS.EzAPI;
using Microsoft.SqlServer.Dts.Runtime;
public class ReplicateOMatic : EzSrcDestPackage<EzOleDbSource, EzSqlOleDbCM, EzOleDbDestination, EzSqlOleDbCM>
{
/// <summary>
/// Format template for the OLE DB connection strings used by this class.
/// {0} is substituted with the server name (Data Source) and
/// {1} with the database name (Initial Catalog).
/// </summary>
public static string connectionStringTemplate = @"Data Source={0};Initial Catalog={1};Provider=SQLNCLI10.1;Integrated Security=SSPI;Packet Size=32767;";

/// <summary>
/// Builds a ReplicateOMatic around an existing package by handing it to the base class.
/// </summary>
/// <param name="p">Package to wrap</param>
public ReplicateOMatic(Package p)
    : base(p)
{
}

/// <summary>
/// Allows a Package to be used wherever a ReplicateOMatic is expected.
/// </summary>
/// <param name="p">Package to wrap</param>
/// <returns>A new ReplicateOMatic constructed from <paramref name="p"/></returns>
public static implicit operator ReplicateOMatic(Package p)
{
    return new ReplicateOMatic(p);
}
/// <summary>
/// Create a package with a single data flow, 2 variables.
/// The data flow will contain an OLEDB source and destination component.
/// We will use fastload options, identity inserts, table locking etc.
/// The two variables are User::sourceQuery (the SELECT statement used by the
/// source) and User::tableName (the destination table, read by the
/// destination component through its data source variable).
/// </summary>
/// <param name="sourceServer">Source computer name (UTUMNO)</param>
/// <param name="sourceDatabase">Source catalog name (DocumentsOld)</param>
/// <param name="table">Table to be replicated [schema].[tablename]</param>
/// <param name="destinationServer">Destination computer name (WESTMARCH)</param>
/// <param name="destinationDatabase">Destination catalog name (Documents)</param>
public ReplicateOMatic(string sourceServer, string sourceDatabase, string table, string destinationServer, string destinationDatabase)
    : base()
{
    // Use this class's own SanitizeName so the recipe is self-contained and
    // does not depend on the TableToTable class from a different recipe.
    string saniName = ReplicateOMatic.SanitizeName(table);
    string sourceQuery = string.Format("SELECT D.* FROM {0} D", table);

    ////////////////////////////////////////////////////
    // Package settings
    ////////////////////////////////////////////////////
    this.Name = string.Format("Replicate{0}", saniName);

    /////////////////////////////////////////////////////
    // Define package variables
    /////////////////////////////////////////////////////
    this.Variables.Add("sourceQuery", false, "User", sourceQuery);
    this.Variables.Add("tableName", false, "User", table);

    /////////////////////////////////////////////////////
    // Configure DataFlow properties
    /////////////////////////////////////////////////////
    this.DataFlow.Name = string.Format("DFT_{0}", saniName);
    this.DataFlow.Description = "Scripted replication";

    /////////////////////////////////////////////////////
    // Connection manager configuration
    /////////////////////////////////////////////////////
    this.SrcConn.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, sourceServer, sourceDatabase);
    this.SrcConn.Name = string.Format("SRC_{0}", sourceDatabase);
    this.SrcConn.Description = string.Empty;

    this.DestConn.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, destinationServer, destinationDatabase);
    this.DestConn.Name = string.Format("DST_{0}", destinationDatabase);
    this.DestConn.Description = string.Empty;

    /////////////////////////////////////////////////////
    // Configure Dataflow's Source properties
    /////////////////////////////////////////////////////
    this.Source.Name = "Src " + saniName;
    this.Source.Description = string.Empty;
    this.Source.AccessMode = AccessMode.AM_SQLCOMMAND;
    this.Source.SqlCommand = sourceQuery;

    /////////////////////////////////////////////////////
    // Configure Dataflow's Destination properties
    /////////////////////////////////////////////////////
    this.Dest.Name = "Dest " + saniName;
    this.Dest.Description = string.Empty;
    this.Dest.Table = table;
    this.Dest.FastLoadKeepIdentity = true;
    this.Dest.FastLoadKeepNulls = true;
    this.Dest.FastLoadOptions = "TABLOCK,CHECK_CONSTRAINTS";

    // The destination table name is supplied at run time through the
    // User::tableName variable rather than being fixed in the component.
    this.Dest.AccessMode = AccessMode.AM_OPENROWSET_FASTLOAD_VARIABLE;
    this.Dest.DataSourceVariable = this.Variables["tableName"].QualifiedName;
    this.Dest.LinkAllInputsToOutputs();
}
/// <summary>
/// Connects to the given server and catalog with integrated security and
/// collects the names of every table that lives in schema foo or bar.
/// </summary>
/// <param name="server">Server to connect to</param>
/// <param name="catalog">Database whose sys.tables is queried</param>
/// <returns>Table names ordered by name. Format is [schema].[table]</returns>
public static List<string> GenerateTableList(string server, string catalog)
{
    string connectionString = string.Format("server={0};integrated security=sspi;database={1};", server, catalog);
    List<string> tableNames = new List<string>();

    using (System.Data.SqlClient.SqlConnection conn = new System.Data.SqlClient.SqlConnection(connectionString))
    {
        conn.Open();

        // quotename() wraps each part in brackets, giving [schema].[table]
        string query = @"
SELECT
quotename(schema_name(T.schema_id)) + '.' + quotename(T.name)
FROM
sys.tables T
WHERE
T.schema_id = schema_id('foo')
OR T.schema_id = schema_id('bar')
ORDER BY
1";

        using (System.Data.SqlClient.SqlCommand cmd = new System.Data.SqlClient.SqlCommand(query, conn))
        {
            cmd.CommandType = System.Data.CommandType.Text;

            using (System.Data.SqlClient.SqlDataReader rows = cmd.ExecuteReader())
            {
                // The query returns a single column holding the qualified name
                while (rows.Read())
                {
                    object qualifiedName = rows.GetValue(0);
                    tableNames.Add(qualifiedName.ToString());
                }
            }
        }
    }

    return tableNames;
}
/// <summary>
/// Sanitize a name so that it is valid for SSIS objects.
/// Strips []/\:=
/// Replaces . with _
/// </summary>
/// <param name="name">Raw name, e.g. [schema].[table]</param>
/// <returns>The name with the stripped characters removed and every . turned into _</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="name"/> is null</exception>
public static string SanitizeName(string name)
{
    if (name == null)
    {
        throw new ArgumentNullException("name");
    }

    StringBuilder sanitized = new StringBuilder(name.Length);
    foreach (char c in name)
    {
        switch (c)
        {
            // Characters that are simply dropped. '=' was documented as
            // stripped but was previously left in the result.
            case '[':
            case ']':
            case '/':
            case '\\':
            case ':':
            case '=':
                break;

            // Keep the schema/table separation visible in the result
            case '.':
                sanitized.Append('_');
                break;

            default:
                sanitized.Append(c);
                break;
        }
    }

    return sanitized.ToString();
}
}
}
Usage
This code would be added to the original
program. Call it from Main and modify the code to reflect the correct server and catalog names.
/// <summary>
/// Build a set of SSIS packages that take the form of ReplicateSchema_Table.dtsx
/// One package is generated per table returned by ReplicateOMatic.GenerateTableList
/// and written to the output folder. When tweak is true, each package is
/// reloaded through the SSIS runtime so that SQL Server logging can be
/// enabled before it is saved.
/// </summary>
public static void ReplicateTables()
{
    string outputFolder = @"C:\sandbox\SSISHackAndSlash\SSISHackAndSlash";
    string sourceServer = @"ANGBAND";
    string sourceDatabase = "OldDocuments";
    string destinationServer = @"UTUMNO";
    string destinationDatabase = "Documents";
    bool tweak = true;

    // One Application can save every package; only needed when tweaking
    Application app = tweak ? new Application() : null;

    foreach (string table in ReplicateOMatic.GenerateTableList(sourceServer, sourceDatabase))
    {
        ReplicateOMatic rom = new ReplicateOMatic(sourceServer, sourceDatabase, table, destinationServer, destinationDatabase);
        string outputFile = System.IO.Path.Combine(outputFolder, string.Format("{0}.dtsx", rom.Name));

        // This section is optional but I like to tweak things
        if (!tweak)
        {
            rom.SaveToFile(outputFile);
            continue;
        }

        // Serialize the EzAPI to XML and load our package from it
        Package p = new Package();
        p.LoadFromXML(rom.SaveToXML(), null);

        // Connection managers are easier through EzAPI.
        // Added after LoadFromXML so that loading the package definition
        // cannot discard the logging connection manager.
        // The catalog is the real database name; the connection manager
        // name follows the SRC_{catalog} convention (SRC_msdb).
        //loggingCatalog = "SSISDB";
        string loggingCatalog = "msdb";
        EzSqlOleDbCM loggingCM = new EzSqlOleDbCM(p);
        loggingCM.Name = string.Format("SRC_{0}", loggingCatalog);
        loggingCM.ConnectionString = string.Format(ReplicateOMatic.connectionStringTemplate, sourceServer, loggingCatalog);

        ///////////////////////////////////////////////////////////////
        // Enable SQL Server configuration, assumes
        // an OLE DB connection manager named SSISDB exists
        // uses a table called dbo.sysdtsconfig
        // Configuration key is Default.2008.SalesDB
        ///////////////////////////////////////////////////////////////
        //p.EnableConfigurations = true;
        //Configuration c = null;
        //c = p.Configurations.Add();
        //c.Name = "SalesDB";
        //c.ConfigurationType = DTSConfigurationType.SqlServer;
        //c.ConfigurationString = @"""SSISDB"";""[dbo].[sysdtsconfig]"";""Default.2008.SalesDB""";

        ///////////////////////////////////////////////////////////////
        // Enable logging
        // Add log provider and such
        // http://msdn.microsoft.com/en-us/library/microsoft.sqlserver.dts.runtime.logproviders.add.aspx
        // ProgID: DTS.LogProviderSQLServer.2
        ///////////////////////////////////////////////////////////////
        DTSEventColumnFilter eventFilter = new DTSEventColumnFilter();
        eventFilter.Computer = true;
        eventFilter.DataBytes = true;
        eventFilter.ExecutionID = true;
        eventFilter.MessageText = true;
        eventFilter.Operator = true;
        eventFilter.SourceID = true;
        eventFilter.SourceName = true;

        p.LoggingMode = DTSLoggingMode.Enabled;

        // Define the events we care about
        string[] notableEvents = new string[] { "OnError", "OnInformation", "OnPostExecute", "OnPreExecute", "OnWarning", "OnTaskFailed" };

        // Add and configure the sql log provider; its ConfigString is the
        // name of the connection manager it writes through
        LogProvider provider = p.LogProviders.Add("DTS.LogProviderSQLServer.2");
        provider.ConfigString = loggingCM.Name;
        provider.Name = "SSIS log provider for SQL Server";
        provider.Description = "Writes log entries for events to a SQL Server database";
        p.LoggingOptions.SelectedLogProviders.Add(provider);

        // load up the events we care about
        LoggingOptions options = p.LoggingOptions;
        options.EventFilter = notableEvents;
        options.EventFilterKind = DTSEventFilterKind.Inclusion;

        // configure the specifics of how an event should be logged
        foreach (string item in notableEvents)
        {
            options.SetColumnFilter(item, eventFilter);
        }

        app.SaveToXml(outputFile, p, null);
    }
}
No comments:
Post a Comment