Wednesday, March 28, 2018

WFA sub-workflow runner template (parallel workflows)

Invoking a workflow from within a workflow.  It's been done before.  Of course WFA supports sub-workflows natively (one level deep).
But I'm talking about invoking the workflow through a loop-back (localhost) REST call.

When done in the traditional way, usually for something simple like the default "acquire datasource" workflow, it goes something like this: invoke REST, wait, check the result.
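
In PowerShell, that pattern boils down to roughly this (a minimal sketch only: the workflow UUID, the user-input key and value are placeholders, and certificate handling is left out - the template's Invoke-HttpCommand below takes care of that):

$cred = Get-WfaCredentials -Host "localhost"
$url  = "https://localhost/rest/workflows/<workflow-uuid>/jobs"
$body = '<workflowInput><userInputValues><userInputEntry key="VolumeName" value="vol1"/></userInputValues></workflowInput>'

# invoke-rest : start the workflow
$job = Invoke-RestMethod -Method Post -Uri $url -Credential $cred -Body $body -ContentType "application/xml"

# wait - check result : poll the job until it reaches a final state
do {
    Start-Sleep -Seconds 10
    $status = (Invoke-RestMethod -Method Get -Uri ($url + "/" + $job.job.jobId) -Credential $cred).job.jobStatus.jobStatus
} until ("COMPLETED","FAILED","PARTIALLY_SUCCESSFUL","CANCELED" -contains $status)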

Maybe even with a repeat-row, like Tim Kleingeld did with his LUN migrations a few years back.

If we mean serious business, we need something more solid, more thought-through.



First of all, a big THANK YOU to José Fernandez Diaz for working out this concept.  José is a contractor in the UK who works with WFA 24/7 and pushes it to its limits.

Runspaces to the rescue

The concept and this template use runspaces, so definitely check out my previous post about runspaces.

We just wrap this concept in a WFA command.

Purpose

The purpose of this runner template is to process massive numbers of workflow calls.  Imagine that you wrote a monster workflow to provision volumes (with SnapMirror, SnapVault, ... the full monty).

And now you need to run it 100 times.  Maybe it's so massive that you need to build in auto-resumes, timeout handling, etc.

Some real-life examples where I'm already using this:

  • provisioning for massive migrations
  • post-enabling SnapMirrors
  • creating StorageX policies
  • creating DFS links

The concept (database driven)

This is how it works (a bare-bones skeleton follows the list):
  • Load a dataset from somewhere (a database is an excellent choice)
  • For the whole dataset, invoke workflows in parallel
  • Set concurrency (limit the number of slots => read the runspace post to understand!)
  • Set a timeout; maybe the dataset is so massive that we want to limit the total runtime
  • Handle workflow results (in a database for example, to keep track of the status/result)
  • Maybe auto-resume (maybe your sub-workflow has dependencies and sometimes a simple resume is enough - call it a retry)
  • Enable logging (again, a database is an excellent choice)
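
In bare-bones form, the runner is nothing more than this skeleton (illustrative only - it reuses names from the template below, only the Body parameter is shown, and all the slot/timeout/retry/resume/logging handling is stripped out):

$dataset = getOurMainData                                          # load a dataset (from a database)
$pool    = [runspacefactory]::CreateRunspacePool(1, $maxThreads)   # concurrency = number of slots
$pool.Open()
$jobs = @()
foreach ($row in $dataset.Values) {                                # one sub-workflow call per row
    # ... wait for a free slot and check the overall timeout here ...
    $ps = [powershell]::Create().AddScript($WorkflowCall).AddParameter("Body", (getWorkflowBody $row))
    $ps.RunspacePool = $pool
    $jobs += New-Object PSObject -Property @{ job = $ps; result = $ps.BeginInvoke(); source = $row }
}
processFinishedJobs                                                # handle results, write status back to the database
$pool.Close(); $pool.Dispose()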

The Code

At the end of this post I've added a sample DAR.  But because I don't want to add any dependencies, I'm using a dummy workflow and some dummy data.

In the code below I've added lots of comments.  This is advanced stuff, I know.  PM me if you are stuck.

For this code to work, you also need to add loop-back credentials so the workflow can talk to localhost.
By default I assume you add credentials for "localhost".  The WFA server also defaults to localhost.
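
In the main code those credentials are resolved like this, so the name of the stored credential entry must match what you pass in $Wfacredential (default "localhost"):

$wfaCred     = Get-WfaCredentials -Host "localhost"        # the loop-back credentials stored in WFA
$wfaUser     = $wfaCred.GetNetworkCredential().UserName
$wfaPassword = $wfaCred.GetNetworkCredential().Password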

Param(
    [parameter(Mandatory = $true, HelpMessage = "Database server")]
    [string]$Dbserver,

    [parameter(Mandatory = $true, HelpMessage = "Database Name")]
    [string]$DbName,
  
    [parameter(Mandatory = $false, HelpMessage = "database credentials from WFA stored credentials")]
    [string]$DbCredential,   

    [parameter(Mandatory = $false, HelpMessage = "WFA server")]
    [string]$WfaServer = "localhost",  

    [parameter(Mandatory = $false, HelpMessage = "WFA server credentials from WFA stored credentials")]
    [string]$Wfacredential = "localhost",  

    [parameter(Mandatory = $true, HelpMessage = "Workflow name to run")]
    [string]$WorkflowName

    #################################
    # ADD MORE CUSTOM PARAMETERS
    #################################


)

###############################
# constants
###############################

$ErrorActionPreference       = "Stop"
$maxThreads                  = 6                            # concurrent jobs
$maxJobs                     = 10                           # max jobs in waiting queue
$jobCheckSleepTimer          = 20                           # time between job checks
$jobAddSleepTimer            = 2                            # time between job adds
$httpRetrySleepTimer         = 30                           # time to wait and start resume/retry
$getWorkflowResultSleepTimer = 10                           # time to retry to get the workflowResult
$maxRunningTimeMinutes       = 120                          # max time for adding jobs - after this, no new workflows are scheduled
$timeout                     = new-timespan -Minutes 5     # Workflow timeout
$resumeTimeout               = new-timespan -Minutes 3     # resume time
$maxInvokeWorkflowTries      = 1                            # how many times do we try to start the workflow
$maxRunWorkflowTries         = 1                            # how many times do we try to run the workflow (if > 1, resumes will be attempted)
$retryRunErrorMatches        = "please retry","Temporary", "Unauthorized" # if the workflow returns an error like these, we retry multiple times
$processJobsInterval         = 5                            # process jobs after each batch of x jobs

###############################
# function definitions
###############################

# SAMPLE FUNCTION TO EXECUTE MS SQL - YOU COULD ADD ONE FOR MYSQL, ....
# NOTICE THE RETRY PART, IN CASE YOU HAVE A LOCK-ERROR FOR EXAMPLE
# WE USE THIS TO INTERACT WITH THE DATABASE, MAYBE SET THE STATUS OF SOMETHING
function ExecuteNonQuery{

    param([string]$sqlQuery)
    
    
    $maxtries = 5
    $tries = 0
    $complete = $false
    
    do{
    
        try{
            $SqlCmd = New-Object System.Data.SqlClient.SqlCommand
            $SqlCmd.CommandText = $sqlQuery
            $SqlCmd.Connection = $sqlConnection
            $result = $SqlCmd.ExecuteNonQuery() 
            $complete = $true
        }
        catch
        {
        
            $errorMessage = $_.exception.message
        
            get-wfalogger -warn -message $("Execute non query failed: " + $sqlQuery)
            get-wfalogger -warn -message $("Error message: " + $errormessage)
            $tries++
            if($tries -ge $maxtries)
            {
            
                throw $_
            
            }
            get-wfalogger -warn -message $("Re-trying query")
            start-sleep -seconds 3
        }
        
    }
    until($complete)
    return $result
}



# A SAMPLE FUNCTION TO ADD DATABASE LOGGING
# IN THIS EXAMPLE I'M ADDING NOTES TO MIGRATION ITEMS
# AGAIN, A RETRY IS BUILT IN
function writeSQLnote
{
    param(
        $migrationID,$note
    
    )


    $storedProcedureName = "sp_new_t_note"
    $SqlCmd = $SqlConnection.CreateCommand()
    $storedProcedureFullName = $SQLDBName + ".dbo." + $storedProcedureName
    $SqlCmd.CommandText = $storedProcedureFullName
    $SqlCmd.CommandType=[System.Data.CommandType]::StoredProcedure

    $SqlCmd.Parameters.AddWithValue("@note", $note) | out-null 
    $SqlCmd.Parameters.AddWithValue("@migration_id", $migrationID) | out-null
    $SqlCmd.Parameters.AddWithValue("@updated_by", "wfa_runner") | out-null
    $SqlConnection.Open()
    $dummy = $SqlCmd.ExecuteNonQuery()
    $SqlConnection.Close() 

}

# THIS FUNCTION IS OPTIONALLY USED FOR THE FOLLOWING SITUATION:
# WHAT IF MY WORKFLOW HAS A SILENT CONTINUE SOMEWHERE -
# A WARNING OF SOME KIND THAT DOESN'T FAIL, BUT MIGHT REQUIRE
# SOME ATTENTION.  SO I CREATED A WFA SCHEME "workflowjob"
# AND IN MY SILENTLY FAILING COMMAND, I'VE ADDED DATABASE LOGGING
# WHERE I KEEP TRACK OF THE WFA JOB ID, A TYPE (FAIL/SUCCESS/WARNING)
# AND A CODE (TO IDENTIFY WHERE SOMETHING WENT WRONG)
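# FOR ILLUSTRATION ONLY - YOUR SCHEME MAY LOOK DIFFERENT - SOMETHING LIKE THIS IN THE WFA MySQL INSTANCE:
#   CREATE TABLE workflowjob.jobstatus (
#       JobId      INT,
#       StatusType VARCHAR(16),   -- 'success' / 'fail' / 'warning'
#       StatusCode VARCHAR(64)    -- identifies the spot in the workflow that flagged it
#   );
# THE SILENTLY-FAILING COMMAND IN THE SUB-WORKFLOW INSERTS A ROW WITH ITS OWN
# JOB ID (Get-WfaRestParameter -name "JobId"), AND THE FUNCTION BELOW READS IT BACK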
function CheckWorkflowJobStatus{
 param (

  [parameter(Mandatory=$true, HelpMessage="Status Code to Check")]
  [string]$StatusCode,

  [parameter(Mandatory=$true, HelpMessage="Status Type to Check")]
  [ValidateSet('success','fail')]
  [string]$StatusType,
  
  [parameter(Mandatory=$true, HelpMessage="JobId")]
  [int]$JobId
 )

 # check in db
 $cmd1 = "SELECT 1 FROM workflowjob.jobstatus WHERE StatusType='$StatusType' AND StatusCode='$StatusCode' AND JobId=$JobId"

 $maxtries=3
 $tries=1
 do{
  try{
   Get-WFALogger -Info -message $("Checking $StatusCode [$StatusType] in db")            
            $result = Invoke-MySqlQuery -query $cmd1
            return ($result[0])
  }catch{
   $tries++
   Start-Sleep -Seconds 3
  }
    }while($tries -le $maxtries)
    Get-WFALogger -Info -message $("Failed to execute query $cmd1") 
    return 0
}

# THIS FUNCTION IS A MORE ADVANCED INVOKE-RESTMETHOD FUNCTION.  José Fernandez Diaz HAS
# ENCOUNTERED SOME ISSUES / LIMITS WITH THE COMMON invoke-restmethod (I'LL TAKE HIS WORD FOR IT)
function Invoke-HttpCommand() {
    # A replacement for invoke-restmethod, which is sometimes causing troubles
    param(
        [string] $target,
        [string] $username = $null,
        [string] $password = $null, 
        [string] $verb, 
        [string] $content
    )
    #Write-host "$target" -ForegroundColor Cyan
    #Write-host "$content" -ForegroundColor Magenta
    # prepare http client 
    [System.Net.ServicePointManager]::ServerCertificateValidationCallback = {$true}
    $webRequest = [System.Net.WebRequest]::Create($target)
    $encodedContent = [System.Text.Encoding]::UTF8.GetBytes($content)
    $auth = [System.Text.Encoding]::UTF8.GetBytes($username + ":" + $password)
    $base64 = [System.Convert]::ToBase64String($auth)
    $webRequest.Headers.Add("AUTHORIZATION", "Basic $base64"); 
    $webRequest.PreAuthenticate = $true
    $webRequest.Method = $verb
    if ($encodedContent.length -gt 0) {
        $webRequest.ContentLength = $encodedContent.length
        $requestStream = $webRequest.GetRequestStream()
        $requestStream.Write($encodedContent, 0, $encodedContent.length)
        $requestStream.Close()
    }
    try {
        # invoke the call
        [System.Net.WebResponse] $resp = $webRequest.GetResponse();
    }
    catch {

        try {
            $ex = $_
            $rs = ([System.Net.WebResponse]$_.Exception.InnerException.Response).getresponseStream()
            [System.IO.StreamReader] $sr = New-Object System.IO.StreamReader -argumentList $rs;
            [string] $errorContent = $sr.ReadToEnd();
            $ex = ([string]$_ + "`r`n" + [string]$errorContent)
        }
        finally {
            Throw $ex
        }
    }

    if ($resp -ne $null) {
        $rs = $resp.GetResponseStream();
        [System.IO.StreamReader] $sr = New-Object System.IO.StreamReader -argumentList $rs;
        [string] $results = $sr.ReadToEnd();
        [System.Net.ServicePointManager]::ServerCertificateValidationCallback = {$false}
        return $results;
    }
    [System.Net.ServicePointManager]::ServerCertificateValidationCallback = {$false}
    return '';
}

# A LOGGING FUNCTION
# FEEL FREE TO REPLACE THIS WITH MORE ADVANCED LOGGING
# SEE MY POST ON LOG4NET LOGGING TO EVENTVIEWER AND ROLLING LOGFILE
function log {
    # alternative log function, in fact, add whatever logger you want here.
    # in this case we just wrap get-wfalogger
    param(
        [ValidateSet("info", "warning", "error")]
        [string]$severity = "info",
        [string]$message = ""
    )
    $message = $message -replace "`"","'"
    [string]$wfaseverity = $severity
    if ($severity -eq "warning") {
        $wfaseverity = "warn"
    }
    $expression = ("get-wfalogger -{0} -message `"{1}`"" -f $wfaseverity,$message)
    try{
        invoke-expression $expression -erroraction Stop  
    }catch{
        write-host "error with expression : $expression" -ForegroundColor magenta
    }  
       
}

# THIS IS TO GET RUNSPACE JOB INFO
# CHECK OUT MY POST ON RUNSPACES
function getJobStats{
    param(
        $max,
        $maxJobs
    )
    $running = @($jobs).Count
    $finished = $max - $running
    $reallyRunning = @($jobs | ?{
        # really important line here.  check result first !
        if($_.Result){
            -not $_.Result.IsCompleted
        }else{
            $false
        }
    }).Count
    if($max -eq 0){
        $percent = 100
    }else{
 $percent = [int]($finished/$max*100)
    }
    $o = @{
        running = $running
        open = $reallyRunning
        finished = $finished
        percent = $percent
        hasFreeSlot = ($reallyRunning -lt $maxJobs)
        isFinished = (-not $running)
    }
    New-Object -TypeName psobject -Property $o
}

# THIS FUNCTION HANDLES YOUR WORKFLOW RESULTS
# YOU WILL WANT TO MODIFY THIS PART !!
# I'VE ADDED SOME SAMPLES ALREADY
function processFinishedJobs(){
    log -message "...processing jobs..."
    # handle finished jobs
    ForEach ($job in @($jobs | ?{$_.Result.IsCompleted})){

        try {
     $result = "success"
            $jobOutput = $job.job.EndInvoke($job.result);
            $wfaJobId = ($jobOutput -split ";", 3)[0]
            $executionStatus = ($jobOutput -split ";", 3)[1]
            $executionReturn = [xml]($jobOutput -split ";", 3)[2]
    
            # here we check some silent continue problem, optional and just an example
            # the idea is that subworkflow was success, but might have flagged something 
            # we use a mysql scheme as communication platform, see function CheckWorkflowJobStatus
            $myCustomErrorOrWarningFound=CheckWorkflowJobStatus -StatusType 'fail' -StatusCode 'somewhere_in_my_workflow' -JobId $wfaJobId
            if($myCustomErrorOrWarningFound -gt 0){
                # note I'm logging some custom fields here
                # when we add a job to the job-list we add some custom metadata
                # perfect to use the metadata for logging
                log -severity "error" -message ("Subworkflow for [{0}][{1}] is partially successful.  This {2} went wrong.  Check details for WFA job: {3}" -f $job.source.field1,$job.source.field2,$job.source.field3, $wfaJobId )
          $result = "my_custom_result"
            }

            # here we check partially successful
            # a workflow can be partially successful
            # note I'm logging some custom fields here
            # when we add a job to the job-list we add some custom metadata
            # perfect to use the metadata for logging            
            if($executionStatus -eq "PARTIALLY_SUCCESSFUL") {
                log -severity "error" -message ("Subworkflow for [{0}][{1}] is partially successful. Check details for WFA job: {2}" -f $job.source.field1,$job.source.field2,$wfaJobId )
          $result = "partially_success"
            }

            # anyhow, when we reach this, the workflow finished
            # note I'm logging some custom fields here
            # when we add a job to the job-list we add some custom metadata
            # perfect to use the metadata for logging            
            log -message ("Completed subworkflow for {0} {1}" -f $job.source.field1,$job.source.field2)
        }
        catch {
            # Oops, the workflow failed... shit happens :)
            # note I'm logging some custom fields here
            # when we add a job to the job-list we add some custom metadata
            # perfect to use the metadata for logging              
            $result = "fail"
            $errorMessage = $_.exception.message
            log -severity "warning" -message ("Subworkflow for [{0}][{1}] failed. Check the job status. Reported error: {2}" -f $job.source.field1,$job.source.field2, $errorMessage )
        }

        # here I use my sample function, where I add a note in my database
        # customize as you please
        writeSQLnote -migrationId $job.source.id -note $result

        # here you can grab the return parameters and perhaps add your metadata to them
        # finally we push the results to an array

        #$results += New-Object PSObject -Property @{ 
        #    "result" = $jobOutput;
        #    "field2" = $job.source.field2;
        #    "field1" = $job.source.field1
        #}

        $job.job.Dispose()
        $job.job = $Null
        $job.result = $Null
        $jobs.remove($job)
    }
}

# XML ENCODING FUNCTIONS
# WE USE XML IN OUR REST CALLS
# WE NEED TO MAKE SURE PROPER ENCODING IS USED
function prepareForXMLstring {
    # encodes a string for xml use
    param(
        $in
    )
    return ($in -replace ("&", "&amp;")) -replace ("`"", "&quot;")
}
function prepareForXMLboolean {
    # encodes a boolean for xml use
    param(
        $in
    )
    if ([boolean]$in) {
        return "true"
    }
    return "false"
}

# THIS FUNCTION NEEDS TO BE CUSTOMIZED
# HERE WE PREP OUR WORKFLOW POST-BODY
# USE THE XML ENCODING FUNCTIONS
function getWorkflowBody {
    # prepare the userinput rest body
    param(
        $source
    )
    
    # get input data, coming from database
    $field1 = prepareForXMLstring ("`"" + $source.field1 + "`"")
    $field2 = prepareForXMLstring ("`"" + $source.field2 + "`"")
    
    # prep input data, just some sample here
    $post = [string]"<workflowInput><userInputValues>"
    $post = $post + '<userInputEntry key="field1" value="' + $field1 + '"/>'
    $post = $post + '<userInputEntry key="field2" value="' + $field2 + '"/>'
    $post = $post + '<userInputEntry key="field3" value="true"/>'
    # ADD MORE FIELDS
    $post = $post + "</userInputValues>"

    # ADD COMMENT, WHAT IS THIS WORKFLOW CALL ABOUT ?
    $ExecutionComment = "subworkflow, blabla : field1 " + $source.field1 + ", path " + $source.field2
    if ($ExecutionComment) {
        $post = $post + "<comments>" + (prepareForXMLstring $ExecutionComment) + "</comments>"
    }
    $post = $post + "</workflowInput>"
    $post = [xml]$post
    return $post.outerXML
}
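
# FOR ILLUSTRATION, WITH field1 = myvolume AND field2 = myshare THE FUNCTION ABOVE
# PRODUCES A BODY ALONG THESE LINES (NOTE HOW THE STRING VALUES GET WRAPPED IN
# QUOTES AND THEN XML-ENCODED):
# <workflowInput>
#   <userInputValues>
#     <userInputEntry key="field1" value="&quot;myvolume&quot;" />
#     <userInputEntry key="field2" value="&quot;myshare&quot;" />
#     <userInputEntry key="field3" value="true" />
#   </userInputValues>
#   <comments>subworkflow, blabla : field1 myvolume, path myshare</comments>
# </workflowInput>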

# FUNCTION WHERE WE GRAB OUR DATA
# CUSTOMIZE THIS - I'M SHOWING A SAMPLE
# THAT GRABS DATA FROM AN MSSQL DATABASE
# YOU COULD ALSO GRAB FROM MySQL AND USE THE CLASSIC
# WFA FUNCTION Invoke-MySqlQuery
function getOurMainData {
    # Get data to process from the database
    param(
        [string]$filerName
    )
    $SQLquery = @"
    use my_database;
    SELECT
        id,
        field1,
        field2,
        ...
    FROM 
        my_table
    WHERE 
        my_filters
"@

    $SqlCmd = New-Object System.Data.SqlClient.SqlCommand
    $SqlCmd.CommandText = $SQLquery  # maybe customize using the -f format operator and the classic {} placeholders
    $SqlCmd.Connection = $SqlConnection

    $reader = $SqlCmd.ExecuteReader()

    # Reads the dataset and stores them in results
    $results = @()
    log -message $("Reading results") 
    while ($reader.Read()) {
        $row = @{}
        for ($i = 0; $i -lt $reader.FieldCount; $i++) {
            $row[$reader.GetName($i)] = $reader.GetValue($i)
        }
        $results += new-object psobject -property $row  
    }
    $reader.close()

    $outHash = @{}
    foreach ($record in $results) {
        $outHash[$record.id] = $record
    }

    return $outHash
}

# HERE IS WHERE THE MAGIC HAPPENS
# WE HANDLE THE WORKFLOW CALL HERE
# WE WILL CALL THE WORKFLOW AND WAIT FOR THE OUTCOME
# IT'S A CLASSIC WFA REST CALL HANDLER
# ScriptBlock to run on every job
$WorkflowCall = {
    Param (
        $WfaServer, 
        $WorkflowUID, 
        $User, 
        $Password, 
        $Body
    )

    # create a stopwatch
    $stopwatch = [diagnostics.stopwatch]::StartNew()

    # run the workflow
    $url = "https://" + $WfaServer + "/rest/workflows/" + $WorkflowUID + "/jobs"
    $try = 0
    $done = $false
    do {
        try {
            $results = Invoke-HttpCommand $url $User $Password "POST" $Body
            $jobs = [xml] $results
            $done = $true
        }
        catch {
            # catch errors - retry if needed
            # HERE WE ENABLE RETRY IN CASE OF SOME TEMP ERRORS LIKE AUTHENTICATION, etc... SEE CONSTANTS ABOVE
            $errormessage = $_.exception.message
            $errormatches = "(" + ($retryRunErrorMatches-join "|") + ")"

            # does it match a retry scenario ?
            if (($errorMessage -imatch $errormatches) -and $try -le $maxInvokeWorkflowTries) {
                # wait a bit before retry, maybe wfa was too busy.
                start-sleep -seconds $httpRetrySleepTimer
                $try++                
            }
            else {
                # in other case, rethrow error
                throw
            }
        }
    }
    until($done)
        
    # get the workflow result (async)
    $currentJobId = $jobs.job.JobId
    $jobUrl = $url + "/" + $jobs.job.JobId
    $try = 0

    do {
        # try to get the workflowresult
        start-sleep -seconds $getWorkflowResultSleepTimer
        try {
            $try++
            $results = Invoke-HttpCommand $JobUrl $user $password "GET"
        }
        catch {
            # stop trying if we hit the timeout
            if ($stopwatch.elapsed -le $resumeTimeout) {
                continue
            }
            else {
                throw $_
            }
        }

        # get the result info
        $job = [xml] $results
        $jobStatus = $job.job.jobStatus.jobStatus
            
        # what is the current status ?
        switch ($jobStatus) {
            "SCHEDULED" {break; }
            "PLANNING" {break; }
            "PENDING" {break; }
            "EXECUTING" {break; }
            "ABORTING" {throw "ABORTING CHILD" ; break; }
            "CANCELED" {throw "CANCELED CHILD" ; break; }
            "COMPLETED" {
                # completed ?  Return the result and stop.
                return ($currentJobId + ";" + $jobStatus + ";" + $job.job.jobStatus.returnParameters.outerxml)
                break
            }
            "PARTIALLY_SUCCESSFUL" {
                # partially completed ?  Return the result and stop.
                return ($currentJobId + ";" + $jobStatus + ";" + $job.job.jobStatus.returnParameters.outerxml)
                break
            }               
            "FAILED" {
                # failed ?  Maybe we can resume ?
                # are we allowed to try a resume ?
                if ($try -lt $maxRunWorkflowTries -and $stopwatch.elapsed -le $resumeTimeout) {

                    $lasterror = ""
                    $lasterror = $job.job.jobStatus.errorMessage

                    ######################################################################
                    # ADD LOGIC HERE TO THROW AN ERROR IN CASE IT'S POINTLESS TO RESUME !
                    # THE WORKFLOW ERROR IS FINAL AND WE DON'T WANT ANY RESUMES
                    ######################################################################
                    # if ($lasterror -eq "..."){
                    #     throw "No resume possible - last error : $lasterror"
                    # }

                    # THE BELOW IS OPTIONAL, MAYBE YOU HAVE RESUME SCENARIOS
                    # resume
                    $currentJobId = Get-WfaRestParameter -name "JobId"
                    $url = $JobUrl + "/resume"
                    $resumePost = "<workflowInput>" + $job.job.jobStatus.userInputValues.OuterXml + "<comments>" + $job.job.jobStatus.comment + " - Resumed" + "</comments>" + "</workflowInput>"
                    $resumePost = [xml]$resumePost
                    try {
                        $try++
                        $results = Invoke-HttpCommand $url $user $password "POST" $resumePost.outerXML
                    }
                    catch {
                        
                        # if the resume fails, let's not try again.  Just throw
                        $exceptionError = $_.exception.message
                        if ([boolean]$lasterror) {
                            $exceptionError += ". Previous error message: " + $lasterror
                        }
                        
                        $errorMessage = "WFA job: " + $jobs.job.JobId + " failed with error: " + $exceptionError
                        throw $errorMessage
                    }

                    # wait a bit to allow the resume to kick in.
                    start-sleep -seconds $httpRetrySleepTimer
                }
                else {
                    # not allowed to resume ?  Then return why it failed...
                    $errorMessage = "WFA job: " + $jobs.job.JobId + " failed with error: " + $lasterror
                    throw $errorMessage
                }
                break
            }
            # unknown return status - throw it
            default {throw ("--Unexpected Message - {0} - {1}" -f $job.job.JobId,$jobStatus) ; break; }            
        }

    } until ($stopwatch.elapsed -gt $timeout) # run until finished or until timeout

    # if we got here, it means we stopped trying and hit the time out
    # we stop monitoring, and throw the error.
    $errorMessage = "Job " + $currentJobId + " timed out while waiting."
    if ($lasterror) {
        $errorMessage = $errorMessage + " Last error message: " + $lasterror
    }
    else {
        $errorMessage = $errorMessage + " Job has not been resumed."
    }
    throw $errorMessage

}

#######################################
# START MAIN CODE
#######################################

# get credentials 
$dbCred = Get-WfaCredentials -Host $DbCredential
$sqlUser = ($dbCred.getnetworkcredential()).username 
$sqlPassword = ($dbCred.getnetworkcredential()).Password
$wfaCred = Get-WfaCredentials -Host $Wfacredential
$wfaUser = ($wfaCred.getnetworkcredential()).username 
$wfaPassword = ($wfaCred.getnetworkcredential()).Password

# get workflow information
$url = "https://" + $WfaServer + "/rest/workflows?name=" + $WorkflowName
$uidResult = Invoke-HttpCommand $url $wfaUser $wfaPassword GET
$workflowUID = ([xml]$uidResult).collection.workflow.uuid

# connecting to the SQL database
# OPTIONAL, WE NEED TO GET OUR DATA SOMEWHERE
# IN THIS SAMPLE, WE GRAB FROM SQL SERVER
$SQLServer = $dbserver
$SQLDBName = $dbName
$SqlConnection = New-Object System.Data.SqlClient.SqlConnection
$ConnectionString = "Server=$SQLServer;Database=$SQLDBName;Uid=$SQLUser;Pwd=$SQLPassword;"
$SqlConnection.ConnectionString = $ConnectionString
log -message "Connecting to SQL Server $SQLServer"
$SqlConnection.Open()

# get information to process
# AGAIN, YOU CAN GET YOUR DATA FROM ANYWHERE YOU WANT
$sourceHash = getOurMainData

# array for return parameters and results
$results = @()

# sql no longer needed, closing
$SqlConnection.close()

##########################################
# Multithreading the API calls
# Adding the runtime initial session state
##########################################

# HERE WE START THE RUNSPACE CODE - LOOK AT MY RUNSPACE POST FOR MORE INFO 
# http://www.wfaguy.com/2018/03/run-powershell-scripts-in-parallel.html

$iss = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

#---------------------------------------
# Adding functions & variables if needed
#---------------------------------------

# Add invoke-http command
$definitionInvokeHttpCommandFunction = Get-Content Function:\Invoke-HttpCommand -ErrorAction Stop
$sessionStateInvokeHttpCommandFunction = New-Object System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList 'Invoke-HttpCommand', $definitionInvokeHttpCommandFunction
$iss.Commands.Add($sessionStateInvokeHttpCommandFunction)
# Add timeout variable
$timeoutVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'timeout',$timeout,$Null
$iss.Variables.Add($timeoutVariable)
# Add resumeTimeout variable
$resumeTimeoutVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'resumeTimeout',$resumeTimeout,$Null
$iss.Variables.Add($resumeTimeoutVariable)
# Add maxInvokeWorkflowTries variable
$maxInvokeWorkflowTriesVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'maxInvokeWorkflowTries',$maxInvokeWorkflowTries,$Null
$iss.Variables.Add($maxInvokeWorkflowTriesVariable)
# Add maxRunWorkflowTries variable
$maxRunWorkflowTriesVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'maxRunWorkflowTries',$maxRunWorkflowTries,$Null
$iss.Variables.Add($maxRunWorkflowTriesVariable)
# Add httpRetrySleepTimer variable
$httpRetrySleepTimerVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'httpRetrySleepTimer',$httpRetrySleepTimer,$Null
$iss.Variables.Add($httpRetrySleepTimerVariable)
# Add retryRunErrorMatches variable
$retryRunErrorMatchesVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'retryRunErrorMatches',$retryRunErrorMatches,$Null
$iss.Variables.Add($retryRunErrorMatchesVariable)
# Add getWorkflowResultSleepTimer variable
$getWorkflowResultSleepTimerVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'getWorkflowResultSleepTimer',$getWorkflowResultSleepTimer,$Null
$iss.Variables.Add($getWorkflowResultSleepTimerVariable)


# create the runspacepool
$RunspacePool = [runspacefactory]::CreateRunspacePool(
    1, #Min Runspaces
    $maxThreads, #Max Runspaces
    $iss, # our defined session state
    $host #PowerShell host
)
$RunspacePool.Open()

# Let's time everything
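# (note: $timeout is re-purposed here for the runner's total runtime; the per-workflow
#  timeout was already copied into the runspace session state above)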
$timeout = new-timespan -Minutes $maxRunningTimeMinutes
$stopwatch = [diagnostics.stopwatch]::StartNew()
 
# runtime job containers
[System.Collections.ArrayList]$jobs = @()
$i = 0
$max = $sourceHash.count

# START !
$activity = "Adding jobs with max $maxThreads threads and $maxJobs jobs"
log -message $activity

# A counter needed to check where we are in the runtime process
$processJobsCounter = 0

# loop our data and start adding jobs 
foreach ($source in $sourceHash.getenumerator()) {

    # Do we still have time ?
    if ($stopwatch.elapsed -lt $timeout) {

        ############################################
        # if there is time, send another job
        # get your body data
        # add some logging and create a job
        ############################################
        $i++
        $jobstats = getJobStats -max $max -maxJobs $maxJobs
        $status = "Running {0} jobs | Adding {1}/{2}" -f $jobstats.open,$i,$max
        # i'm adding progress on the wfa command
        Set-WfaCommandProgress -total $max -current $i -ProgressPercentage ([int]($i/$max*100)) -note $status
        # log -message  $status -PercentComplete ($i/$max*100)

        # Wait for a slot to become available
        While (-not $jobstats.hasFreeSlot){
            Start-Sleep -Seconds $jobCheckSleepTimer
            $jobstats = getJobStats -max $max -maxJobs $maxJobs
        }

        # ADD SOME CUSTOM LOGGING HERE, SAY WHAT YOU ARE DOING
        # I'M USING MY CUSTOM INFO HERE, ACQUIRED FROM THE DATABASE
        log -message ("Running my custom job for {0} {1}" -f $source.value.field1, $source.value.field2)
        $body = getWorkflowBody $source.value
        
        # create a ps job
        $job = [powershell]::Create()

        # add the script
        [void]$job.AddScript($WorkflowCall)

        # pass the parameters
        [void]$job.AddParameter("WfaServer", $WfaServer)
        [void]$job.AddParameter("WorkflowUID", $workflowUID)
        [void]$job.AddParameter("User", $wfaUser)
        [void]$job.AddParameter("Password", $wfaPassword)
        [void]$job.AddParameter("Body", $body)

        # add to the runpool
        $job.RunspacePool = $RunspacePool

        # store the job
        # IMPORTANT - I'M ADDING CUSTOM METADATA IN THE PROPERTY 'source'
        # WE USE IT EVERYWHERE IN THE LOGGING (that's where source.field1 and source.field2 come from)
        $oJob = New-Object PSObject -Property @{
            source = $source.value # add custom metadata - add as many as you want
            job    = $job
            result = $job.BeginInvoke()
        }
        [void]$jobs.Add($oJob)

        # process finished jobs while adding jobs
        $processJobsCounter++
        if($processJobsCounter -ge $processJobsInterval){
            processFinishedJobs
            $processJobsCounter=0
        }

        # sleep a bit, so we don't overload WFA with too many requests
        start-sleep -seconds $jobAddSleepTimer
    }
    else {
        log -severity "warning" -message $("Runner Timeout reached, stop adding jobs")
        # Break the outer loop
        Break
    }   
}

###########################
# processing jobs / adding finished
###########################
$activity = "All jobs are added, continue processing results"
log -message $activity
$jobstats = getJobStats -max $max -maxJobs $maxJobs
While (-not $jobstats.isFinished){

    $status = "Running {0} jobs | All jobs are added" -f $jobstats.open
    Set-WfaCommandProgress -total $max -current $max -ProgressPercentage 100 -note $status
    #log -message  $status -PercentComplete $jobstats.percent

    # process finished jobs - adding finished
    processFinishedJobs

    Start-Sleep -Seconds $jobCheckSleepTimer
    $jobstats = getJobStats -max $max -maxJobs $maxJobs
}

###########################
# processing final jobs
###########################

$activity = "Processing final jobs"
log -message $activity

# process final remaining finished jobs
processFinishedJobs

# do something with $results
# REMEMBER THE (COMMENTED-OUT) PUSH OF THE RETURN PARAMETERS INTO $results IN processFinishedJobs ?
# YOU COULD DO SOMETHING WITH THEM HERE

Set-WfaCommandProgress -total $max -current $max -ProgressPercentage 100 -note "Completed"

$RunspacePool.Close()
$RunspacePool.Dispose()

log -message ("Runner finished - I ran for {0}" -f $stopwatch.Elapsed.toString())


Sample Workflow

Here you can download an example.  Note that it uses a dummy workflow and dummy data.
I also stripped all the SQL Server parts to remove all dependencies.

First add localhost credentials.

Download the sample dar here

