But I'm talking about invoking a loop-back (localhost) rest-call.
When done in the traditional way, usually for something simple like the default "acquire datasource" workflow, it's something like : invoke-rest - wait - check result.
Maybe even with a repeat-row, like Tim Kleingeld did with his lun migrations a few years back.
If we mean serious business, we need something more solid, more thought-through.
First of all, a big THANK YOU to José Fernandez Diaz for working out this concept. José is a contractor in the UK, working 24/7 with WFA and is pushing WFA to its limits.
Runspaces to the riscue
The concept and this template is using runspaces, so definitely check out my previous post about runspaces.
We just wrap this concept in a WFA command.
Purpose
The purpose of this runner-template is to process massive amounts of workflow-calls. Imagine that you wrote a monster workflow to provision volumes (with snapmirror, snapvault, ... the full monty)
And now you need to run this 100 times. Maybe it's so massive that you need to build in auto-resumes, timeout-handling, etc...
Some real-life examples where I'm using this already
- provisioning for massive migrations
- post enabling snapmirrors
- creating storageX policies
- creating DFS links
The Concept & Database driven
This is how it works
- Load a dataset from somewhere (a database is an excellent choice)
- For the whole dataset invoke workflows in parallel
- Set concurrency (limit the amount of slots => read the runspace post to understand !)
- Set timeout, maybe the dataset is sooo massive, we want to limit the runtime
- Handle workflow results (a database for example, to keep track of the status/result)
- Maybe auto resume (maybe your sub-workflow has dependencies and sometimes a simple resume is enough - call it a retry)
- Enable logging (again, a database is an excellent choice)
The Code
At the end of this post I've added a sample DAR. But because I don't want to add any dependencies, I'm using a dummy workflow and some dummy data.
In the code below I've added lots of comments. This is advanced stuff, I know. PM me if you are stuck.
For this code to work, you also need to add loop-back credentials so the workflow can talk localhost.
By default I assume you add credentials for "localhost". The WFA server is also defaulted to localhost.
In the code below I've added lots of comments. This is advanced stuff, I know. PM me if you are stuck.
For this code to work, you also need to add loop-back credentials so the workflow can talk localhost.
By default I assume you add credentials for "localhost". The WFA server is also defaulted to localhost.
Param( [parameter(Mandatory = $true, HelpMessage = "Database server")] [string]$Dbserver, [parameter(Mandatory = $true, HelpMessage = "Database Name")] [string]$DbName, [parameter(Mandatory = $false, HelpMessage = "database credentials from WFA stored credentials")] [string]$DbCredential, [parameter(Mandatory = $false, HelpMessage = "WFA server")] [string]$WfaServer = "localhost", [parameter(Mandatory = $false, HelpMessage = "WFA server credentials from WFA stored credentials")] [string]$Wfacredential = "localhost", [parameter(Mandatory = $true, HelpMessage = "Workflow name to run")] [string]$WorkflowName ################################# # ADD MORE CUSTOM PARAMETERS ################################# ) ############################### # constants ############################### $ErrorActionPreference = "Stop" $maxThreads = 6 # concurrent jobs $maxJobs = 10 # max jobs in waiting queue $jobCheckSleepTimer = 20 # time between job checks $jobAddSleepTimer = 2 # time between job adds $httpRetrySleepTimer = 30 # time to wait and start resume/retry $getWorkflowResultSleepTimer = 10 # time to retry to get the workflowResult $maxRunningTimeMinutes = 120 # Time when the last workflow will be scheduled, maxtime stop $timeout = new-timespan -Minutes 5 # Workflow timeout $resumeTimeout = new-timespan -Minutes 3 # resume time $maxInvokeWorkflowTries = 1 # how many times do we try to start the workflow $maxRunWorkflowTries = 1 # how many times do we try to run the workflow (if > 1, resumes will be attempted) $retryRunErrorMatches = "please retry","Temporary", "Unauthorized" # if the workflow returns an error like these, we retry multiple times $processJobsInterval = 5 # process jobs after each batch of x jobs ############################### # function definitions ############################### # SAMPLE FUNCTION TO EXECUTE MS SQL - YOU COULD ADD ONE FOR MYSQL, .... # NOTICE THE RETRY PART, IN CASE YOU HAVE A LOCK-ERROR FOR EXAMPLE # WE USE THIS TO INTERACT WITH THE DATABASE, MAYBE SET THE STATUS OF SOMETHING function ExecuteNonQuery{ param([string]$sqlQuery) $maxtries = 5 $tries = 0 $complete = $false do{ try{ $SqlCmd = New-Object System.Data.SqlClient.SqlCommand $SqlCmd.CommandText = $sqlQuery $SqlCmd.Connection = $sqlConnection $result = $SqlCmd.ExecuteNonQuery() $complete = $true } catch { $errorMessage = $_.exception.message get-wfalogger -warn -message $("Execute non query failed: " + $sqlQuery) get-wfalogger -warn -message $("Error message: " + $errormessage) $tries++ if($tries -ge $maxtries) { throw $_ } get-wfalogger -warn -message $("Re-trying query") start-sleep -seconds 3 } } until($complete) return $result } # A SAMPLE FUNCTION TO ADD DATABASE LOGGING # IN THIS EXAMPLE I'M ADDING NOTES TO MIGRATION ITEMS # AGAIN, A RETRY IS BUILT IN function writeSQLnote { param( $migrationID,$note ) $storedProcedureName = "sp_new_t_note" $SqlCmd = $SqlConnection.CreateCommand() $storedProcedureFullName = $SQLDBName + ".dbo." + $storedProcedureName $SqlCmd.CommandText = $storedProcedureFullName $SqlCmd.CommandType=[System.Data.CommandType]::StoredProcedure $SqlCmd.Parameters.AddWithValue("@note", $note) | out-null $SqlCmd.Parameters.AddWithValue("@migration_id", $migrationID) | out-null $SqlCmd.Parameters.AddWithValue("@updated_by", "wfa_runner") | out-null $SqlConnection.Open() $dummy = $SqlCmd.ExecuteNonQuery() $SqlConnection.Close() } # THIS FUNCTION IS OPTIONALLY USED FOR THE FOLLOWING SITUATIONS # WHAT IF MY WORKFLOW HAS A SILENT CONTINUE SOMEWHERE # A WARNING OF SOME KIND THAT DOESN'T FAIL, BUT MIGHT REQUIRE # SOME ATTENTION. SO I CREATED A WFA SCHEME "workflowjob" # AND IN MY SILENTLY FAILING COMMAND, I'VE ADD DATABASE LOGGING # WHERE I KEEP TRACK OF THE WFA-JOBID, A TYPE (FAIL/SUCCES/WARNING) # AND A CODE (IDENTIFY WHERE SOMETHING WENT WRONG) function CheckWorkflowJobStatus{ param ( [parameter(Mandatory=$true, HelpMessage="Status Code to Check")] [string]$StatusCode, [parameter(Mandatory=$true, HelpMessage="Status Type to Check")] [ValidateSet('success','fail')] [string]$StatusType, [parameter(Mandatory=$true, HelpMessage="JobId")] [int]$JobId ) # check in db $cmd1 = "SELECT 1 FROM workflowjob.jobstatus WHERE StatusType='$StatusType' AND StatusCode='$StatusCode' AND JobId=$JobId" $maxtries=3 $tries=1 do{ try{ Get-WFALogger -Info -message $("Checking $StatusCode [$StatusType] in db") $result = Invoke-MySqlQuery -query $cmd1 return ($result[0]) }catch{ $tries++ Start-Sleep -Seconds 3 } }while($tries -le $maxtries) Get-WFALogger -Info -message $("Failed to execute query $cmd1") return 0 } # THIS FUNCTION IS A MORE ADVANCED INVOKE-RESTMETHOD FUNCTION. José Fernandez Diaz HAS # ENCOUNTERED SOME ISSUES / LIMITS WITH THE COMMON invoke-restmethod (I'LL TAKE HIS WORD FOR IT) function Invoke-HttpCommand() { # A replacement for invoke-restmethod, which is sometimes causing troubles param( [string] $target, [string] $username = $null, [string] $password = $null, [string] $verb, [string] $content ) #Write-host "$target" -ForegroundColor Cyan #Write-host "$content" -ForegroundColor Magenta # prepare http client [System.Net.ServicePointManager]::ServerCertificateValidationCallback = {$true} $webRequest = [System.Net.WebRequest]::Create($target) $encodedContent = [System.Text.Encoding]::UTF8.GetBytes($content) $auth = [System.Text.Encoding]::UTF8.GetBytes($username + ":" + $password) $base64 = [System.Convert]::ToBase64String($auth) $webRequest.Headers.Add("AUTHORIZATION", "Basic $base64"); $webRequest.PreAuthenticate = $true $webRequest.Method = $verb if ($encodedContent.length -gt 0) { $webRequest.ContentLength = $encodedContent.length $requestStream = $webRequest.GetRequestStream() $requestStream.Write($encodedContent, 0, $encodedContent.length) $requestStream.Close() } try { # invoke the call [System.Net.WebResponse] $resp = $webRequest.GetResponse(); } catch { try { $ex = $_ $rs = ([System.Net.WebResponse]$_.Exception.InnerException.Response).getresponseStream() [System.IO.StreamReader] $sr = New-Object System.IO.StreamReader -argumentList $rs; [string] $errorContent = $sr.ReadToEnd(); $ex = ([string]$_ + "`r`n" + [string]$errorContent) } finally { Throw $ex } } if ($resp -ne $null) { $rs = $resp.GetResponseStream(); [System.IO.StreamReader] $sr = New-Object System.IO.StreamReader -argumentList $rs; [string] $results = $sr.ReadToEnd(); [System.Net.ServicePointManager]::ServerCertificateValidationCallback = {$false} return $results; } [System.Net.ServicePointManager]::ServerCertificateValidationCallback = {$false} return ''; } # A LOGGING FUNCTION # FEEL FREE TO REPLACE THIS WITH MORE ADVANCED LOGGING # SEE MY POST ON LOG4NET LOGGING TO EVENTVIEWER AND ROLLING LOGFILE function log { # alternative log function, infact, add whatever logger you want here. # in this case we just wrap get-wfalogger param( [ValidateSet("info", "warning", "error")] [string]$severity = "info", [string]$message = "" ) $message = $message -replace "`"","'" [string]$wfaseverity = $severity if ($severity -eq "warning") { $wfaseverity = "warn" } $expression = ("get-wfalogger -{0} -message `"{1}`"" -f $wfaseverity,$message) try{ invoke-expression $expression -erroraction Stop }catch{ write-host "error with expression : $expression" -ForegroundColor magenta } } # THIS IS TO GET RUNSPACE JOB INFO # CHECK OUT MY POST ON RUNSPACES function getJobStats{ param( $max, $maxJobs ) $running = @($jobs).Count $finished = $max - $running $reallyRunning = @($jobs | ?{ # really important line here. check result first ! if($_.Result){ -not $_.Result.IsCompleted }else{ $false } }).Count if($max -eq 0){ $percent = 100 }else{ $percent = [int]($finished/$max*100) } $o = @{ running = $running open = $reallyRunning finished = $finished percent = $percent hasFreeSlot = ($reallyRunning -lt $maxJobs) isFinished = (-not $running) } New-Object -TypeName psobject -Property $o } # THIS FUNCTION HANDLES YOUR WORKFLOW RESULTS # YOU WILL WANT TO MODIFY THIS PART !! # I'VE ADD SOME SAMPLES ALREADY function processFinishedJobs(){ log -message "...processing jobs..." # handle finished jobs ForEach ($job in @($jobs | ?{$_.Result.IsCompleted})){ try { $result = "success" $jobOutput = $job.job.EndInvoke($job.result); $wfaJobId = ($jobOutput -split ";", 3)[0] $executionStatus = ($jobOutput -split ";", 3)[1] $executionReturn = [xml]($jobOutput -split ";", 3)[2] # here we check some silent continue problem, optional and just an example # the idea is that subworkflow was success, but might have flagged something # we use a mysql scheme as communication platform, see function CheckWorkflowJobStatus $myCustomErrorOrWarningFound=CheckWorkflowJobStatus -StatusType 'fail' -StatusCode 'somewhere_in_my_workflow' -JobId $wfaJobId if($myCustomErrorOrWarningFound -gt 0){ # note I'm logging some custom fields here # when we add a job to the job-list we add some custom metadata # perfect to use the metadata for logging log -severity "error" -message ("Subworkflow for [{0}][{1}] is partially successful. This {2} went wrong. Check details for WFA job: {3}" -f $job.source.field1,$job.source.field2,$job.source.field3, $wfaJobId ) $result = "my_custom_result" } # here we check partially successful # a workflow can be partially successful # note I'm logging some custom fields here # when we add a job to the job-list we add some custom metadata # perfect to use the metadata for logging if($executionStatus -eq "PARTIALLY_SUCCESSFUL") { log -severity "error" -message ("Subworkflow for [{0}][{1}] is partially successful. Check details for WFA job: {2}" -f $job.source.field1,$job.source.field2,$wfaJobId ) $result = "partially_success" } # anyhow, when we reach this, the workflow finished # note I'm logging some custom fields here # when we add a job to the job-list we add some custom metadata # perfect to use the metadata for logging log -message ("Completed subworkflow for {0} {1}" -f $job.source.field1,$job.source.field2) } catch { # Oops, the workflow failed... shit happens :) # note I'm logging some custom fields here # when we add a job to the job-list we add some custom metadata # perfect to use the metadata for logging $result = "fail" $errorMessage = $_.exception.message log -severity "warning" -message ("Subworkflow for [{0}][{1}] failed. Check the job status. Reported error: {2}" -f $job.source.field1,$job.source.field2, $errorMessage ) } # here I use my sample function, where I add a note in my database # customize as you please writeSQLnote -migrationId $job.source.id -note $result # here you can grab the return parameters and perhaps add you meta data to it # we finally push the results an array #$results += New-Object PSObject -Property @{ # "result" = $jobOutput; # "field2" = $job.source.field2; # "field1" = $job.source.field1 #} $job.job.Dispose() $job.job = $Null $job.result = $Null $jobs.remove($job) } } # XML ENCODING FUNCTIONS # WE USE XML IN OUR REST-CALL'S # WE NEED TO MAKE SURE PROPER ENCODING IS USED function prepareForXMLstring { # encodes a string for xml use param( $in ) return ($in -replace ("&", "&")) -replace ("`"", """) } function prepareForXMLboolean { # encodes a boolean for xml use param( $in ) if ([boolean]$in) { return "true" } return "false" } # THIS FUNCTION NEEDS TO BE CUSTOMIZE # HERE WE PREP OUR WORKFLOW POST-BODY # USE THE XML ENCODING FUNCTIONS function getWorkflowBody { # prepare the userinput rest body param( $source ) # get input data, coming from database $field1 = prepareForXMLstring ("`"" + $source.field1 + "`"") $field2 = prepareForXMLstring ("`"" + $source.field2 + "`"") # prep input data, just some sample here $post = [string]"<workflowInput><userInputValues>" $post = $post + '<userInputEntry key="field1" value="' + $field1 + '"/>' $post = $post + '<userInputEntry key="field2" value="' + $field2 + '"/>' $post = $post + '<userInputEntry key="field3" value="true"/>' # ADD MORE FIELDS $post = $post + "</userInputValues>" # ADD COMMENT, WHAT IS THIS WORKFLOW CALL ABOUT ? $ExecutionComment = "subworkflow, blabla : field1 " + $source.field1 + ", path " + $source.field2 if ($ExecutionComment) { $post = $post + "<comments>" + (prepareForXMLstring $ExecutionComment) + "</comments>" } $post = $post + "</workflowInput>" $post = [xml]$post return $post.outerXML } # FUNCTION WHERE WE GRAB OUR DATA # CUSTOMIZE THIS, I'M SHOWING A SAMPLE # TO GRAB DATA FROM A MSSQL DATABASE # YOU COULD GRAB FROM MySQL AND USE CLASSIC # WFA FUNCTION invoke-mysqlquery function getOurMainData { # Get data to process from the database param( [string]$filerName ) $SQLquery = @" use my_database; SELECT id, field1, field2, ... FROM my_table WHERE my_filters "@ $SqlCmd = New-Object System.Data.SqlClient.SqlCommand $SqlCmd.CommandText = $SQLquery # maybe customize using the -f flag and the classic {} placeholders $SqlCmd.Connection = $SqlConnection $reader = $SqlCmd.ExecuteReader() # Reads the dataset and stores them in results $results = @() log -message $("Reading results") while ($reader.Read()) { $row = @{} for ($i = 0; $i -lt $reader.FieldCount; $i++) { $row[$reader.GetName($i)] = $reader.GetValue($i) } $results += new-object psobject -property $row } $reader.close() $outHash = @{} foreach ($record in $results) { $outHash[$record.id] = $record } return $outHash } # HERE IS WHERE THE MAGIC HAPPENS # WHERE ARE HANDLING WORKFLOW CALL HERE # WE WILL CALL THE WORKFLOW AND WAIT FOR THE OUTCOME # IS A CLASSIC WFA REST CALL HANDLER # ScriptBlock to run on every job $WorkflowCall = { Param ( $WfaServer, $WorkflowUID, $User, $Password, $Body ) # create a stopwatch $stopwatch = [diagnostics.stopwatch]::StartNew() # run the workflow $url = "https://" + $WfaServer + "/rest/workflows/" + $WorkflowUID + "/jobs" $try = 0 $done = $false do { try { $results = Invoke-HttpCommand $url $User $Password "POST" $Body $jobs = [xml] $results $done = $true } catch { # catch errors - retry if needed # HERE WE ENABLE RETRY IN CASE OF SOME TEMP ERRORS LIKE AUTHENTICATION, etc... SEE CONSTANTS ABOVE $errormessage = $_.exception.message $errormatches = "(" + ($retryRunErrorMatches-join "|") + ")" # does it match a retry scenario ? if (($errorMessage -imatch $errormatches) -and $try -le $maxInvokeWorkflowTries) { # wait a bit before retry, maybe wfa was too busy. start-sleep -seconds $httpRetrySleepTimer $try++ } else { # in other case, rethrow error throw } } } until($done) # get the workflow result (async) $currentJobId = $jobs.job.JobId $jobUrl = $jobUrl = $url + "/" + $jobs.job.JobId $try = 0 do { # try to get the workflowresult start-sleep -seconds $getWorkflowResultSleepTimer try { $try++ $results = Invoke-HttpCommand $JobUrl $user $password "GET" } catch { # stop trying if we hit the timeout if ($stopwatch.elapsed -le $resumeTimeout) { continue } else { throw $_ } } # get the result info $job = [xml] $results $jobStatus = $job.job.jobStatus.jobStatus # what is the current status ? switch ($jobStatus) { "SCHEDULED" {break; } "PLANNING" {break; } "PENDING" {break; } "EXECUTING" {break; } "ABORTING" {throw "ABORTING CHILD" ; break; } "CANCELED" {throw "CANCELED CHILD" ; break; } "COMPLETED" { # completed ? Return the result and stop. return ($currentJobId + ";" + $jobStatus + ";" + $job.job.jobStatus.returnParameters.outerxml) break } "PARTIALLY_SUCCESSFUL" { # partially completed ? Return the result and stop. return ($currentJobId + ";" + $jobStatus + ";" + $job.job.jobStatus.returnParameters.outerxml) break } "FAILED" { # failed ? Maybe we can resume ? # are we allowed to try a resume ? if ($try -lt $maxRunWorkflowTries -and $stopwatch.elapsed -le $resumeTimeout) { $lasterror = "" $lasterror = $job.job.jobStatus.errorMessage ###################################################################### # ADD LOGIC HERE TO THROW AN ERROR IN CASE IT'S POINTLESS TO RESUME ! # THE WORKFLOW ERROR FINAL AND WE DON'T WANT ANY RESUMES ###################################################################### # if ($lasterrror -eq "..."){ # throw "No resume possible - last error : $lasterror" # } # THE BELOW IS OPTIONAL, MAYBE YOU HAVE RESUME SCENARIO'S # resume $currentJobId = Get-WfaRestParameter -name "JobId" $url = $JobUrl + "/resume" $resumePost = "<workflowInput>" + $job.job.jobStatus.userInputValues.OuterXml + "<comments>" + $job.job.jobStatus.comment + " - Resumed" + "</comments>" + "</workflowInput>" $resumePost = [xml]$resumePost try { $try++ $results = Invoke-HttpCommand $url $user $password "POST" $resumePost.outerXML } catch { # if the resume fails, let's not try again. Just throw $exceptionError = $_.exception.message if ([boolean]$lasterror) { $exceptionError += ". Previous error message: " + $lasterror } $errorMessage = "WFA job: " + $jobs.job.JobId + " failed with error: " + $exceptionError throw $errorMessage } # wait a bit to allow the resume to kick in. start-sleep -seconds $httpRetrySleepTimer } else { # not allowed to resume ? Then return why it failed... $errorMessage = "WFA job: " + $jobs.job.JobId + " failed with error: " + $lasterror throw $errorMessage } break } # unknown return status - throw it default {throw ("--Unexpected Message - {0} - {1}" -f $job.job.JobId,$jobStatus) ; break; } } } until ($stopwatch.elapsed -gt $timeout) # run until finished or until timeout # if we got here, it means we stopped trying and hit the time out # we stop monitoring, and throw the error. $errorMessage = "Job " + $currentJobId + " timed out while waiting." if ($lasterror) { $errorMessage = $errorMessage + " Last error message: " + $lasterror } else { $errorMessage = $errorMessage + " Job has not been resumed." } throw $errorMessage } ####################################### # START MAIN CODE ####################################### # get credentials $dbCred = Get-WfaCredentials -Host $DbCredential $sqlUser = ($dbCred.getnetworkcredential()).username $sqlPassword = ($dbCred.getnetworkcredential()).Password $wfaCred = Get-WfaCredentials -Host $WfaCredentials $wfaUser = ($wfaCred.getnetworkcredential()).username $wfaPassword = ($wfaCred.getnetworkcredential()).Password # get workflow information $url = "https://" + $WfaServer + "/rest/workflows?name=" + $WorkflowName $uidResult = Invoke-HttpCommand $url $wfaUser $wfaPassword GET $workflowUID = ([xml]$uidResult).collection.workflow.uuid # connecting to the SQL database # OPTIONAL, WE NEED TO GET OUR DATA SOMEWHERE # IN THIS SAMPLE, WE GRAB FROM SQL SERVER $SQLServer = $dbserver $SQLDBName = $dbName $SqlConnection = New-Object System.Data.SqlClient.SqlConnection $ConnectionString = "Server=$SQLServer;Database=$SQLDBName;Uid=$SQLUser;Pwd=$SQLPassword;" $SqlConnection.ConnectionString = $ConnectionString log -message "Connecting to SQL Server $SQLServer" $SqlConnection.Open() # get information to process # AGAIN, YOU CAN GET YOUR DATA FROM ANYWHERE YOUR WANT $sourceHash = getOurMainData # array for returnparameters and result $results = @() # sql no longer needed, closing $SqlConnection.close() ########################################## # Multithreading the API calls # Adding the runtime initial session state ########################################## # HERE WE START THE RUNSPACE CODE - LOOK AT MY RUNSPACE POST FOR MORE INFO # http://www.wfaguy.com/2018/03/run-powershell-scripts-in-parallel.html $iss = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault() #--------------------------------------- # Adding functions & variables if needed #--------------------------------------- # Add invoke-http command $definitionInvokeHttpCommandFunction = Get-Content Function:\Invoke-HttpCommand -ErrorAction Stop $sessionStateInvokeHttpCommandFunction = New-Object System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList 'Invoke-HttpCommand', $definitionInvokeHttpCommandFunction $iss.Commands.Add($sessionStateInvokeHttpCommandFunction) # Add timeout variable $timeoutVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'timeout',$timeout,$Null $iss.Variables.Add($timeoutVariable) # Add resumeTimeout variable $resumeTimeoutVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'resumeTimeout',$resumeTimeout,$Null $iss.Variables.Add($resumeTimeoutVariable) # Add maxInvokeWorkflowTries variable $maxInvokeWorkflowTriesVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'maxInvokeWorkflowTries',$maxInvokeWorkflowTries,$Null $iss.Variables.Add($maxInvokeWorkflowTriesVariable) # Add maxRunWorkflowTries variable $maxRunWorkflowTriesVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'maxRunWorkflowTries',$maxRunWorkflowTries,$Null $iss.Variables.Add($maxRunWorkflowTriesVariable) # Add httpRetrySleepTimer variable $httpRetrySleepTimerVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'httpRetrySleepTimer',$httpRetrySleepTimer,$Null $iss.Variables.Add($httpRetrySleepTimerVariable) # Add retryRunErrorMatches variable $retryRunErrorMatchesVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'retryRunErrorMatches',$retryRunErrorMatches,$Null $iss.Variables.Add($retryRunErrorMatchesVariable) # Add getWorkflowResultSleepTimer variable $getWorkflowResultSleepTimerVariable = New-object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'getWorkflowResultSleepTimer',$getWorkflowResultSleepTimer,$Null $iss.Variables.Add($getWorkflowResultSleepTimerVariable) # create the runspacepool $RunspacePool = [runspacefactory]::CreateRunspacePool( 1, #Min Runspaces $maxThreads, #Max Runspaces $iss, # our defined session state $host #PowerShell host ) $RunspacePool.Open() # Let's time everything $timeout = new-timespan -Minutes $maxRunningTimeMinutes $stopwatch = [diagnostics.stopwatch]::StartNew() # runtime job containers [System.Collections.ArrayList]$jobs = @() $i = 0 $max = $sourceHash.count # START ! $activity = "Adding jobs with max $maxThreads threads and $maxJobs jobs" log -message $activity # A counter needed to check where we are in the runtime process $processJobsCounter = 0 # loop our data and start adding jobs foreach ($source in $sourceHash.getenumerator()) { # Do we still have time ? if ($stopwatch.elapsed -lt $timeout) { ############################################ # if there is time, send another job # get your body data # add some logging and create a job ############################################ $i++ $jobstats = getJobStats -max $max -maxJobs $maxJobs $status = "Running {0} jobs | Adding {1}/{2}" -f $jobstats.open,$i,$max # i'm adding progress on the wfa command Set-WfaCommandProgress -total $max -current $i -ProgressPercentage ([int]($i/$max*100)) -note $status # log -message $status -PercentComplete ($i/$max*100) # Wait for a slot to become available While (-not $jobstats.hasFreeSlot){ Start-Sleep -Seconds $jobCheckSleepTimer $jobstats = getJobStats -max $max -maxJobs $maxJobs } # ADD SOME CUSTOM LOGGING HERE, SAY WHAT YOU ARE DOING # I'M USING MY CUSTOM INFO HERE, ACQUIRED FROM THE DATABASE log -message ("Running my custom job for {0} {1}" -f $source.value.field1, $source.value.field2) $body = getWorkflowBody $source.value # create a ps job $job = [powershell]::Create() # add the script [void]$job.AddScript($WorkflowCall) # pass the parameters [void]$job.AddParameter("WfaServer", $WfaServer) [void]$job.AddParameter("WorkflowUID", $workflowUID) [void]$job.AddParameter("User", $wfaUser) [void]$job.AddParameter("Password", $wfaPassword) [void]$job.AddParameter("Body", $body) # add to the runpool $job.RunspacePool = $RunspacePool # store the job # IMPORTANT - I'M ADDING CUSTOM METADATA IN THE PROPERTY SOURCE # IT'S WHERE WE WILL BE USING IT EVERYWHERE IN THE LOGGING (where is reference source.field1, source.field2) $oJob = New-Object PSObject -Property @{ source = $source.value # add custom metadata - add as many as you want job = $job result = $job.BeginInvoke() } [void]$jobs.Add($oJob) # process finished jobs while adding jobs $processJobsCounter++ if($processJobsCounter -ge $processJobsInterval){ processFinishedJobs $processJobsCounter=0 } # sleep a bit, if you don't want overload wfa with too many requests start-sleep -seconds $jobAddSleepTimer } else { log -severity "warning" -message $("Runner Timeout reached, stop adding jobs") # Break the outer loop Break } } ########################### # processing jobs / adding finished ########################### $activity = "All jobs are added, continue processing results" log -message $activity $jobstats = getJobStats -max $max -maxJobs $maxJobs While (-not $jobstats.isFinished){ $status = "Running {0} jobs | All jobs are added" -f $jobstats.open Set-WfaCommandProgress -total $max -current $max -ProgressPercentage 100 -note $status #log -message $status -PercentComplete $jobstats.percent # process finished jobs - adding finished processFinishedJobs Start-Sleep -Seconds $jobCheckSleepTimer $jobstats = getJobStats -max $max -maxJobs $maxJobs } ########################### # processing final jobs ########################### $activity = "Processing final jobs" log -message $activity # process final remaining finished jobs processFinishedJobs # do something with $results # REMEMBER WE PUSHED THE RETURNPARAMETS in $results ? # YOU COULD DO SOMETHING WITH IT Set-WfaCommandProgress -total $max -current $max -ProgressPercentage 100 -note "Completed" $RunspacePool.Close() $RunspacePool.Dispose() log -message ("Runner finished - I ran for {0}" -f $stopwatch.Elapsed.toString())
Sample Workflow
Here you can download an example. Note that it uses a dummy workflow and dummy data.
I also stripped all the SQL Server parts to remove all dependencies.
First add localhost credentials.
I also stripped all the SQL Server parts to remove all dependencies.
First add localhost credentials.





Download the sample dar here
No comments :
Post a Comment