End-to-end blob ingestion into Azure Data Explorer (2023)


Azure Data Explorer is a fast and scalable data exploration service for log and telemetry data. This article gives you an end-to-end example of how to ingest data from Azure Blob storage into Azure Data Explorer.

You'll learn how to programmatically create a resource group, a storage account and container, an event hub, and an Azure Data Explorer cluster and database. You'll also learn how to programmatically configure Azure Data Explorer to ingest data from the new storage account.

For code samples based on previous SDK versions, see the archived article.

Prerequisites

  • An Azure subscription. Create a free Azure account.
  • An Azure AD application and service principal that can access resources. Save the Directory (tenant) ID, Application ID, and Client Secret.

Install packages

This article contains examples in C# and Python. Choose the tab for your preferred language, and install the required packages.

  • C#
  • Python

Azure Resource Manager template

In this article, you use an Azure Resource Manager (ARM) template to create a storage account and container, an event hub, and an Azure Data Explorer cluster and database, and to add cluster and database principal assignments. The resource group itself is created in code before the template is deployed. Save the following content in a file named template.json. You'll use this file to run the code example.

{
  "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "eventHubNamespaceName": {
      "type": "string",
      "metadata": { "description": "Specifies the event hub namespace name." }
    },
    "eventHubName": {
      "type": "string",
      "metadata": { "description": "Specifies the event hub name." }
    },
    "storageAccountType": {
      "type": "string",
      "defaultValue": "Standard_LRS",
      "allowedValues": ["Standard_LRS", "Standard_GRS", "Standard_ZRS", "Premium_LRS"],
      "metadata": { "description": "Storage Account type" }
    },
    "storageAccountName": {
      "type": "string",
      "defaultValue": "[concat('storage', uniqueString(resourceGroup().id))]",
      "metadata": { "description": "Name of the storage account to create" }
    },
    "containerName": {
      "type": "string",
      "defaultValue": "[concat('storagecontainer', uniqueString(resourceGroup().id))]",
      "metadata": { "description": "Name of the container in storage account to create" }
    },
    "eventHubSku": {
      "type": "string",
      "allowedValues": ["Basic", "Standard"],
      "defaultValue": "Standard",
      "metadata": { "description": "Specifies the messaging tier for the Event Hubs namespace." }
    },
    "kustoClusterName": {
      "type": "string",
      "defaultValue": "[concat('kusto', uniqueString(resourceGroup().id))]",
      "metadata": { "description": "Name of the cluster to create" }
    },
    "kustoDatabaseName": {
      "type": "string",
      "defaultValue": "kustodb",
      "metadata": { "description": "Name of the database to create" }
    },
    "clusterPrincipalAssignmentName": {
      "type": "string",
      "defaultValue": "clusterPrincipalAssignment1",
      "metadata": { "description": "Specifies the name of the principal assignment" }
    },
    "principalIdForCluster": {
      "type": "string",
      "metadata": { "description": "Specifies the principal id. It can be user email, application (client) ID, security group name" }
    },
    "roleForClusterPrincipal": {
      "type": "string",
      "defaultValue": "AllDatabasesViewer",
      "metadata": { "description": "Specifies the cluster principal role. It can be 'AllDatabasesAdmin', 'AllDatabasesMonitor' or 'AllDatabasesViewer'" }
    },
    "tenantIdForClusterPrincipal": {
      "type": "string",
      "metadata": { "description": "Specifies the tenantId of the cluster principal" }
    },
    "principalTypeForCluster": {
      "type": "string",
      "defaultValue": "App",
      "metadata": { "description": "Specifies the principal type. It can be 'User', 'App', 'Group'" }
    },
    "databasePrincipalAssignmentName": {
      "type": "string",
      "defaultValue": "databasePrincipalAssignment1",
      "metadata": { "description": "Specifies the name of the principal assignment" }
    },
    "principalIdForDatabase": {
      "type": "string",
      "metadata": { "description": "Specifies the principal id. It can be user email, application (client) ID, security group name" }
    },
    "roleForDatabasePrincipal": {
      "type": "string",
      "defaultValue": "Admin",
      "metadata": { "description": "Specifies the database principal role. It can be 'Admin', 'Ingestor', 'Monitor', 'User', 'UnrestrictedViewers', 'Viewer'" }
    },
    "tenantIdForDatabasePrincipal": {
      "type": "string",
      "metadata": { "description": "Specifies the tenantId of the database principal" }
    },
    "principalTypeForDatabase": {
      "type": "string",
      "defaultValue": "App",
      "metadata": { "description": "Specifies the principal type. It can be 'User', 'App', 'Group'" }
    },
    "location": {
      "type": "string",
      "defaultValue": "[resourceGroup().location]",
      "metadata": { "description": "Location for all resources." }
    }
  },
  "variables": {},
  "resources": [
    {
      "apiVersion": "2017-04-01",
      "type": "Microsoft.EventHub/namespaces",
      "name": "[parameters('eventHubNamespaceName')]",
      "location": "[parameters('location')]",
      "sku": {
        "name": "[parameters('eventHubSku')]",
        "tier": "[parameters('eventHubSku')]",
        "capacity": 1
      },
      "properties": {
        "isAutoInflateEnabled": false,
        "maximumThroughputUnits": 0
      }
    },
    {
      "apiVersion": "2017-04-01",
      "type": "Microsoft.EventHub/namespaces/eventhubs",
      "name": "[concat(parameters('eventHubNamespaceName'), '/', parameters('eventHubName'))]",
      "location": "[parameters('location')]",
      "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', parameters('eventHubNamespaceName'))]"],
      "properties": {
        "messageRetentionInDays": 7,
        "partitionCount": 1
      }
    },
    {
      "type": "Microsoft.Storage/storageAccounts",
      "name": "[parameters('storageAccountName')]",
      "location": "[parameters('location')]",
      "apiVersion": "2018-07-01",
      "sku": {
        "name": "[parameters('storageAccountType')]"
      },
      "kind": "StorageV2",
      "resources": [
        {
          "name": "[concat('default/', parameters('containerName'))]",
          "type": "blobServices/containers",
          "apiVersion": "2018-07-01",
          "dependsOn": [
            "[parameters('storageAccountName')]"
          ],
          "properties": {
            "publicAccess": "None"
          }
        }
      ],
      "properties": {}
    },
    {
      "name": "[parameters('kustoClusterName')]",
      "type": "Microsoft.Kusto/clusters",
      "sku": {
        "name": "Standard_E8ads_v5",
        "tier": "Standard",
        "capacity": 2
      },
      "apiVersion": "2019-09-07",
      "location": "[parameters('location')]",
      "tags": {
        "Created By": "GitHub quickstart template"
      }
    },
    {
      "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'))]",
      "type": "Microsoft.Kusto/clusters/databases",
      "apiVersion": "2019-09-07",
      "location": "[parameters('location')]",
      "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
      "properties": {
        "softDeletePeriodInDays": 365,
        "hotCachePeriodInDays": 31
      }
    },
    {
      "type": "Microsoft.Kusto/Clusters/principalAssignments",
      "apiVersion": "2019-11-09",
      "name": "[concat(parameters('kustoClusterName'), '/', parameters('clusterPrincipalAssignmentName'))]",
      "dependsOn": ["[resourceId('Microsoft.Kusto/clusters', parameters('kustoClusterName'))]"],
      "properties": {
        "principalId": "[parameters('principalIdForCluster')]",
        "role": "[parameters('roleForClusterPrincipal')]",
        "tenantId": "[parameters('tenantIdForClusterPrincipal')]",
        "principalType": "[parameters('principalTypeForCluster')]"
      }
    },
    {
      "type": "Microsoft.Kusto/Clusters/Databases/principalAssignments",
      "apiVersion": "2019-11-09",
      "name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDatabaseName'), '/', parameters('databasePrincipalAssignmentName'))]",
      "dependsOn": ["[resourceId('Microsoft.Kusto/clusters/databases', parameters('kustoClusterName'), parameters('kustoDatabaseName'))]"],
      "properties": {
        "principalId": "[parameters('principalIdForDatabase')]",
        "role": "[parameters('roleForDatabasePrincipal')]",
        "tenantId": "[parameters('tenantIdForDatabasePrincipal')]",
        "principalType": "[parameters('principalTypeForDatabase')]"
      }
    }
  ]
}

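Before deploying, it can help to check which template parameters have no default value and therefore must be supplied by the caller. The following is a minimal Python sketch of that check, using only the standard library. The function names `required_parameters` and `missing_parameters` are illustrative and not part of any Azure SDK, and `SAMPLE_TEMPLATE` is a trimmed, hypothetical stand-in for template.json.

```python
import json


def required_parameters(template: dict) -> list[str]:
    """Return the names of template parameters that have no defaultValue."""
    params = template.get("parameters", {})
    return sorted(name for name, spec in params.items() if "defaultValue" not in spec)


def missing_parameters(template: dict, supplied: dict) -> list[str]:
    """Return required parameter names that are absent from `supplied`."""
    return [name for name in required_parameters(template) if name not in supplied]


# Trimmed stand-in for template.json (a hypothetical subset of its parameters).
SAMPLE_TEMPLATE = json.loads("""
{
  "parameters": {
    "eventHubName": { "type": "string" },
    "kustoDatabaseName": { "type": "string", "defaultValue": "kustodb" },
    "principalIdForCluster": { "type": "string" }
  }
}
""")

if __name__ == "__main__":
    supplied = {"eventHubName": {"value": "e2eexampleeventhub"}}
    print(required_parameters(SAMPLE_TEMPLATE))
    print(missing_parameters(SAMPLE_TEMPLATE, supplied))
```

To run the same check against the real file, load it with `json.load(open("template.json"))` in place of `SAMPLE_TEMPLATE`. In the template above, the parameters without a default are eventHubNamespaceName, eventHubName, principalIdForCluster, tenantIdForClusterPrincipal, principalIdForDatabase, and tenantIdForDatabasePrincipal.
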
Code example


The following code example gives you a step-by-step process that results in data ingestion into Azure Data Explorer.

You first create a resource group. You then deploy the ARM template to create the Azure resources (a storage account and container, an event hub, and an Azure Data Explorer cluster and database) and to add principals. Next, you create an Azure Event Grid subscription on the storage account, and a table and column mapping in the Azure Data Explorer database. Finally, you create the data connection that configures Azure Data Explorer to ingest data from the new storage account.
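
The example in this section derives every resource name from one deployment name and then builds resource IDs, an Event Grid subject filter, and the cluster URI from those names. The Python sketch below mirrors that string construction so the values can be inspected before any resources are created. All function names are illustrative, not SDK calls. The storage account check reflects the naming rule of 3 to 24 lowercase letters and digits.

```python
import re


def derive_names(deployment_name: str) -> dict:
    """Derive resource names from one deployment name, as the example does."""
    event_hub = deployment_name + "eventhub"
    return {
        "resource_group": deployment_name + "resourcegroup",
        "event_hub": event_hub,
        "event_hub_namespace": event_hub + "ns",
        "storage_account": deployment_name + "storage",
        "storage_container": deployment_name + "storagecontainer",
        "kusto_cluster": deployment_name + "kustocluster",
        "kusto_database": deployment_name + "kustodatabase",
    }


def is_valid_storage_account_name(name: str) -> bool:
    """Storage account names must be 3-24 lowercase letters or digits."""
    return re.fullmatch(r"[a-z0-9]{3,24}", name) is not None


def storage_resource_id(subscription_id: str, resource_group: str, account: str) -> str:
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.Storage/storageAccounts/{account}"
    )


def event_hub_resource_id(subscription_id: str, resource_group: str,
                          namespace: str, event_hub: str) -> str:
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.EventHub/namespaces/{namespace}/eventhubs/{event_hub}"
    )


def blob_subject_prefix(container: str) -> str:
    """Subject filter that limits blob events to one container."""
    return f"/blobServices/default/containers/{container}"


def cluster_uri(cluster: str, location: str) -> str:
    return f"https://{cluster}.{location}.kusto.windows.net"


if __name__ == "__main__":
    names = derive_names("e2eexample")
    print(names["storage_account"], is_valid_storage_account_name(names["storage_account"]))
    print(blob_subject_prefix(names["storage_container"]))
    print(cluster_uri(names["kusto_cluster"], "westeurope"))
```
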

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"; //Application ID
var clientSecret = "PlaceholderClientSecret"; //Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var credentials = new ClientSecretCredential(tenantId, clientId, clientSecret);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var deploymentName = "e2eexample";

Console.WriteLine("Step 1: Create a new resource group in your Azure subscription to manage all the resources for using Azure Data Explorer.");
var subscriptions = resourceManagementClient.GetSubscriptions();
var subscription = (await subscriptions.GetAsync(subscriptionId)).Value;
var resourceGroups = subscription.GetResourceGroups();
var resourceGroupName = deploymentName + "resourcegroup";
var location = AzureLocation.WestEurope;
var resourceGroupData = new ResourceGroupData(location);
var resourceGroup = (await resourceGroups.CreateOrUpdateAsync(WaitUntil.Completed, resourceGroupName, resourceGroupData)).Value;

Console.WriteLine("Step 2: Create a Blob Storage, a container in the Storage account, an event hub, an Azure Data Explorer cluster, database, and add principals by using an Azure Resource Manager template.");
var deployments = resourceGroup.GetArmDeployments();
var azureResourceTemplatePath = @"xxxxxxxxx\template.json"; //Path to the Azure Resource Manager template JSON from the previous section
var eventHubName = deploymentName + "eventhub";
var eventHubNamespaceName = eventHubName + "ns";
var storageAccountName = deploymentName + "storage";
var storageContainerName = deploymentName + "storagecontainer";
var eventGridSubscriptionName = deploymentName + "eventgrid";
var kustoClusterName = deploymentName + "kustocluster";
var kustoDatabaseName = deploymentName + "kustodatabase";
var kustoTableName = "Events";
var kustoColumnMappingName = "Events_CSV_Mapping";
var kustoDataConnectionName = deploymentName + "kustoeventgridconnection";
var armDeploymentContent = new ArmDeploymentContent(
    new ArmDeploymentProperties(ArmDeploymentMode.Incremental)
    {
        Template = BinaryData.FromString(File.ReadAllText(azureResourceTemplatePath, Encoding.UTF8)),
        Parameters = BinaryData.FromObjectAsJson(
            JsonConvert.SerializeObject(
                new Dictionary<string, Dictionary<string, string>>
                {
                    ["eventHubNamespaceName"] = new(capacity: 1) { { "value", eventHubNamespaceName } },
                    ["eventHubName"] = new(capacity: 1) { { "value", eventHubName } },
                    ["storageAccountName"] = new(capacity: 1) { { "value", storageAccountName } },
                    ["containerName"] = new(capacity: 1) { { "value", storageContainerName } },
                    ["kustoClusterName"] = new(capacity: 1) { { "value", kustoClusterName } },
                    ["kustoDatabaseName"] = new(capacity: 1) { { "value", kustoDatabaseName } },
                    ["principalIdForCluster"] = new(capacity: 1) { { "value", "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" } }, //Application ID
                    ["roleForClusterPrincipal"] = new(capacity: 1) { { "value", "AllDatabasesAdmin" } },
                    ["tenantIdForClusterPrincipal"] = new(capacity: 1) { { "value", tenantId } },
                    ["principalTypeForCluster"] = new(capacity: 1) { { "value", "App" } },
                    ["principalIdForDatabase"] = new(capacity: 1) { { "value", "xxxxxxxx@xxxxxxxx.com" } }, //User Email
                    ["roleForDatabasePrincipal"] = new(capacity: 1) { { "value", "Admin" } },
                    ["tenantIdForDatabasePrincipal"] = new(capacity: 1) { { "value", tenantId } },
                    ["principalTypeForDatabase"] = new(capacity: 1) { { "value", "User" } }
                }
            )
        )
    }
);
await deployments.CreateOrUpdateAsync(WaitUntil.Completed, deploymentName, armDeploymentContent);

Console.WriteLine("Step 3: Create an Event Grid subscription to publish blob events created in a specific container to an event hub.");
var storageResourceId = new ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Storage/storageAccounts/{storageAccountName}");
var eventHubResourceId = new ResourceIdentifier($"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.EventHub/namespaces/{eventHubNamespaceName}/eventhubs/{eventHubName}");
var eventSubscriptions = resourceManagementClient.GetEventSubscriptions(storageResourceId);
var eventSubscriptionData = new EventGridSubscriptionData
{
    Destination = new EventHubEventSubscriptionDestination { ResourceId = eventHubResourceId },
    Filter = new EventSubscriptionFilter
    {
        SubjectBeginsWith = $"/blobServices/default/containers/{storageContainerName}",
    }
};
eventSubscriptionData.Filter.IncludedEventTypes.Add(BlobStorageEventType.MicrosoftStorageBlobCreated.ToString());
await eventSubscriptions.CreateOrUpdateAsync(WaitUntil.Completed, eventGridSubscriptionName, eventSubscriptionData);

Console.WriteLine("Step 4: Create a table (with three columns: EventTime, EventId, and EventSummary) and column mapping in your Azure Data Explorer database.");
var kustoUri = $"https://{kustoClusterName}.{location}.kusto.windows.net";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    InitialCatalog = kustoDatabaseName,
    FederatedSecurity = true,
    ApplicationClientId = clientId,
    ApplicationKey = clientSecret,
    Authority = tenantId
};
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    kustoClient.ExecuteControlCommand(
        CslCommandGenerator.GenerateTableCreateCommand(
            kustoTableName,
            new[]
            {
                Tuple.Create("EventTime", "System.DateTime"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("EventSummary", "System.String"),
            }
        )
    );
    kustoClient.ExecuteControlCommand(
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            IngestionMappingKind.Csv,
            kustoTableName,
            kustoColumnMappingName,
            new ColumnMapping[]
            {
                new() { ColumnName = "EventTime", ColumnType = "dateTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
                new() { ColumnName = "EventId", ColumnType = "int", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
                new() { ColumnName = "EventSummary", ColumnType = "string", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            }
        )
    );
}

Console.WriteLine("Step 5: Add an Event Grid data connection. Azure Data Explorer will automatically ingest the data when new blobs are created.");
var cluster = (await resourceGroup.GetKustoClusterAsync(kustoClusterName)).Value;
var database = (await cluster.GetKustoDatabaseAsync(kustoDatabaseName)).Value;
var dataConnections = database.GetKustoDataConnections();
var eventGridDataConnectionData = new KustoEventGridDataConnection
{
    StorageAccountResourceId = storageResourceId,
    EventGridResourceId = eventHubResourceId,
    ConsumerGroup = "$Default",
    Location = location,
    TableName = kustoTableName,
    MappingRuleName = kustoColumnMappingName,
    DataFormat = KustoEventGridDataFormat.Csv
};
await dataConnections.CreateOrUpdateAsync(WaitUntil.Completed, kustoDataConnectionName, eventGridDataConnectionData);

Setting          Field description
tenantId         Your tenant ID. It's also known as a directory ID.
subscriptionId   The subscription ID that you use for resource creation.
clientId         The client ID of the application that can access resources in your tenant.
clientSecret     The client secret of the application that can access resources in your tenant.
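
In step 2, each deployment parameter is passed as an object with a single "value" key, which is the shape ARM deployments expect for parameter values. As a rough illustration, the Python sketch below builds that structure from a plain dictionary. The helper name `to_arm_parameters` is hypothetical, and the values shown are the placeholder names from the example.

```python
import json


def to_arm_parameters(values: dict) -> dict:
    """Wrap each plain value as {"value": ...} for an ARM deployment."""
    return {name: {"value": value} for name, value in values.items()}


if __name__ == "__main__":
    plain = {
        "eventHubNamespaceName": "e2eexampleeventhubns",
        "eventHubName": "e2eexampleeventhub",
        "roleForClusterPrincipal": "AllDatabasesAdmin",
        "principalTypeForCluster": "App",
    }
    # Serialize to the JSON text that would accompany the template.
    print(json.dumps(to_arm_parameters(plain), indent=2))
```

Keeping the plain dictionary separate from the wrapped form makes it easier to review which template defaults are being overridden, such as roleForClusterPrincipal.
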

Test the code example

  1. Upload a file into the storage account.

    var storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxxxxxxx;AccountKey=xxxxxxxxxxxxxx;EndpointSuffix=core.windows.net";
    var container = new BlobContainerClient(storageConnectionString, storageContainerName);
    // Two CSV rows in the column order of the mapping: EventTime, EventId, EventSummary.
    var blobContent = "2007-01-01 00:00:00.0000000,2592,Several trees down\n2007-01-01 00:00:00.0000000,4171,Winter Storm";
    await container.UploadBlobAsync("test.csv", BinaryData.FromString(blobContent));

    Setting                   Field description
    storageConnectionString   The connection string of the programmatically created storage account.

  2. Run a test query in Azure Data Explorer.

    // Uses the same location value as the main example to build the cluster URI.
    var kustoUri = $"https://{kustoClusterName}.{location}.kusto.windows.net";
    var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
    {
        InitialCatalog = kustoDatabaseName,
        FederatedSecurity = true,
        ApplicationClientId = clientId,
        ApplicationKey = clientSecret,
        Authority = tenantId
    };
    using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
    {
        var query = $"{kustoTableName} | take 10";
        using var reader = kustoClient.ExecuteQuery(query) as DataTableReader2;
        // Print the first three columns of each returned row.
        while (reader != null && reader.Read())
        {
            Console.WriteLine($"{reader[0]}, {reader[1]}, {reader[2]}");
        }
    }
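
Before uploading, the sample rows can be checked locally against the table schema (EventTime, EventId, EventSummary at CSV ordinals 0, 1, and 2). The Python sketch below is one way to do that with the standard library. It is an illustration only and does not reproduce how Azure Data Explorer parses CSV. The helper names are hypothetical, and the seventh fractional digit of the timestamp is dropped because Python datetimes keep microseconds.

```python
import csv
import io
from datetime import datetime

# Same two rows as the blob content uploaded in step 1 of the test.
SAMPLE_BLOB = (
    "2007-01-01 00:00:00.0000000,2592,Several trees down\n"
    "2007-01-01 00:00:00.0000000,4171,Winter Storm"
)


def parse_event_time(text: str) -> datetime:
    """Parse a timestamp, truncating the fraction to 6 digits for strptime."""
    head, _, fraction = text.partition(".")
    fraction = (fraction or "0")[:6]
    return datetime.strptime(f"{head}.{fraction}", "%Y-%m-%d %H:%M:%S.%f")


def parse_rows(blob_text: str) -> list[tuple]:
    """Convert each CSV record to (EventTime, EventId, EventSummary)."""
    rows = []
    for record in csv.reader(io.StringIO(blob_text)):
        # Unpacking raises ValueError if a row does not have exactly 3 fields.
        event_time, event_id, event_summary = record
        rows.append((parse_event_time(event_time), int(event_id), event_summary))
    return rows


if __name__ == "__main__":
    for row in parse_rows(SAMPLE_BLOB):
        print(row)
```

A row with a missing field or a non-numeric EventId raises a ValueError here, which flags data that would not fit the three-column mapping.
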

Next steps

  • To learn other methods to create a cluster, see Create an Azure Data Explorer cluster and database.
  • To learn more about ingestion methods, see Azure Data Explorer data ingestion.
  • Learn common operators of Kusto Query Language (KQL).
Article information

Author: Nicola Considine CPA

Last Updated: 09/06/2023
