Tools of the trade: Processing Dynamics data with the Spark CDM connector

[Image: IMG_0219Crop]

This is the 13th post in a series of blog posts using a theme of “Tools of the trade”. The series targets software tools, statistical concepts, data science techniques, or related items. In all cases, the topic will contribute to accomplishing data science tasks. The target audience for the posts is engineers, analysts, and managers who want to build their knowledge and skills in data science, particularly those in the Microsoft Dynamics ecosystem.

This post targets the Common Data Model (CDM) connector for Spark.

What is it?

Per spark-cdm-connector/overview.md at master · Azure/spark-cdm-connector (github.com), “The Spark CDM Connector enables a Spark program to read and write CDM entities in a CDM folder via Spark dataframes.” The CDM is the Common Data Model. It provides “a shared data language for business and analytical applications to use” (Common Data Model – Common Data Model | Microsoft Docs).

Spark fundamentally uses a “schema on read” approach to data structures. When writing a Spark program, you must define (or infer) the data types being read. While this provides the flexibility to handle a variety of structured and unstructured data, it puts overhead on the programmer. SQL uses a “schema on write” approach that works well for structured databases but lacks that flexibility.
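
To make that overhead concrete, here’s a rough sketch of reading a CSV with a hand-written schema. The path and the column list are illustrative only, not taken from the walkthrough below:

# Illustrative only: manual schema-on-read without CDM metadata.  Every column
# and type must be declared by hand (CustTrans has ~100 columns).
from pyspark.sql.types import StructType, StructField, StringType, DecimalType

manualSchema = StructType([
    StructField("DATAAREAID", StringType()),
    StructField("ORDERACCOUNT", StringType()),
    StructField("AMOUNTCUR", DecimalType(18, 4)),
    # ...one StructField per remaining column...
])
df = spark.read.csv("abfss://<container>@<account>.dfs.core.windows.net/<path>/CustTrans/*.csv",
                    schema=manualSchema)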

The CDM language provides a mechanism to describe the data types in simple file types like CSV. This CDM connector simplifies reading and writing data by leveraging the CDM metadata.

How do I use it?

Dynamics uses the CDM when exporting data to Azure Data Lake Storage. Once the ‘raw’ data from Dynamics lands in the data lake, it often needs to be re-shaped into a data structure appropriate for analytics – reporting, machine learning, etc. The data volumes are often large enough that Spark is the best fit for the processing required to re-shape the data. The CDM connector simplifies data access and provides the schema definition when writing the re-shaped data.

Discussion

What follows is a walkthrough for using the Spark CDM connector to retrieve FinOps ‘raw’ data (CustTrans), create a simple aggregate, and write the resulting dataset back to the data lake with CDM metadata.

Storage of FinOps raw data

I created a Synapse workspace with a new storage account (ADLS Gen2). I used AzCopy to copy data exported from an existing FinOps demo environment to the workspace’s default storage account (see https://lakedatainsights.com/2021/07/11/tools-of-the-trade-azcopy-for-moving-dynamics-365-data/). The CDM information shows up in several locations in the hierarchy. For example:

[Image: clip_image001 – data lake folder hierarchy containing Transaction.manifest.cdm.json]

Transaction.manifest.cdm.json lists the entities in this area. This is the manifest that we’ll pass to the CDM read API.

The data itself is stored in a CSV file in the CustTrans folder.

[Image: clip_image002 – CSV data files in the CustTrans folder]

Code

The following code was developed using a Synapse notebook. Each comment is the start of a new code block. Once you have the data set up, all that should be required to execute this code is to update the variables in the second code block. Note that I’ve included the print() output so you can get a feel for the data structures.

%%spark   
// The CDM connector is included by default in Synapse Spark.  No install required.
// Get the CDM connector version.  Example of interchanging languages (this is the Scala syntax).
print(com.microsoft.cdm.BuildInfo.version)
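
If you want the same version check without leaving PySpark, the Scala object should be reachable through the py4j JVM gateway. This call pattern is my assumption, not something the connector documents:

# Assumed PySpark equivalent via the py4j gateway (unverified).
print(spark._jvm.com.microsoft.cdm.BuildInfo.version())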

# Managed identity is used for access.  Since this is the default storage for Synapse, the Storage
# Blob Data Contributor role for the Synapse workspace is set up by default.
# Default storage for this Synapse workspace.
storageAccountName = "dynamicssynapsestore.dfs.core.windows.net" 

# Data was copied to this storage account using AzCopy.  Details follow for the path to CustTrans.
container = "finance-operations"
finOpsName = "/MyEnvironment.sandbox.operations.test.dynamics.com"
manifestPath = "/Tables/Finance/AccountsReceivable/Transaction"
manifest = "/Transaction.manifest.cdm.json"

# Read the CustTrans data leveraging the CDM metadata.
custTransData = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + finOpsName + manifestPath + manifest)
  .option("entity", "CustTrans")
  .load())
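
Managed identity is the simplest option inside Synapse, but the connector’s documentation also covers credential-based access through an Azure AD app registration. A sketch, with placeholder values:

# Alternative: explicit app registration credentials (appId/appKey/tenantId
# options per the connector docs).  The values below are placeholders.
custTransDataSp = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + finOpsName + manifestPath + manifest)
  .option("entity", "CustTrans")
  .option("appId", "<application-id>")
  .option("appKey", "<client-secret>")
  .option("tenantId", "<tenant-id>")
  .load())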

# Get some basic information on the data frame.
print(type(custTransData))
print('Column count: ' + str(len(custTransData.columns)))
print('Row count: ' + str(custTransData.count()))
custTransData.show(vertical=True)


<class 'pyspark.sql.dataframe.DataFrame'>
Column count: 102
Row count: 47634
-RECORD 0--------------------------------------------------
_SysRowId | 5637144576
LSN | null
LastProcessedChange_DateTime | null
DataLakeModified_DateTime | 2021-05-27 18:50:... 
RECID | 5637144576
<…>

# Get specifics of the columns of interest.  Note that the data types come from the CDM definitions.
custTransData.select('DATAAREAID', 'ORDERACCOUNT', 'AMOUNTCUR').printSchema()

root
|-- DATAAREAID: string (nullable = true)
|-- ORDERACCOUNT: string (nullable = true)
|-- AMOUNTCUR: decimal(18,4) (nullable = true)

# Get summary order stats per company and account.
from pyspark.sql.functions import col
custTransSummaryByCompanyAccount = (custTransData
    .groupby('DATAAREAID', 'ORDERACCOUNT')
    .agg({'*': 'count',
          'AMOUNTCUR': 'sum'})
    .toDF('Company', 'Account', 'OrderCount', 'TotalOrderAmount')
    .withColumn('AverageOrderAmount', col('TotalOrderAmount') / col('OrderCount'))
    .sort('TotalOrderAmount', ascending=False))
custTransSummaryByCompanyAccount.show()

+-------+-----------+----------+----------------+--------------------+
|Company|    Account|OrderCount|TotalOrderAmount|  AverageOrderAmount|
+-------+-----------+----------+----------------+--------------------+
|   rumf|RUMF-000001|        47|   13249549.7600|281905.3140425531...|
|   psus|     000009|       158|    8744477.0000|55344.79113924050633|
|   psus|     000004|         2|    3269424.7500|1634712.375000000...|
|   cnmf|CNMF-000002|        29|    2580267.9700|88974.75758620689655|
|   jpmf|JPMF-000003|        10|    2097800.0000|209780.0000000000...|
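
One note on the dict form of agg() above: it leaves the output column order implicit, so the positional toDF() renames are a bit fragile. An equivalent version with explicit aliases avoids that:

# Equivalent aggregate with explicit aliases instead of positional renames.
from pyspark.sql import functions as F

custTransSummaryByCompanyAccount = (custTransData
    .groupBy('DATAAREAID', 'ORDERACCOUNT')
    .agg(F.count('*').alias('OrderCount'),
         F.sum('AMOUNTCUR').alias('TotalOrderAmount'))
    .withColumnRenamed('DATAAREAID', 'Company')
    .withColumnRenamed('ORDERACCOUNT', 'Account')
    .withColumn('AverageOrderAmount', F.col('TotalOrderAmount') / F.col('OrderCount'))
    .sort('TotalOrderAmount', ascending=False))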

# Save dataset with CDM metadata in CSV format.
(custTransSummaryByCompanyAccount.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + finOpsName + "/aggregatedata/default.manifest.cdm.json")
  .option("entity", "CustTransSummary")
  .mode("append")
  .save())
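
As a quick sanity check, the new entity can be read back through the same connector:

# Read the aggregate back through the connector to verify the write.
checkData = (spark.read.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("manifestPath", container + finOpsName + "/aggregatedata/default.manifest.cdm.json")
  .option("entity", "CustTransSummary")
  .load())
checkData.show(5)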

Resulting data set

[Image: clip_image003]

[Image: clip_image004]

References

spark-cdm-connector/overview.md at master · Azure/spark-cdm-connector (github.com)

Common Data Model – Common Data Model | Microsoft Docs

https://lakedatainsights.com/2021/07/11/tools-of-the-trade-azcopy-for-moving-dynamics-365-data/

Picture details:  Taste of fall, 9/14/2020, Canon PowerShot G3 X, f/5, 1/60 s, ISO 400