Copying data on Azure in code

by DotNetNerd 5. February 2020 09:04

I have recently been looking at copying entire collections of data on Azure in a way that can run as either Azure Functions or WebJobs. This is useful for backups and for simply moving data between environments. I didn't come across many good samples of how to do this, so I expect it can be a useful topic for others who need to do the same thing.

Copying blob storage containers

As always we first need the basics in place. So we spin up CloudBlobClients for the source and the target.

// Parse the connection strings into storage accounts for the source and the backup target.
CloudStorageAccount sourceStorageAccount = CloudStorageAccount.Parse(_connectionStringSettings.AzureStorageConnection);
CloudStorageAccount targetStorageAccount = CloudStorageAccount.Parse(_connectionStringSettings.AzureStorageBackupConnection);
// Each account gets its own blob client.
CloudBlobClient sourceCloudBlobClient = sourceStorageAccount.CreateCloudBlobClient();
CloudBlobClient targetCloudBlobClient = targetStorageAccount.CreateCloudBlobClient();

Then we get the CloudBlobContainers and ensure that the target exists.

string container = "my-super-duper-dumpster";

CloudBlobContainer sourceContainer = sourceCloudBlobClient.GetContainerReference(container);
CloudBlobContainer targetContainer = targetCloudBlobClient.GetContainerReference(container);

await targetContainer.CreateIfNotExistsAsync();

Now we need a way to iterate through all the blobs. This is useful for a variety of operations: besides copying, we can use it to delete or modify certain blobs. For this purpose I created the following method, which runs through each blob and performs some action on it.


// Requires: using System.Linq; (for OfType)
private async Task ForeachBlob(CloudBlobContainer container, Func<CloudBlockBlob, Task> action)
{
    BlobContinuationToken blobContinuationToken = null;
    do
    {
        // Fetch the next page of blobs; a flat listing also returns blobs in virtual directories.
        var resultSegment = await container.ListBlobsSegmentedAsync(
            prefix: null,
            useFlatBlobListing: true,
            blobListingDetails: BlobListingDetails.None,
            maxResults: null,
            currentToken: blobContinuationToken,
            options: null,
            operationContext: null
        );

        blobContinuationToken = resultSegment.ContinuationToken;

        // Skip page and append blobs instead of failing on an invalid cast.
        foreach (CloudBlockBlob item in resultSegment.Results.OfType<CloudBlockBlob>())
        {
            await action(item);
        }
    } while (blobContinuationToken != null);
}

With this in place we can start copying. In my case the containers are private, so each source blob needs a SAS token that gives the copy operation read access to it. That takes a bit of code like this.

await ForeachBlob(sourceContainer, async sourceBlob => {
    // A read-only SAS lets the copy operation read the private source blob.
    var policy = new SharedAccessBlobPolicy
    {
        Permissions = SharedAccessBlobPermissions.Read,
        SharedAccessStartTime = DateTime.UtcNow.AddMinutes(-15), // allow for clock skew
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1)
    };
    string sourceBlobToken = sourceBlob.GetSharedAccessSignature(policy);
    string sourceBlobSAS = sourceBlob.Uri.AbsoluteUri + sourceBlobToken;

    // targetContainer was created above, so it does not need to be checked again for every blob.
    CloudBlockBlob targetBlob = targetContainer.GetBlockBlobReference(sourceBlob.Name);
    await targetBlob.DeleteIfExistsAsync();

    // StartCopyAsync only starts a server-side copy; it returns before the data has been copied.
    await targetBlob.StartCopyAsync(new Uri(sourceBlobSAS));
});
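
Since StartCopyAsync only starts the copy, a backup job may want to wait until the service reports that each copy has finished. The following is a minimal sketch of one way to do that, not something from my original job. WaitForCopyAsync and BlobCopyHelper are hypothetical names, and the using directive assumes the Microsoft.Azure.Storage.Blob package; older projects reference Microsoft.WindowsAzure.Storage.Blob instead.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Storage.Blob; // assumption: the 11.x storage SDK namespace

public static class BlobCopyHelper
{
    // Hypothetical helper, not part of the SDK: polls the target blob until the
    // server-side copy leaves the Pending state or the timeout elapses.
    public static async Task<CopyStatus> WaitForCopyAsync(
        CloudBlockBlob targetBlob, TimeSpan pollInterval, TimeSpan timeout)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            // Refreshes the blob's properties, including CopyState, from the service.
            await targetBlob.FetchAttributesAsync();

            CopyStatus status = targetBlob.CopyState?.Status ?? CopyStatus.Invalid;
            if (status != CopyStatus.Pending || DateTime.UtcNow >= deadline)
            {
                // Success, Aborted, Failed, Invalid, or still Pending after the timeout.
                return status;
            }

            await Task.Delay(pollInterval);
        }
    }
}

// Possible usage right after StartCopyAsync (the intervals are arbitrary examples):
// CopyStatus status = await BlobCopyHelper.WaitForCopyAsync(
//     targetBlob, TimeSpan.FromSeconds(2), TimeSpan.FromMinutes(10));
```

Waiting inside the loop makes the job slower but lets it report failed copies. For large containers it may be better to start all copies first and check their status in a second pass.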

That gives us all the pieces we need. One small addition I made was to time and log each operation. Wrapping everything in Stopwatch start and stop calls quickly becomes ugly and drowns out the more important parts of the code, so I made a runner like this to handle the minutiae of timing and logging.

public class Runner<T>
{
    readonly ILogger<T> _logger;

    public Runner(ILogger<T> logger)
    {
        _logger = logger;
    }
    public async Task Run(Func<Task> func, Func<Stopwatch, string> logTextFactory)
    {
        var sw = Stopwatch.StartNew();
        await func();
        sw.Stop();
        var logText = logTextFactory(sw);
        _logger.LogInformation(logText);         
    }
}

Then all it takes is to move, for example, the code that does the actual copying into its own method and wrap it like so.

await _runner.Run(
    () => CopyBlobs(targetCloudBlobClient, container, sourceContainer),
    sw => $"Copying from \"{sourceContainer.Uri}\" to \"{targetContainer.Uri}\" took {sw.Elapsed:c}");
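
One thing to be aware of is that the Runner above only logs when the wrapped task completes; if it throws, the elapsed time is lost. The sketch below shows a variant of the same pattern that logs in a finally block. To keep it dependency-free it takes a plain Action<string> instead of ILogger<T>. TimedRunner is a hypothetical name, and this is an illustration of the idea rather than the class I actually used.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public class TimedRunner
{
    private readonly Action<string> _log;

    public TimedRunner(Action<string> log)
    {
        _log = log;
    }

    public async Task Run(Func<Task> func, Func<Stopwatch, string> logTextFactory)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            await func();
        }
        finally
        {
            // Runs on both success and failure, so the duration is always logged.
            // Any exception from func still propagates to the caller afterwards.
            sw.Stop();
            _log(logTextFactory(sw));
        }
    }
}

// Possible usage:
// var runner = new TimedRunner(Console.WriteLine);
// await runner.Run(() => Task.Delay(100), sw => $"Waiting took {sw.Elapsed:c}");
```

Because the message is also written when the task fails, the text produced by the factory should not claim success; something neutral like "took" works for both cases.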

Copying CosmosDB collections

In the same way as I copied blob storage containers, I also needed to copy CosmosDB collections. This is simpler, mainly because the BulkExecutor library is made for exactly these kinds of scenarios, but I will include a sample here for completeness.

Again we do the basics and spin up source and target clients.

var jsonSerializerSettings = new JsonSerializerSettings { DateParseHandling = DateParseHandling.DateTimeOffset };
           
DocumentClient sourceClient = new DocumentClient(
    new Uri(_connectionStringSettings.DocumentDBConnection), _settings.DocumentStorageSettings.DocumentDbKey,
    jsonSerializerSettings, new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp });

DocumentClient targetClient = new DocumentClient(
    new Uri(_connectionStringSettings.DocumentDBBackupConnection), _settings.DocumentStorageSettings.DocumentDbBackupKey,
    jsonSerializerSettings, new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp });

With those in place we set up the BulkExecutor.

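// Assumption: the target collection uses the same database and collection name as the source.
DocumentCollection documentCollection = (await targetClient.ReadDocumentCollectionAsync(
    UriFactory.CreateDocumentCollectionUri(_settings.DocumentStorageSettings.DatabaseName, "MySuperDuperCollection"))).Resource;

// Allow generous retries while the bulk executor initializes.
targetClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
targetClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

IBulkExecutor bulkExecutor = new BulkExecutor(targetClient, documentCollection);
await bulkExecutor.InitializeAsync();

// From here on the bulk executor handles throttling itself, so client-side retries are switched off.
targetClient.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
targetClient.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;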

Then we are ready to read all the documents in batches and pass each to the bulk executor.

// Requires: using System.Linq; (for ToArray)
var documentQuery = sourceClient.CreateDocumentQuery(
    UriFactory.CreateDocumentCollectionUri(_settings.DocumentStorageSettings.DatabaseName, "MySuperDuperCollection"),
    new FeedOptions() { /* MaxItemCount = 2000 */ }).AsDocumentQuery();

while (documentQuery.HasMoreResults)
{
    // Read the next page of documents from the source collection.
    var feed = await documentQuery.ExecuteNextAsync();

    // Import the page into the target; existing ids are kept and nothing is overwritten.
    BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
        documents: feed.ToArray(),
        enableUpsert: false,
        disableAutomaticIdGeneration: true,
        maxConcurrencyPerPartitionKeyRange: null,
        maxInMemorySortingBatchSize: null);
}
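
The loop above does not look at the BulkImportResponse. As a rough sketch of what could be done with it, the helper below logs the counters the response exposes for each batch. BulkImportLogging and LogBulkImport are hypothetical names, and I am assuming the Microsoft.Azure.CosmosDB.BulkExecutor package together with the same Microsoft.Extensions.Logging ILogger that the Runner uses.

```csharp
using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
using Microsoft.Extensions.Logging;

public static class BulkImportLogging
{
    // Hypothetical helper: summarizes one bulk import batch in the log.
    public static void LogBulkImport(ILogger logger, BulkImportResponse response)
    {
        logger.LogInformation(
            "Imported {Count} documents in {Elapsed}, consuming {RequestUnits} RUs",
            response.NumberOfDocumentsImported,
            response.TotalTimeTaken,
            response.TotalRequestUnitsConsumed);

        // Documents the executor could not import because of their format are reported here.
        if (response.BadInputDocuments != null && response.BadInputDocuments.Count > 0)
        {
            logger.LogWarning(
                "{Count} documents were rejected as bad input",
                response.BadInputDocuments.Count);
        }
    }
}

// Possible usage inside the while loop, right after BulkImportAsync:
// BulkImportLogging.LogBulkImport(_logger, bulkImportResponse);
```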

Like in the first sample, we could wrap this part in a method and use the runner to do timing and logging.

To give a complete overview these are the libraries and versions used in my case.

[Image: list of the libraries and versions used]

Who am I?

My name is Christian Holm Diget, and I work as an independent consultant, in Denmark, where I write code, give advice on architecture and help with training. On the side I get to do a bit of speaking and help with miscellaneous community events.

Some of my primary focus areas are code quality, programming languages and using new technologies to provide value.

Microsoft Certified Professional Developer

Microsoft Most Valuable Professional
