For example: if you want to collect information from 1000 servers and the script runs for 2 minutes per server, a sequential run takes more than a day (1000 x 2 minutes is roughly 33 hours).
If we could however process multiple servers at the same time, that would drastically reduce the overall process time.
In this example I'm processing and merging more than 1000 csv files into a single csv file. All that in a few seconds, and I have even added a fake latency of half a second per csv file!
The concept
- 1 master script
- multiple child scripts (=jobs)
- each child script runs in its own PowerShell context / session
- the master script adds the jobs to run in parallel
- the master script collects the child results
Runspaces
To be able to run scripts in parallel, we need a "runspace". It's a PowerShell concept that helps you with this model of running child scripts. These are the basic steps (a minimal code sketch follows the list):
PS: let's call a child script "a job".
- Create a runspace
- Add a PowerShell context
- Add constants and functions to your PowerShell context
- Set the maximum number of jobs it can contain
- Set the maximum number of jobs it can run simultaneously
- Add jobs to the runspace
- Check regularly if jobs are finished and collect the results
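To make those steps concrete, here is a minimal, self-contained sketch of the pattern (names like $pool, $work and $handles are my own, purely for illustration):
# a minimal runspace pool: 10 jobs, max 4 running at the same time
$pool = [runspacefactory]::CreateRunspacePool(1, 4)
$pool.Open()

# the "job" script
$work = { param($n) Start-Sleep -Seconds 1; "job $n done" }

# add the jobs (BeginInvoke starts them asynchronously)
$handles = foreach($n in 1..10){
    $ps = [powershell]::Create()
    [void]$ps.AddScript($work).AddParameter("n", $n)
    $ps.RunspacePool = $pool
    New-Object PSObject -Property @{ Shell = $ps; Async = $ps.BeginInvoke() }
}

# collect the results (EndInvoke blocks until that particular job is done)
foreach($h in $handles){
    $h.Shell.EndInvoke($h.Async)
    $h.Shell.Dispose()
}
$pool.Close()
With 10 jobs of 1 second each and 4 runspaces, this finishes in about 3 seconds instead of 10.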
Notes and challenges before we start coding
When I first started with runspaces, I thought it was awesome, but I quickly ran into challenges, mainly because I didn't fully understand the concept and/or didn't think it through.
Max jobs vs Max concurrent jobs
There is a difference:
- max jobs: the size of your "queue"
- - if you don't set a maximum and keep adding jobs, the runspace becomes noticeably slow
- max concurrent jobs: how many items in the queue we process at the same time
- - you must tune this part: not too few, not too many
The master script has to be smart and divide its time
Let's make the following analogy: you are the manager of a restaurant.
- master script: you, the manager
- job script: eating visitor, served by a waiter (we assume each table has a dedicated waiter)
- max jobs: the maximum number of tables you have
- max concurrent jobs: the number of waiters you have running around serving the tables (the waiters work on their own; as long as you keep adding new visitors, they will keep serving)
- max jobs > max concurrent jobs: you want to keep the waiters working, so you make sure you have more tables than waiters. How many more? Enough to keep them busy while you are doing other things than assigning tables.
You, as the manager, need to keep everything running. What are your challenges?
- each time tables are free, you need to unlock the door and let new visitors in
- each time the tables are full, you need to lock the door, keeping visitors out (you don't want to overcrowd your restaurant)
- each time someone has finished eating, you need to make sure they pay and leave (if you don't, you can't let new visitors in)
- you are alone, so you need to do all of this at the same time
- you are a man, you cannot multitask :) (master script = single thread)
How NOT to do it (although it seems logical)
- At first your tables are empty, so you let all the customers in at the same time.
- Once you are done, you start checking empty tables and handling the bills.
- You keep doing this over and over...
This seems logical, right? Well, it's not. Why is this bad?
- if you have fast waiters and fast eaters, they might catch up with you: by the time you've brought the last visitors to their tables, everyone else has finished eating, is waiting for the bill, and the waiters are playing cards.
So how do you do this correctly?
You alternate! You make sure you divide your time between assigning tables and doing the bills (a code skeleton follows this list):
- while there are customers outside
- - do you have free tables => let customers in (max 5 at a time)
- - are there customers done => handle the bills and let them out
- no more visitors outside? good, all visitors are in
- while there are tables occupied
- - are there customers done => handle the bill and let them out
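In PowerShell terms, the manager's routine boils down to a two-phase loop like this (pseudo-code: $workItems, HasFreeSlot, AddJob, CollectFinishedJobs and JobsStillRunning are placeholder names; the real versions are built later in this post):
# phase 1: seat customers (add jobs), settling bills along the way
$added = 0
foreach($item in $workItems){
    while(-not (HasFreeSlot)){ Start-Sleep -Milliseconds 50 } # door is locked
    AddJob $item                                              # let a customer in
    $added++
    if(($added % 50) -eq 0){ CollectFinishedJobs }            # handle some bills
}
# phase 2: everyone is seated, keep settling bills until the room is empty
while(JobsStillRunning){
    CollectFinishedJobs
    Start-Sleep -Milliseconds 500
}
CollectFinishedJobs # the last customers that just finished eating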

Logging
If you want logging with one single log file, you will bump into "locks", because every job talks to the same log. So make sure your logging is thread-safe. Here is a small log function. Note that logging to the screen is relatively slow, so limit screen logging if possible. The file write is thread-safe (notice the System.Threading.Mutex part).
function log{
    param(
        [string]$action,
        [string]$color = "Cyan",
        [string]$status = " .. ",
        [switch]$onlyToFile = $false
    )
    if(-not $silent -and -not $onlyToFile){
        if($status){
            $statusColor = "white"
            if($status -eq "OK"){
                $status = " OK "
                $statusColor = "green"
            }
            if($status -eq "FAIL"){
                $statusColor = "YELLOW"
            }
            write-host "[" -NoNewline -ForegroundColor white
            write-host $status -NoNewline -ForegroundColor $statusColor
            write-host "] " -NoNewline -ForegroundColor white
        }
        Write-Host -ForegroundColor $color "$action"
    }
    $timestamp = get-date -Format "yyyyMMddHHmmss"
    # the named mutex serializes the file write across all runspaces
    $mtx = New-Object System.Threading.Mutex($false, "LogfileMutex")
    [void]$mtx.WaitOne()
    Add-Content $logfile -Value "[$timestamp][$status] $action"
    $mtx.ReleaseMutex()
}
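Using it looks like this (the function reads $logfile and $silent from the script scope, so set those first; the server names are just illustrative):
$logfile = ".\log.txt"
$silent = $false
log "Starting the parse"                      # default [ .. ] status
log "Parsed server01" -status "OK"            # green [ OK ]
log "Could not reach server02" -status "FAIL" # yellow [FAIL]
log "Debug detail" -onlyToFile                # file only, keeps the console quiet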
Starting your code
PS: if you don't want to RTFM, scroll all the way down and download the sample script and sample files.
Prepare your runspace
# create a default sessionstate - a powershell session for our run pool
# we can add functions to it as well, like our log function
$iss = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

# add our log function to the powershell sessionstate
$definitionLogFunction = Get-Content Function:\log -ErrorAction Stop
$sessionStateLogFunction = New-Object System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList 'log', $definitionLogFunction
$iss.Commands.Add($sessionStateLogFunction)

# add the needed variables to the runspace session
$logFileVariable = New-Object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'logfile',(Resolve-Path $logFile),$Null
$iss.Variables.Add($logFileVariable)

# create the runspacepool
$RunspacePool = [runspacefactory]::CreateRunspacePool(
    1,           # min runspaces
    $maxThreads, # max runspaces
    $iss,        # our defined session state
    $host        # PowerShell host
)
$RunspacePool.Open()
Let's explain a bit:
- $iss: prepare a PowerShell context (initial session state)
- add our logging function to the context so our job scripts can log
- add our logfile location so our job scripts know where to log
- ... add other functions and variables if you want (see the sketch below)
- PS: there is a lot more you can add to the context... Google is your friend
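For instance, you can preload a module or extra helpers the same way (a sketch: the "helper" function and the ActiveDirectory module are assumed examples, not part of this script):
# preload a whole module into every runspace
$iss.ImportPSModule("ActiveDirectory")

# add another shared function, same pattern as the log function
$definitionHelper = Get-Content Function:\helper -ErrorAction Stop
$sessionStateHelper = New-Object System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList 'helper', $definitionHelper
$iss.Commands.Add($sessionStateHelper)

# add another shared variable
$maxRetriesVariable = New-Object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'maxRetries', 3, $Null
$iss.Variables.Add($maxRetriesVariable)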
Prepare your job-script (child-script)
# PARSE SINGLE FILE
$parseCsv = [scriptblock]::Create({
    param(
        $path
    )
    try{
        # import csv
        $results = Import-Csv -Path $path -Delimiter ","
        # add a column with the file path to the results
        $results = $results | select *,@{name="path";expression={$path}}
        # simulate slow script
        # comment next line if you don't want to simulate slow processing
        # but adding some latency nicely proves that runspaces overcome latencies
        sleep -Milliseconds 500
    }catch{
        log "Failed to parse [$path]" -status "FAIL" -onlyToFile
    }finally{
        # return the csv content
        $results
    }
})

# a list to maintain the running jobs
[System.Collections.ArrayList]$jobs = @()
Let's explain a bit:
- the script is a variable of the type "scriptblock"
- we can have parameters (like path), so you can even test the job standalone (see the sketch after this list)
- always nice to add try/catch/finally
- in my example I'm reading a single csv file and I add the csv file path as an extra property
- I also add some sleep, purely to demo that latencies don't hurt much with runspaces, since we run in parallel. Adding the latency also nicely fills up my job queue; otherwise the csv parsing goes so fast I can never fill the queue.
- I also create an arraylist, where I keep track of all the jobs I'll be adding (the customer attendance list: who's in the restaurant?)
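A handy side effect: because the job is a plain scriptblock, you can test it in your current session before wiring it into the runspace (the sample path here is hypothetical):
# run the job script directly, no runspace involved
$testResult = & $parseCsv -path ".\temp\sample.csv"
$testResult | Format-Table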
Collect my files to parse
# let's process all pipeline input & arrays
# we assume that we accept pipeline input
# we basically create one big array of files we need to parse
# we do this recursively
# in this example we will end up with > 1000 csv files to parse
foreach($rp in $rootPath){
    log -action "querying $rp"
    $PathExists = $false
    try{
        $PathExists = Test-Path $rp
    }catch{
        log -action "path $rp is unavailable - skipping" -color red -status "FAIL"
    }
    if($PathExists){
        try{
            Get-ChildItem -Path $rp -Filter *.csv -Recurse -ErrorAction SilentlyContinue -Force | %{
                $filesToParse += $_.FullName
            }
        }catch{
            log -action "path $rp is available, but failed to query - skipping" -color red -status "FAIL"
            break
        }
    }else{
        log -action "path $rp is unavailable - skipping" -color red -status "FAIL"
    }
}
Let's explain:
- I like to add checks/tests and try/catch, so I make sure the path exists
- I also like the PowerShell pipeline, so I'm allowing multiple paths from the pipeline (see the usage example below)
- but in essence, I just get the file list (recursively) and maintain an array (filesToParse) of the file paths
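For example, you can pass one root path as a parameter or pipe several at once (the second folder name is just an example):
# one path as a parameter...
.\runspace_sample.ps1 -rootPath .\temp -tempFile .\temp.txt -logFile .\log.txt

# ...or several paths over the pipeline
".\temp", ".\temp2" | .\runspace_sample.ps1 -tempFile .\temp.txt -logFile .\log.txt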
Main process of adding jobs
###########################
# Adding jobs + processing
###########################
$activity = "Adding jobs with max $maxThreads threads and $maxJobs jobs"
log $activity -color Yellow
$processJobsCounter = 0
# loop all files
foreach($f in $filesToParse){
    # visualize progress
    $i++
    $jobstats = getJobStats -max $max -maxJobs $maxJobs
    $status = "Running {0} jobs | Adding {1}/{2}" -f $jobstats.open,$i,$max
    write-progress -Activity $activity -Status $status -PercentComplete ($i/$max*100)
    # wait for a slot to become available
    While (-not $jobstats.hasFreeSlot){
        Start-Sleep -Milliseconds $slotTimer
        $jobstats = getJobStats -max $max -maxJobs $maxJobs
    }
    # invoke job (parallel)
    $Job = [System.Management.Automation.PowerShell]::Create()
    [void]$Job.AddScript($parseCsv)
    [void]$Job.AddParameter("path",$f)
    $Job.RunspacePool = $RunspacePool
    $oJob = New-Object PSObject -Property @{
        path   = $f # custom metadata - add as much as you want - can be a hash-table
        job    = $Job
        result = $Job.BeginInvoke()
    }
    [void]$jobs.Add($oJob)
    # process finished jobs while adding jobs
    $processJobsCounter++
    if($processJobsCounter -ge $processJobsInterval){
        processFinishedJobs
        $processJobsCounter = 0
    }
}

###########################
# processing jobs / all jobs added
###########################
$activity = "Processing jobs with max $maxThreads threads and $maxJobs jobs"
log $activity -color Yellow
While (-not $jobstats.isFinished){
    $status = "Running {0} jobs | All jobs are added" -f $jobstats.open
    write-progress -Activity $activity -Status $status -PercentComplete $jobstats.percent
    # process finished jobs
    processFinishedJobs
    Start-Sleep -Milliseconds $sleepTimer
    $jobstats = getJobStats -max $max -maxJobs $maxJobs
}

###########################
# processing final jobs
###########################
$activity = "Processing final jobs"
log $activity -color Yellow
# process final remaining finished jobs
processFinishedJobs
Write-Progress -PercentComplete 100 -Activity $activity -Completed
log ("Parsing finished - Script ran for {0}" -f $stopwatch.Elapsed.toString()) -color white -status "TIME"
Let's explain:
- first I loop my filelist array
- I wait for a slot to become available
- I add my child-script to the runspace (adding a parameter: the csv-file path)
- I create a little job-object that holds the result, the job and some custom information, which will come in handy later when I process the results. You need this to know what item you are processing at that time. You can add hash-tables with a lot of custom info if you want to. I just add the file path in my case.
- I add my job object to my arraylist (like keeping a customer list of who's currently in the restaurant)
- every x jobs, I check for finished jobs (is someone asking for the bill?)
- then I keep processing running jobs (all customers are in the restaurant, just bill-processing now)
- finally I process one more time (needed! these are the last customers who just finished eating)
Getting job status
# GET JOB STATS
function getJobStats{
    param(
        $max,
        $maxJobs
    )
    $running = @($jobs).Count
    $finished = $max - $running
    $reallyRunning = @($jobs | ?{
        # really important line here: check the result first!
        if($_.Result){
            -not $_.Result.IsCompleted
        }else{
            $false
        }
    }).Count
    $percent = if($max -gt 0){
        [int]($finished/$max*100)
    }else{
        100
    }
    $o = @{
        running     = $running
        open        = $reallyRunning
        finished    = $finished
        percent     = $percent
        hasFreeSlot = ($reallyRunning -lt $maxJobs)
        isFinished  = (-not $running)
    }
    New-Object -TypeName psobject -Property $o
}
Let's explain:
- this is a little function to help you check for finished jobs
- meanwhile I calculate the progress: percentage, running, hasFreeSlot, ...
- note that there is a difference between running and reallyRunning
- - running: the jobs in the queue (people in the restaurant)
- - reallyRunning: the jobs that are NOT finished yet (people still eating)
- I return a little psobject with all this info, perfect for making decisions and showing progress
Processing Finished Jobs
# HANDLE FINISHED JOBS
function processFinishedJobs(){
    # use a streamwriter => fastest file writes
    # and because we process every x csv files, we do faster writes as we don't
    # need to reopen the file for each single item
    # uncomment next line if you want to see when dumping happens
    # log -action "dumping results to $tempFile..." -color cyan -status "proc"
    $stream = New-Object System.IO.StreamWriter((Resolve-Path $tempFile),$true)
    ForEach ($job in @($jobs | ? {$_.Result.IsCompleted})){
        # we get the job result by triggering EndInvoke on the job handle
        $jobOutput = $job.Job.EndInvoke($job.Result)
        try{
            # loop the results and add them to the result file (tab-delimited)
            $jobOutput | %{
                $text = ("{0}`t{1}`t{2}" -f $_.path,$_.name,$_.value)
                $stream.WriteLine($text)
            }
        }catch{
            log $_.Exception.Message -status "FAIL" -onlyToFile
        }
        # kill the job, making room for new jobs
        $job.Job.Dispose()
        $job.Job = $Null
        $job.Result = $Null
        $jobs.remove($job)
    }
    # close the file
    $stream.Close()
}
Let's explain:
- here I process all completed jobs and grab each job's output
- in my case, I stream the output to a text file
- I use a StreamWriter, which is way faster; and because I process batches every once in a while, it's faster still (it's like keeping the cash register open while processing all outstanding bills, instead of opening and closing it for each customer) - see the comparison sketch below
- finally I dispose the job and remove it from the arraylist (remove the customer from my attendance list: he/she has paid and left the restaurant)
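If you want to see the difference for yourself, here is a rough micro-benchmark sketch (the file names are arbitrary and the exact timings will vary per machine):
$lines = 1..10000 | %{ "line $_" }

# Add-Content opens and closes the file on every call
Measure-Command {
    $lines | %{ Add-Content -Path .\slow.txt -Value $_ }
}

# a StreamWriter keeps the file open for the whole batch
Measure-Command {
    $stream = New-Object System.IO.StreamWriter((Join-Path (Get-Location) "fast.txt"), $true)
    $lines | %{ $stream.WriteLine($_) }
    $stream.Close()
}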
The full script
[cmdletbinding()]
param(
    [parameter(Mandatory=$false,ValueFromPipeline=$True)]
    [string[]]$rootPath,
    [parameter(Mandatory=$true,HelpMessage="Full path of temp file")]
    [string]$tempFile,
    [parameter(Mandatory=$true,HelpMessage="Path of the log file")]
    [string]$logFile,
    [parameter(HelpMessage="Run in silent mode")]
    [switch]$silent,
    [parameter(HelpMessage="Number of parallel threads")]
    [int]$maxThreads=100,
    [parameter(HelpMessage="Number of jobs to be queued")]
    [int]$maxJobs=120,
    [parameter(HelpMessage="Number of ms for slot-free-waiting")]
    [int]$slotTimer=50,
    [parameter(HelpMessage="Number of ms for loop-waiting")]
    [int]$sleepTimer=500,
    [parameter(HelpMessage="Process completed jobs every x number of items")]
    [int]$processJobsInterval=50
)
Begin{
    ############################
    # FUNCTIONS
    ############################

    # THREAD-SAFE LOGGING
    function log{
        param(
            [string]$action,
            [string]$color = "Cyan",
            [string]$status = " .. ",
            [switch]$onlyToFile = $false
        )
        if(-not $silent -and -not $onlyToFile){
            if($status){
                $statusColor = "white"
                if($status -eq "OK"){
                    $status = " OK "
                    $statusColor = "green"
                }
                if($status -eq "FAIL"){
                    $statusColor = "YELLOW"
                }
                write-host "[" -NoNewline -ForegroundColor white
                write-host $status -NoNewline -ForegroundColor $statusColor
                write-host "] " -NoNewline -ForegroundColor white
            }
            Write-Host -ForegroundColor $color "$action"
        }
        $timestamp = get-date -Format "yyyyMMddHHmmss"
        # the named mutex serializes the file write across all runspaces
        $mtx = New-Object System.Threading.Mutex($false, "LogfileMutex")
        [void]$mtx.WaitOne()
        Add-Content $logfile -Value "[$timestamp][$status] $action"
        $mtx.ReleaseMutex()
    }

    # GET JOB STATS
    function getJobStats{
        param(
            $max,
            $maxJobs
        )
        $running = @($jobs).Count
        $finished = $max - $running
        $reallyRunning = @($jobs | ?{
            # really important line here: check the result first!
            if($_.Result){
                -not $_.Result.IsCompleted
            }else{
                $false
            }
        }).Count
        $percent = if($max -gt 0){
            [int]($finished/$max*100)
        }else{
            100
        }
        $o = @{
            running     = $running
            open        = $reallyRunning
            finished    = $finished
            percent     = $percent
            hasFreeSlot = ($reallyRunning -lt $maxJobs)
            isFinished  = (-not $running)
        }
        New-Object -TypeName psobject -Property $o
    }

    # HANDLE FINISHED JOBS
    function processFinishedJobs(){
        # use a streamwriter => fastest file writes
        # and because we process every x csv files, we do faster writes as we don't
        # need to reopen the file for each single item
        # uncomment next line if you want to see when dumping happens
        # log -action "dumping results to $tempFile..." -color cyan -status "proc"
        $stream = New-Object System.IO.StreamWriter((Resolve-Path $tempFile),$true)
        ForEach ($job in @($jobs | ? {$_.Result.IsCompleted})){
            # we get the job result by triggering EndInvoke on the job handle
            $jobOutput = $job.Job.EndInvoke($job.Result)
            try{
                # loop the results and add them to the result file (tab-delimited)
                $jobOutput | %{
                    $text = ("{0}`t{1}`t{2}" -f $_.path,$_.name,$_.value)
                    $stream.WriteLine($text)
                }
            }catch{
                log $_.Exception.Message -status "FAIL" -onlyToFile
            }
            # kill the job, making room for new jobs
            $job.Job.Dispose()
            $job.Job = $Null
            $job.Result = $Null
            $jobs.remove($job)
        }
        # close the file
        $stream.Close()
    }

    # PARSE SINGLE FILE
    $parseCsv = [scriptblock]::Create({
        param(
            $path
        )
        try{
            # import csv
            $results = Import-Csv -Path $path -Delimiter ","
            # add a column with the file path to the results
            $results = $results | select *,@{name="path";expression={$path}}
            # simulate slow script
            # comment next line if you don't want to simulate slow processing
            # but adding some latency nicely proves that runspaces overcome latencies
            sleep -Milliseconds 500
        }catch{
            log "Failed to parse [$path]" -status "FAIL" -onlyToFile
        }finally{
            # return the csv content
            $results
        }
    })

    ############################
    # START CODE
    ############################

    # start timer
    $stopwatch = [system.diagnostics.stopwatch]::StartNew()

    # variables
    $ErrorActionPreference = "stop"
    $filesToParse = @()

    # LOGGING
    $out = New-Item $logfile -Type file -Force
    log "START" -color Yellow

    # create the dump file and add the header: we make it tab-delimited
    log "Creating bulk insert file" -color Magenta
    $out = New-Item "$tempfile" -Type file -Force
    Add-Content -Path $tempFile -Value (("path","name","value") -join "`t")

    # we will run in parallel - prep all
    # create a default sessionstate - a powershell session for our run pool
    # we can add functions to it as well, like our log function
    $iss = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

    # add our log function to the powershell sessionstate
    $definitionLogFunction = Get-Content Function:\log -ErrorAction Stop
    $sessionStateLogFunction = New-Object System.Management.Automation.Runspaces.SessionStateFunctionEntry -ArgumentList 'log', $definitionLogFunction
    $iss.Commands.Add($sessionStateLogFunction)

    # add the needed variables to the runspace session
    $logFileVariable = New-Object System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'logfile',(Resolve-Path $logFile),$Null
    $iss.Variables.Add($logFileVariable)

    # create the runspacepool
    $RunspacePool = [runspacefactory]::CreateRunspacePool(
        1,           # min runspaces
        $maxThreads, # max runspaces
        $iss,        # our defined session state
        $host        # PowerShell host
    )
    $RunspacePool.Open()

    # a list to maintain the running jobs
    [System.Collections.ArrayList]$jobs = @()

    log ("Runspace is prepared - Script ran for {0}" -f $stopwatch.Elapsed.toString()) -color white -status "TIME"
}
process{
    # let's process all pipeline input & arrays
    # we basically create one big array of files we need to parse, recursively
    # in this example we will end up with > 1000 csv files to parse
    foreach($rp in $rootPath){
        log -action "querying $rp"
        $PathExists = $false
        try{
            $PathExists = Test-Path $rp
        }catch{
            log -action "path $rp is unavailable - skipping" -color red -status "FAIL"
        }
        if($PathExists){
            try{
                Get-ChildItem -Path $rp -Filter *.csv -Recurse -ErrorAction SilentlyContinue -Force | %{
                    $filesToParse += $_.FullName
                }
            }catch{
                log -action "path $rp is available, but failed to query - skipping" -color red -status "FAIL"
                break
            }
        }else{
            log -action "path $rp is unavailable - skipping" -color red -status "FAIL"
        }
    }
    log ("All csv-filenames are collected - Script ran for {0}" -f $stopwatch.Elapsed.toString()) -color white -status "TIME"
}
End{
    log ("Merging Csv files - Script ran for {0}" -f $stopwatch.Elapsed.toString()) -color white -status "TIME"
    $max = $filesToParse.count
    $isThereData = ($max -gt 0)
    # progress counter
    $i = 0
    if($isThereData){
        ###########################
        # Adding jobs + processing
        ###########################
        $activity = "Adding jobs with max $maxThreads threads and $maxJobs jobs"
        log $activity -color Yellow
        $processJobsCounter = 0
        # loop all files
        foreach($f in $filesToParse){
            # visualize progress
            $i++
            $jobstats = getJobStats -max $max -maxJobs $maxJobs
            $status = "Running {0} jobs | Adding {1}/{2}" -f $jobstats.open,$i,$max
            write-progress -Activity $activity -Status $status -PercentComplete ($i/$max*100)
            # wait for a slot to become available
            While (-not $jobstats.hasFreeSlot){
                Start-Sleep -Milliseconds $slotTimer
                $jobstats = getJobStats -max $max -maxJobs $maxJobs
            }
            # invoke job (parallel)
            $Job = [System.Management.Automation.PowerShell]::Create()
            [void]$Job.AddScript($parseCsv)
            [void]$Job.AddParameter("path",$f)
            $Job.RunspacePool = $RunspacePool
            $oJob = New-Object PSObject -Property @{
                path   = $f # custom metadata - add as much as you want - can be a hash-table
                job    = $Job
                result = $Job.BeginInvoke()
            }
            [void]$jobs.Add($oJob)
            # process finished jobs while adding jobs
            $processJobsCounter++
            if($processJobsCounter -ge $processJobsInterval){
                processFinishedJobs
                $processJobsCounter = 0
            }
        }

        ###########################
        # processing jobs / all jobs added
        ###########################
        $activity = "Processing jobs with max $maxThreads threads and $maxJobs jobs"
        log $activity -color Yellow
        While (-not $jobstats.isFinished){
            $status = "Running {0} jobs | All jobs are added" -f $jobstats.open
            write-progress -Activity $activity -Status $status -PercentComplete $jobstats.percent
            # process finished jobs
            processFinishedJobs
            Start-Sleep -Milliseconds $sleepTimer
            $jobstats = getJobStats -max $max -maxJobs $maxJobs
        }

        ###########################
        # processing final jobs
        ###########################
        $activity = "Processing final jobs"
        log $activity -color Yellow
        # process final remaining finished jobs
        processFinishedJobs
        Write-Progress -PercentComplete 100 -Activity $activity -Completed
        log ("Parsing finished - Script ran for {0}" -f $stopwatch.Elapsed.toString()) -color white -status "TIME"
    }else{
        log "Nothing to process" -color Magenta
    }
    # cleanup
    $RunspacePool.Close()
    $RunspacePool.Dispose()
    # end
    $stopwatch.stop()
    log ("Finished - Script ran for {0}" -f $stopwatch.Elapsed.toString()) -color white -status "TIME"
}
Here you can download the zip file. It contains the script and a directory of csv files you can run it against.
Download zip file
Run the script like this:
.\runspace_sample.ps1 -rootPath .\temp -tempFile .\temp.txt -logFile .\log.txt
[ .. ] START
[ .. ] Creating bulk insert file
[TIME] Runspace is prepared - Script ran for 00:00:00.0452503
[ .. ] querying .\temp
[TIME] All csv-filenames are collected - Script ran for 00:00:00.2537204
[TIME] Merging Csv files - Script ran for 00:00:00.2614211
[ .. ] Adding jobs with max 100 threads and 120 jobs
[ .. ] Processing jobs with max 100 threads and 120 jobs
[ .. ] Processing final jobs
[TIME] Parsing finished - Script ran for 00:00:10.8915885
[TIME] Finished - Script ran for 00:00:10.9471179
Comments
- "Excellent post! Very valuable concept, thanks for sharing!"
- "The mutex in the logger function blocked me after running 6 threads. May I know why?"
- "I just need to run two scripts (A.ps1 and B.ps1) in parallel. How can I do this?"
- Reply: "Use Start-Job in this case: https://docs.microsoft.com/en-us/powershell/module/microsoft.powershell.core/start-job?view=powershell-6"
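Something like this should do it (a sketch, assuming both scripts live in the current folder):
# start both scripts as background jobs
$a = Start-Job -FilePath .\A.ps1
$b = Start-Job -FilePath .\B.ps1

# wait for both to finish, then collect their output and clean up
Wait-Job $a, $b | Out-Null
Receive-Job $a, $b
Remove-Job $a, $b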