
Copying data from On Prem SQL to ADLS with ADF and Biml – Part 2

Note: This post is about Azure Data Factory V1
I showed in my previous post how we generated the datasets for our Azure Data Factory pipelines. In this post, I’ll show the BimlScript for our pipelines. Pipelines define the activities, identify the input and output datasets for those activities, and set an execution schedule. We were creating several pipelines with copy activities to copy data to Azure Data Lake Store.

We generated one pipeline per schedule and load type:

  • Hourly – Full
  • Hourly – Incremental
  • Daily – Full
  • Daily – Incremental

We also generated some one-time load pipelines for DR/new environment setup.
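As a rough illustration of how those four schedule and load type combinations map to pipeline names, here is a minimal C# sketch. It assumes the PL_Copy_MySourceDBToADLS_<frequency>_<scope> naming pattern and the "Deltas" label for incremental loads that appear in the code later in this post. The PipelineNames class and its Build method are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;

public static class PipelineNames
{
    // Returns one pipeline name per frequency/scope pair, following the
    // PL_Copy_MySourceDBToADLS_<frequency>_<scope> pattern.
    // (Class and method names are hypothetical, for illustration only.)
    public static List<string> Build()
    {
        string[] frequencies = { "Hourly", "Daily" };
        string[] scopes = { "Full", "Deltas" }; // "Deltas" = incremental loads
        var names = new List<string>();
        foreach (string frequency in frequencies)
        {
            foreach (string scope in scopes)
            {
                names.Add($"PL_Copy_MySourceDBToADLS_{frequency}_{scope}");
            }
        }
        return names;
    }

    public static void Main()
    {
        foreach (string name in Build())
        {
            Console.WriteLine(name);
        }
    }
}
```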

The first code file below is the template for the pipeline. You can see code nuggets for the data we receive from the generator file and for conditional logic we implemented. The result is one copy activity per source table within the appropriate pipeline.

In the second code file below, the statements at the end of the file, under the "// Generate pipelines" and "//Generate One-Time Pipelines" comments, generate the pipelines. We read in the necessary data from the Excel file:

  • Schema name
  • Table name
  • Columns list
  • Incremental predicate
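To make the shape of that metadata concrete, here is a small C# sketch that builds an in-memory stand-in for the worksheet and filters it by frequency and load type, much as the generator does with DataView. The column names match the ones the code in this post reads. The sample rows, the predicate text, and the MetadataFilterSketch class are made up for illustration.

```csharp
using System;
using System.Data;

public static class MetadataFilterSketch
{
    // Builds an in-memory stand-in for the "Metadata" worksheet.
    // All rows are hypothetical sample values, not real project metadata.
    public static DataTable BuildSampleMetadata()
    {
        var table = new DataTable("Metadata");
        table.Columns.Add("SchemaName", typeof(string));
        table.Columns.Add("TableName", typeof(string));
        table.Columns.Add("Frequency", typeof(string));
        table.Columns.Add("Changes Only", typeof(string));
        table.Columns.Add("ColumnListForSelect", typeof(string));
        table.Columns.Add("IncrementalPredicate", typeof(string));

        table.Rows.Add("dbo", "Customer", "Daily", "No", "[CustomerId], [Name]", "");
        table.Rows.Add("dbo", "Product", "Daily", "No", "[ProductId], [Name]", "");
        // {0} and {1} stand for the two window arguments passed to Text.Format;
        // the exact predicate and any escaping it needs are assumptions.
        table.Rows.Add("dbo", "Orders", "Hourly", "Yes", "[OrderId], [ModifiedDate]",
            "[ModifiedDate] >= '{0:yyyy-MM-dd HH:mm}' AND [ModifiedDate] < '{1:yyyy-MM-dd HH:mm}'");
        return table;
    }

    // Returns the rows for one frequency / load type combination.
    public static DataView Filter(DataTable metadata, string frequency, string changesOnly)
    {
        string rowFilter = $"Frequency = '{frequency}' and [Changes Only] = '{changesOnly}'";
        return new DataView(metadata, rowFilter, "", DataViewRowState.CurrentRows);
    }

    public static void Main()
    {
        DataTable metadata = BuildSampleMetadata();
        foreach (DataRowView rowView in Filter(metadata, "Daily", "No"))
        {
            Console.WriteLine(rowView.Row["SchemaName"] + "." + rowView.Row["TableName"]);
        }
    }
}
```

With the sample rows above, the Daily/No filter returns the two daily full-load tables and the Hourly/Yes filter returns the single hourly incremental table.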

Sidenote: We wrote a quick T-SQL statement (not shown) to generate the columns list. This could have been done in our BimlScript, but it was something we changed after the fact to accommodate the limitations of PolyBase (Dear Microsoft: Please fix). SQL was quicker and easier for us, but if I were to do this again, I would add that into our BimlScript. We needed to replace new lines and double quotes in our data before we could read it in from the data lake. You can get around this issue by using ORC files rather than delimited text files. But ORC files aren’t human readable, and we felt that was important for adoption of the data lake with the client on this project. They were already jumping in with several new technologies, and we didn’t want to add anything else to the stack. So our select statements list out the fields and replace the unwanted characters in the string fields.
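For a sense of what building that column list in C# might look like, here is a minimal sketch. It is not the T-SQL we actually used. It assumes that carriage returns, line feeds, and double quotes are each replaced with a space, and it uses CHAR() codes so the generated SQL contains no quote literals. The ColumnListSketch class, its methods, and the sample columns are hypothetical, and column names are not escaped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ColumnListSketch
{
    // Wraps a string column in nested T-SQL REPLACE calls that swap carriage
    // returns (13), line feeds (10), and double quotes (34) for a space (32).
    // Replacing with a space is an assumption made for this example.
    public static string CleanStringColumn(string columnName)
    {
        string expr = $"[{columnName}]";
        foreach (int charCode in new[] { 13, 10, 34 })
        {
            expr = $"REPLACE({expr}, CHAR({charCode}), CHAR(32))";
        }
        return $"{expr} AS [{columnName}]";
    }

    // Builds a comma-separated select list; only string columns are cleaned.
    public static string BuildColumnList(IEnumerable<(string Name, bool IsString)> columns)
    {
        return string.Join(", ",
            columns.Select(c => c.IsString ? CleanStringColumn(c.Name) : $"[{c.Name}]"));
    }

    public static void Main()
    {
        // Hypothetical columns: one numeric key and one free-text field.
        var columns = new List<(string Name, bool IsString)>
        {
            ("CustomerId", false),
            ("Notes", true),
        };
        Console.WriteLine(BuildColumnList(columns));
    }
}
```

A list built this way would play the role of the ColumnListForSelect value that the pipeline template drops into its SELECT statements.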

Our Excel file looks like this.

[Image: ADF Biml Metadata]

Columns B, C, L, and M are populated by Excel formulas. This is the file that is read in by the BimlScript in the code below.

In our generator file (which is the same file that was used to generate the datasets), we use the CallBimlScript function to call the pipeline template file and pass along the required properties: a DataView of the target tables (which carries the schema, table, columns list, and predicate for each row), plus the frequency and scope.

<#@ import namespace="System.Data" #>
<#@ import namespace="System.Text" #>
<#@ property name="targetTables" type="DataView"#>
<#@ property name="frequency" type="string"#>
<#@ property name="scope" type="string"#>
{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "PL_Copy_MySourceDBToADLS_<#=frequency#>_<#=scope#>",
  "properties": {
    "description": "<#=frequency#> <#=scope#> copies of data from Source db to the data lake.",
    "activities": [
      <# var isFirst = true; foreach( DataRowView rowView in targetTables) {#>
      <# DataRow row = rowView.Row; #>
      <# string schemaName = row["SchemaName"].ToString();#>
      <# string tableName = row["TableName"].ToString();#>
      <# string columnList = row["ColumnListForSelect"].ToString(); #>
      <# string predicate = row["IncrementalPredicate"].ToString(); #>
      <#=isFirst ? "" : ","#>
      {
        "name": "Copy to Lake – <#=schemaName#>.<#=tableName#>",
        "type": "Copy",
        "inputs": [
          {
            "name": "DS_OnPremSQL_MySourceDB_<#=schemaName#>_<#=tableName#>"
          }
        ],
        "outputs": [
          {
            "name": "DS_DataLake_MySourceDB_<#=schemaName#>_<#=tableName#>"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            <# if (scope == "Full") {#>
            "sqlReaderQuery": "SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>]"
            <#} else if (scope == "Deltas" && frequency == "Hourly") {#>
            "sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', Time.AddHours(WindowStart, -5), Time.AddHours(WindowEnd, -5))"
            <#} else if (scope == "Deltas" && frequency == "Daily") {#>
            "sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', WindowStart, WindowEnd)"
            <# } #>
          },
          "sink": {
            "type": "AzureDataLakeStoreSink"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          <# if (frequency == "Daily") {#>
          "frequency": "Day",
          "offset": "09:00:00",
          <#} else if (frequency == "Hourly") {#>
          "frequency": "Hour",
          <# } #>
          "style": "EndOfInterval",
          "interval": 1
        }
      }
      <# isFirst = false; }#>
    ],
    <# if (frequency == "Hourly") {#>
    "start": "2017-03-01T01:00:00",
    <#}else {#>
    "start": "2017-03-02T00:00:00",
    <#}#>
    "end": "9999-09-09"
  }
}

<#@ template tier="10" #>
<#@ import namespace="System.Data" #>
<#@ import namespace="System.Text" #>
<#@ code file="BGHelper.cs" #>
<#@ import namespace="BGHelper" #>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
</Biml>
<#
string mdFilePath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\metadata";
string mdFileName = "TargetTableMetadata.xlsx";
string mdWorkSheetName = "Metadata$";
bool mdHasHeader = true;
string logPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\generate_data_factory_biml\\log.txt";
string adfProjPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\data_factory\\";
DataSet ds = new DataSet();
ds = ExcelReader.ReadExcelQuery(mdFilePath, mdFileName, mdWorkSheetName, mdHasHeader);
System.IO.File.AppendAllText(@logPath, "MetaData File Path: " + System.IO.Path.Combine(mdFilePath, mdFileName).ToString() +Environment.NewLine );
System.IO.File.AppendAllText(@logPath, "MetaData File Exists: " + System.IO.File.Exists(System.IO.Path.Combine(mdFilePath, mdFileName)).ToString() +Environment.NewLine );
System.IO.File.AppendAllText(@logPath, "Dataset table count: " + ds.Tables.Count.ToString() + Environment.NewLine);
DataView dailyFulls = new DataView(ds.Tables["Metadata"],"Frequency = 'Daily' and [Changes Only] = 'No'","", DataViewRowState.CurrentRows);
DataView dailyDeltas = new DataView(ds.Tables["Metadata"], "Frequency = 'Daily' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);
DataView hourlyFulls = new DataView(ds.Tables["Metadata"], "Frequency = 'Hourly' and [Changes Only] = 'No'", "", DataViewRowState.CurrentRows);
DataView hourlyDeltas = new DataView(ds.Tables["Metadata"], "Frequency = 'Hourly' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);
//log count of results for each filter
System.IO.File.AppendAllText(@logPath, "Daily Fulls Count: " + dailyFulls.Count.ToString() + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "Daily Deltas Count: " + dailyDeltas.Count.ToString() + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "Hourly Fulls Count: " + hourlyFulls.Count.ToString() + Environment.NewLine);
System.IO.File.AppendAllText(@logPath, "Hourly Deltas Count: " + hourlyDeltas.Count.ToString() + Environment.NewLine);
//Generate datasets
foreach (DataRowView rowView in dailyFulls)
{
DataRow row = rowView.Row;
string schemaName = row["SchemaName"].ToString();
string tableName = row["TableName"].ToString();
string frequency = row["Frequency"].ToString();
string scope = "full";
System.IO.File.AppendAllText(@logPath, "DailyFulls | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
foreach (DataRowView rowView in dailyDeltas)
{
DataRow row = rowView.Row;
string schemaName = row["SchemaName"].ToString();
string tableName = row["TableName"].ToString();
string frequency = row["Frequency"].ToString();
string scope = "deltas";
System.IO.File.AppendAllText(@logPath, "DailyDeltas | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
foreach (DataRowView rowView in hourlyFulls)
{
DataRow row = rowView.Row;
string schemaName = row["SchemaName"].ToString();
string tableName = row["TableName"].ToString();
string frequency = row["Frequency"].ToString();
string scope = "full";
System.IO.File.AppendAllText(@logPath, "HourlyFulls | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
foreach (DataRowView rowView in hourlyDeltas)
{
DataRow row = rowView.Row;
string schemaName = row["SchemaName"].ToString();
string tableName = row["TableName"].ToString();
string frequency = row["Frequency"].ToString();
string scope = "deltas";
System.IO.File.AppendAllText(@logPath, "HourlyDeltas | " + row["SchemaName"].ToString() + "." + row["TableName"].ToString() + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);
System.IO.File.WriteAllText(@adfProjPath + "DS_OnPremSQL_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
System.IO.File.WriteAllText(@adfProjPath + "DS_DataLake_MySourceDB_" + schemaName + "_" + tableName + ".json", CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, scope));
}
// Generate pipelines
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Daily_Full.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", dailyFulls, "Daily", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Daily_Deltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", dailyDeltas, "Daily", "Deltas"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Hourly_Full.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", hourlyFulls, "Hourly", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_Hourly_Deltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS.biml", hourlyDeltas, "Hourly", "Deltas"));
//Generate One-Time Pipelines
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_DailyFulls.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", dailyFulls, "Daily", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_DailyDeltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", dailyDeltas, "Daily", "Deltas"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_HourlyFulls.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", hourlyFulls, "Hourly", "Full"));
System.IO.File.WriteAllText(@adfProjPath + "PL_Copy_MySourceDBToADLS_OneTime_HourlyDeltas.json", CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", hourlyDeltas, "Hourly", "Deltas"));
#>

ADFGenerator.biml

The great thing about Biml is that I can use it as much or as little as I feel is helpful. That T-SQL statement to get column lists could have been Biml, but it didn’t have to be. The client can maintain and enhance these pipelines with or without Biml as they see fit. There is no vendor lock-in here. Just as with Biml-generated SSIS projects, there is no difference between a hand-written ADF solution and a Biml-generated ADF solution, other than that the Biml-generated solution is probably more consistent.

And have I mentioned the time savings? There is a reason why Varigence gives out shirts that say “It’s Monday and I’m done for the week.”

We made changes and regenerated our pipelines a few times, which would have taken hours without Biml. With Biml, it was no big deal.

Thanks to Levi for letting me share some of his code, and for working with me on this project!
