Using Fan-in/out (Durable Functions) models to integrate with D365F&O

By Subhad365, User Group Leader
 


Hello friends, how are you?
Fan-out/fan-in refers to the pattern of executing multiple functions concurrently and then performing some aggregation on the results. This article explains a sample that uses Durable Functions to implement a fan-out/fan-in scenario. The sample is a durable function that pushes a batch of records into D365F&O.
In layman's terms: Azure Functions are stateless, so they cannot retain anything between calls. Durable Functions let them keep state between calls.
This article is about leveraging Durable Functions to perform CRUD operations against D365F&O in real time or near real time.
Here is the scenario:
Background:
We have a large volume of incoming data arriving in an HTTP request body, and we need it to flow into the Finance & Operations application. This article explains, step by step, how to implement a Durable Function for that situation. The same solution applies to files arriving in containers or to messages arriving on queues or Event Grid. In Azure Functions these sources are wired up through triggers and bindings, and we change them depending on our need.
Basically, the operation works like a multithreaded batch: each task runs independently of the others, which can improve the overall throughput considerably.
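To make the idea concrete before bringing in Azure, here is a minimal sketch of fan-out/fan-in with plain .NET tasks. The names FanOutFanInDemo and ProcessRecordAsync and the 50 ms delay are assumptions made up for illustration; the delay merely stands in for one call to D365F&O.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutFanInDemo
{
    // Hypothetical stand-in for one unit of work (for example, one POST to D365F&O).
    public static async Task<string> ProcessRecordAsync(string poolId)
    {
        await Task.Delay(50); // simulated I/O latency
        return "Created " + poolId;
    }

    public static async Task<string[]> RunAsync(IEnumerable<string> poolIds)
    {
        // Fan-out: start every task without awaiting it.
        List<Task<string>> tasks = poolIds.Select(id => ProcessRecordAsync(id)).ToList();

        // Fan-in: wait for all of them; results come back in the order the tasks were listed.
        return await Task.WhenAll(tasks);
    }
}
```

Because the tasks overlap, awaiting FanOutFanInDemo.RunAsync for three ids should take roughly one delay rather than three. Durable Functions applies the same shape, with activity functions in place of the local method.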
As an example, I will use the following payload for Sales Order Pools:
{
    "Pools": [
        {
            "dataAreaId": "usmf",
            "PoolId": "011",
            "PoolName": "Wholesale order pool11"
        },
        {
            "dataAreaId": "usmf",
            "PoolId": "012",
            "PoolName": "Wholesale order pool12"
        },
        {
            "dataAreaId": "usmf",
            "PoolId": "013",
            "PoolName": "Wholesale order pool13"
        }
    ]
}

The basic concept behind the CRUD
Now let me explain the rest of the process:
a. Token generation logic: we generate a short-lived bearer token and pass it to FnO with every call.
b. Storing/retrieving Azure App based parameters: this is the trickiest part of the solution. Developers typically keep environment variables in a local configuration file, but unfortunately that file does not travel to the server during deployment. We will look at the choices we have here.
c. Posting the payload to D365FnO.
d. Finally, creating our Durable Function.
Additionally (not shown here), you can include logic to validate the incoming field values and take any necessary action before committing to D365FnO.
For ease of understanding, let me cover point b first, as point a depends on it.
Assumptions:
a. We are referring to a recurring integration URL in D365FnO, which has already been created and is ready.
b. We are using a JSON payload here, just as an example.
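The validation step mentioned above is not part of this article's sample, but a minimal sketch might look like the following. The rules (all three fields must be non-empty strings) and the names PoolValidator and Validate are assumptions of mine, not D365FnO requirements. The sketch also assumes the records arrive as an array under a "Pools" property, and it uses System.Text.Json rather than the Newtonsoft library used elsewhere in this post, only to stay dependency-free.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class PoolValidator
{
    // Returns one message per problem found; an empty list means the payload passed.
    public static List<string> Validate(string json)
    {
        var errors = new List<string>();
        JsonDocument doc;
        try
        {
            doc = JsonDocument.Parse(json);
        }
        catch (JsonException e)
        {
            errors.Add("Payload is not valid JSON: " + e.Message);
            return errors;
        }

        using (doc)
        {
            if (doc.RootElement.ValueKind != JsonValueKind.Object ||
                !doc.RootElement.TryGetProperty("Pools", out JsonElement pools) ||
                pools.ValueKind != JsonValueKind.Array)
            {
                errors.Add("Payload must be an object with a 'Pools' array.");
                return errors;
            }

            int index = 0;
            foreach (JsonElement pool in pools.EnumerateArray())
            {
                foreach (string field in new[] { "dataAreaId", "PoolId", "PoolName" })
                {
                    // Hypothetical rule: each field must be present as a non-empty string.
                    bool ok = pool.ValueKind == JsonValueKind.Object
                        && pool.TryGetProperty(field, out JsonElement value)
                        && value.ValueKind == JsonValueKind.String
                        && value.GetString().Length > 0;
                    if (!ok)
                    {
                        errors.Add("Record " + index + ": '" + field + "' is missing or empty.");
                    }
                }
                index++;
            }
        }
        return errors;
    }
}
```

A caller could reject the request when the returned list is non-empty, before any token is requested or any record is sent on.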


Storing/retrieving Azure App based parameters
We all know that to obtain a token from the Azure AD (Microsoft identity platform) token URL, we need these mandatory parameters: client id, client secret, grant type, scope (the D365FnO URL) and, of course, the token URL itself. We could do this directly by hard-coding the parameter values in the code.





However, it's never recommended to hard-code these values. Instead, what we can do is:
1. Keep them in Azure Key Vault by defining them as secrets.





2. Defining the secrets like this lets you access them by the following process.
3. In your function app project, define a new class called KeyVaultManager. Then go back to Azure Portal >> open the Azure function app to which you will deploy your function >> go to Configuration >> under the Application settings tab, define a variable called 'VAULTURI' (I will show shortly why we need this). Use the same spelling and casing as in the code, because setting names can be case-sensitive on Linux-hosted function apps.



How to get the value of this variable: go back to the Key Vault you created to store your Azure App Registration parameters >> copy the value of Vault URI.




4. We can now come back to our code and write the logic to retrieve the values of these App registration parameters. In the class KeyVaultManager (created above), use the following method:
        // Requires: using System; using Azure.Identity; using Azure.Security.KeyVault.Secrets;
        public string GetSecretValue(string secretName)
        {
            // Application settings are exposed to the function as environment variables.
            string kvUri = Environment.GetEnvironmentVariable("VAULTURI");
            var client = new SecretClient(new Uri(kvUri), new DefaultAzureCredential());
            // Synchronous overload: blocks until Key Vault answers, without sync-over-async.
            KeyVaultSecret secret = client.GetSecret(secretName).Value;
            return secret.Value;
        }
Look at how the value of VAULTURI is fetched. The method uses "Azure.Security.KeyVault.Secrets" to fetch the value of the secret whose name is passed in as the secretName parameter.
For testing purposes, we can also define VAULTURI in the 'local.settings.json' file.
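Since the setting is just a key/value pair, a local.settings.json along these lines should do for local runs. The vault name is a placeholder, and the other two values are the usual defaults for a local in-process C# function app; treat them as assumptions and adjust them to your own project.

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "VAULTURI": "https://<your-vault-name>.vault.azure.net/"
  }
}
```

Everything under "Values" is surfaced to the function as an environment variable when it runs locally, which is why Environment.GetEnvironmentVariable("VAULTURI") can read it.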





However, as stated previously, the problem with function app deployment is that publishing doesn't copy the local settings file to the server. Hence this arrangement.
Please note: alternatively, you can keep all your App registration parameters in the configuration settings of your function app instead of Key Vault. This is simpler and faster than fetching values from Key Vault: every lookup in the GetSecretValue method above is a blocking network call, which can cost some performance. The tradeoff is that Key Vault generally gives secrets stronger protection than plain application settings.
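If you stay with Key Vault, one way to soften that cost is to fetch each secret once and reuse it. The sketch below is an assumption of mine and not part of the original solution: SecretCache is a hypothetical helper, and the fetch delegate stands in for a call such as KeyVaultManager.GetSecretValue.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class SecretCache
{
    private readonly ConcurrentDictionary<string, string> _values =
        new ConcurrentDictionary<string, string>();
    private readonly Func<string, string> _fetch;

    // fetch is whatever actually reads the secret, e.g. name => kvManager.GetSecretValue(name).
    public SecretCache(Func<string, string> fetch)
    {
        _fetch = fetch ?? throw new ArgumentNullException(nameof(fetch));
    }

    // Returns the cached value, calling fetch only when the name has not been seen yet.
    // Under concurrency GetOrAdd may invoke fetch more than once for the same name,
    // which is harmless here because every call yields the same secret.
    public string Get(string secretName)
    {
        return _values.GetOrAdd(secretName, _fetch);
    }
}
```

Cached values live as long as the instance does, so a rotated secret would only be picked up after the function host restarts or the cache is recreated.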

Token generation logic
Once the above step is complete, we can focus on creating the token. Create a new class, TokenGenerator, under the same namespace as your function app.
Define the following method in it. It obtains the various parameters defined for the Azure App, stores them in a collection and then does a POST to the token URL:
public string GetToken(ILogger log)
        {
            string responseInString = String.Empty;
            using (var wb = new WebClient())
            {
                KeyVaultManager kvManager = new KeyVaultManager();

                string tokenUrl = kvManager.GetSecretValue("tokenurluat");

                // Form fields of the OAuth 2.0 client credentials request.
                var data = new NameValueCollection();
                data["client_id"] = kvManager.GetSecretValue("clientiduat");
                data["client_secret"] = kvManager.GetSecretValue("secretvaluat");
                data["grant_type"] = "client_credentials";
                // OAuth parameter names are lowercase by specification: "scope", not "Scope".
                data["scope"] = kvManager.GetSecretValue("scopeurl");

                // UploadValues posts the collection as application/x-www-form-urlencoded.
                var response = wb.UploadValues(tokenUrl, "POST", data);
                responseInString = this.getTokenValue(Encoding.UTF8.GetString(response));
            }
            return responseInString;
        }
As you can guess, the response is a JSON object, from which we must take the value of the token under the node 'access_token'.





For convenience, I have added one more method called getTokenValue, which can be something like this:
public string getTokenValue(string responsePayload)
        {
            // 'dynamic' lets us read the node by name without declaring a class for the response.
            dynamic json = JObject.Parse(responsePayload);
            string accessToken = json.access_token;
            return accessToken;
        }
It uses the 'dynamic' data type, which resolves members at runtime, to fetch the value of the node called 'access_token'.
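For reference, a client credentials response generally carries the token alongside its type and lifetime. The sketch below parses a made-up sample with System.Text.Json, a swap for the Newtonsoft 'dynamic' approach above that I chose only to keep the example dependency-free. TokenResponseReader is a hypothetical name, and treating a missing lifetime as 0 is my own convention.

```csharp
using System;
using System.Text.Json;

public static class TokenResponseReader
{
    // Extracts the access token and its lifetime in seconds from a token endpoint response.
    public static (string AccessToken, int ExpiresIn) Read(string responsePayload)
    {
        using (JsonDocument doc = JsonDocument.Parse(responsePayload))
        {
            JsonElement root = doc.RootElement;
            string token = root.GetProperty("access_token").GetString();

            // expires_in is optional in OAuth 2.0; 0 here means "not supplied".
            // Some endpoints send it as a number, others as a string, so handle both.
            int expiresIn = 0;
            if (root.TryGetProperty("expires_in", out JsonElement lifetime))
            {
                expiresIn = lifetime.ValueKind == JsonValueKind.Number
                    ? lifetime.GetInt32()
                    : int.Parse(lifetime.GetString());
            }
            return (token, expiresIn);
        }
    }
}
```

Knowing the lifetime makes it possible to reuse one token across many calls instead of requesting a new one each time.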
Posting the payload to D365FnO for furtherance
And finally, we can focus on the logic that passes the obtained token along when calling our D365FnO URLs. Come back to the main body of your function app. Here too, I have created a method that accepts the payload and then calls the D365FnO URL with the token attached:
public static string Call2EnqueueMethod(string token, string payload, ILogger log)
        {
            // Placeholder: define a Key Vault secret to hold the D365FnO Enqueue URL
            // and fetch it by the process described above.
            string URL = "<D365FnO Enqueue URL>";
            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(URL);
            request.Method = "POST";
            request.ContentType = "application/json";
            // Set all headers before opening the request stream.
            request.Headers.Add("Authorization", string.Format("Bearer {0}", token));
            // Content-Length counts bytes, not characters.
            byte[] body = Encoding.UTF8.GetBytes(payload);
            request.ContentLength = body.Length;
            using (Stream requestStream = request.GetRequestStream())
            {
                requestStream.Write(body, 0, body.Length);
            }
            string response = String.Empty;
            try
            {
                using (WebResponse webResponse = request.GetResponse())
                using (StreamReader responseReader = new StreamReader(webResponse.GetResponseStream()))
                {
                    response = responseReader.ReadToEnd();
                }
            }
            catch (Exception e)
            {
                // Log failures as errors so they stand out in the function logs.
                log.LogError(e, "Call to D365FnO failed: {Message}", e.Message);
            }
            return response;
        }
Look at the parameters: the method receives the token and the payload obtained from the body of the HTTP request. Bearer tokens are accepted only when the keyword 'Bearer' precedes the token in the Authorization header:
request.Headers.Add("Authorization", string.Format("Bearer {0}", token));
Hence it has been added in the header.
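The same header can be built with the HttpClient family of types, which newer .NET code tends to prefer over HttpWebRequest. This is a sketch under assumptions: BearerRequestBuilder is a hypothetical helper, the URL is whatever endpoint you are calling, and nothing is sent until the message is handed to an HttpClient.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;

public static class BearerRequestBuilder
{
    // Builds a POST carrying a JSON body and an "Authorization: Bearer <token>" header.
    public static HttpRequestMessage Build(string url, string token, string jsonPayload)
    {
        var request = new HttpRequestMessage(HttpMethod.Post, url);

        // AuthenticationHeaderValue renders as "<scheme> <parameter>", i.e. "Bearer <token>".
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);

        // StringContent sets the Content-Type and encodes the body as UTF-8.
        request.Content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
        return request;
    }
}
```

Sending it would then be a matter of passing the message to HttpClient.SendAsync on a shared HttpClient instance.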
Lastly, you can call this method from the main body of your function app:
public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");
            TokenGenerator generator = new TokenGenerator();
            string token = generator.GetToken(log);
            // The token can come back null or empty when the token call fails.
            if (!string.IsNullOrEmpty(token))
            {
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                string responseMessage = Call2EnqueueMethod(token, requestBody, log);
                return new OkObjectResult(responseMessage);
            }
            else
            {
                // Report the failure with an error status instead of 200 OK.
                return new ObjectResult("Token generation failed") { StatusCode = 500 };
            }
        }
OK, now we will call this logic from our Durable Function.
Start Visual Studio and create a new Azure Functions project with a Durable Functions Orchestration.




Step 1:
Create a class to generate the token like this:

using Azure.Identity;
using Azure.Security.KeyVault.Secrets;
using System;
using System.Collections.Specialized;
using System.Net;
using System.Text;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

namespace DF_SalesPool_MultiThreadedDemo
{
    internal class GenerateToken
    {
        // Reads one secret from the Key Vault whose URI is held in the VAULTURI app setting.
        public string GetSecretValue(string secretName)
        {
            string kvUri = Environment.GetEnvironmentVariable("VAULTURI");
            var client = new SecretClient(new Uri(kvUri), new DefaultAzureCredential());
            KeyVaultSecret secret = client.GetSecret(secretName).Value;
            return secret.Value;
        }

        // Requests a bearer token using the client credentials grant.
        public string GetToken(ILogger log)
        {
            string responseInString = String.Empty;

            using (var wb = new WebClient())
            {
                string tokenUrl = GetSecretValue("tokenurluat");

                var data = new NameValueCollection();
                data["client_id"] = GetSecretValue("clientiduat");
                data["client_secret"] = GetSecretValue("secretvaluat");
                data["grant_type"] = "client_credentials";
                // OAuth parameter names are lowercase by specification: "scope", not "Scope".
                data["scope"] = GetSecretValue("scopeurl");

                var response = wb.UploadValues(tokenUrl, "POST", data);
                responseInString = this.getTokenValue(Encoding.UTF8.GetString(response));
            }

            return responseInString;
        }

        // Pulls the 'access_token' node out of the JSON response.
        public string getTokenValue(string responsePayload)
        {
            dynamic json = JObject.Parse(responsePayload);
            string accessToken = json.access_token;
            return accessToken;
        }
    }
}

Step 2:
Come back to your Azure function project and create another class that accepts the JSON input string and deserializes it into a list of objects:

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Collections.Generic;

namespace DF_SalesPool_MultiThreadedDemo
{
    public class PayloadElements
    {
        public List<PoolRecords> PoolList { get; set; }
    }

    public class PoolRecords
    {
        // Keep the lower-camel name of the payload when the record is serialized again.
        [JsonProperty("dataAreaId")]
        public string DataAreaId { get; set; }
        public string PoolId { get; set; }
        public string PoolName { get; set; }

        public PayloadElements SplitAndStore(string pools)
        {
            // Accept either a bare JSON array or an object that wraps the array in "Pools".
            JToken root = JToken.Parse(pools);
            JArray items = root as JArray ?? root["Pools"] as JArray ?? new JArray();

            var result = new PayloadElements { PoolList = new List<PoolRecords>() };
            foreach (JToken item in items)
            {
                result.PoolList.Add(item.ToObject<PoolRecords>());
            }
            return result;
        }
    }
}



Step 3:
Now we are going to write the Durable Function itself:
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace DF_SalesPool_MultiThreadedDemo
{
    public static class FunctionDemoSalesOrderPool
    {
        // Client function: receives the HTTP request and starts the orchestration with its body.
        [FunctionName("FunctionDemoSalesOrderPool_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage req,
            [DurableClient] IDurableOrchestrationClient starter,
            ILogger log)
        {
            string jsonBody = await req.Content.ReadAsStringAsync();
            // The three-argument overload avoids the body being mistaken for an instance id.
            string instanceId = await starter.StartNewAsync<string>("FunctionDemoSalesOrderPool", null, jsonBody);
            return starter.CreateCheckStatusResponse(req, instanceId);
        }

        [FunctionName("FunctionDemoSalesOrderPool")]
        public static async Task<string> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var jsonBody = context.GetInput<string>();

            // Orchestrator code must be deterministic, so all I/O (the token call included) runs in activities.
            string tokenVl = await context.CallActivityAsync<string>("GetAccessToken", null);
            PayloadElements output = await context.CallActivityAsync<PayloadElements>("GetListofRecords", jsonBody);
            List<PoolRecords> records = output.PoolList;

            // Fan-out: start one activity per record without awaiting each one.
            var parallelTasks = new List<Task<string>>();
            for (int i = 0; i < records.Count; i++)
            {
                RequestObject obj = new RequestObject();
                // Serialize the record; ToString() would only return the type name.
                obj.Payload = JsonConvert.SerializeObject(records[i]);
                obj.TokenVal = tokenVl;
                Task<string> task = context.CallActivityAsync<string>("CreateRecord", obj);
                parallelTasks.Add(task);
            }

            // Fan-in: continue only when every activity has finished.
            await Task.WhenAll(parallelTasks);
            //Optionally you can process any further step: example in case of journals processing, you can call the posting routine here
            string result = await context.CallActivityAsync<string>("FinishCall", "");

            return result;
        }

        [FunctionName(nameof(GetAccessToken))]
        public static string GetAccessToken([ActivityTrigger] string input, ILogger log)
        {
            return new GenerateToken().GetToken(log);
        }

        [FunctionName(nameof(FinishCall))]
        public static string FinishCall([ActivityTrigger] string result, ILogger log)
        {
            return "Process completed.";
        }

        [FunctionName(nameof(GetListofRecords))]
        public static PayloadElements GetListofRecords([ActivityTrigger] string json, ILogger log)
        {
            PoolRecords elements = new PoolRecords();
            PayloadElements element = elements.SplitAndStore(json);
            return element;
        }

        // Activity function: posts one record to D365FnO.
        [FunctionName("CreateRecord")]
        public static string CreateRecord(
            [ActivityTrigger] IDurableActivityContext context,
            ILogger log)
        {
            // Unwrap the payload and token passed in by the orchestrator.
            RequestObject obj = context.GetInput<RequestObject>();
            string payload = obj.Payload;
            string token = obj.TokenVal;

            string URL = Environment.GetEnvironmentVariable("D365URL");
            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(URL);
            request.Method = "POST";
            request.ContentType = "application/json";
            // Set all headers before opening the request stream.
            request.Headers.Add("Authorization", string.Format("Bearer {0}", token));

            string response = String.Empty;
            try
            {
                // Content-Length counts bytes, not characters.
                byte[] body = Encoding.UTF8.GetBytes(payload);
                request.ContentLength = body.Length;
                using (Stream requestStream = request.GetRequestStream())
                {
                    requestStream.Write(body, 0, body.Length);
                }

                using (WebResponse webResponse = request.GetResponse())
                using (StreamReader responseReader = new StreamReader(webResponse.GetResponseStream()))
                {
                    response = responseReader.ReadToEnd();
                }
            }
            catch (Exception e)
            {
                log.LogError(e, "CreateRecord failed: {Message}", e.Message);
                response = "Failed: " + e.Message;
            }

            return response;
        }
    }
}

Look at the code and see how it works.

1. We start with the method FunctionDemoSalesOrderPool, which is an orchestrator function. It is the entry point from where everything starts.
2. The orchestrator first calls another Azure function, GetListofRecords, to split the incoming array into individual records. It then loops over those records in a for loop, calling another Azure function, CreateRecord, once per record.
3. I have created another class called RequestObject, which simply holds the payload and the token value:
namespace DF_SalesPool_MultiThreadedDemo
{
    // Carries one record and the bearer token from the orchestrator to the CreateRecord activity.
    // Public so that it can appear in the signatures of public function methods.
    public class RequestObject
    {
        public string Payload { get; set; }
        public string TokenVal { get; set; }
    }
}

We pass this class as a parameter from the orchestrator and unwrap it in the CreateRecord method:
string payload = obj.Payload;
string token = obj.TokenVal;

4. Optionally, we fan in the entire execution by calling an ending function called FinishCall. This function is typically required when you want to do something at the end of the entire execution. For example, you are processing sales order lines and loading them into sales orders, and you want them invoiced only once all the lines are loaded. Hence you can add a dependency saying:
await Task.WhenAll(parallelTasks);
which I am doing in the orchestrator itself.

With that, let me conclude the topic here. Let me be back with another blog, soon. Till then much love and Namaste as always.
