Hi, I'm Bradley! 👋

Automation Developer

Multi-Threading with PowerShell

4 minutes
September 7, 2023

I was handed an automation project to get data out of an external web resource so our data management department could build Power BI dashboards and reports from that data. My job was simple - data extraction. This particular external web resource provides a SOAP API for data extraction. I know what you’re already thinking - SOAP is outdated and should be replaced with REST. I fully agree; however, it’s what I was provided to work with…

When I began making my API calls to get a look at the data, I was retrieving sometimes hundreds of nodes and sometimes tens of thousands. I originally wrote my PowerShell functions to process the data on a single thread, as scripts generally do. Using Measure-Command, I began to realize that this approach was just too slow for what I wanted it to do.
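For anyone who has not used Measure-Command before, here is a minimal sketch of what a timing check like that might look like. The function Convert-Record and the sample data are hypothetical stand-ins for illustration, not my production code.

```powershell
# Hypothetical stand-in for a per-record processing function
function Convert-Record {
    param($Record)
    [PSCustomObject]@{ Id = $Record; Name = "Record $Record" }
}

# Synthetic sample data (assumption: the real data came from the SOAP API)
$sampleData = 1..1000

# Measure-Command runs the script block and returns a TimeSpan
$elapsed = Measure-Command {
    foreach ($record in $sampleData) {
        $null = Convert-Record -Record $record
    }
}

'Processed {0} records in {1:N0} ms' -f $sampleData.Count, $elapsed.TotalMilliseconds
```

Running the same measurement before and after a refactor gives a simple like-for-like comparison.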

Using my largest dataset (15k+ records), I began to refactor my functions for multi-threaded processing. I knew that my code needed to be thread-safe and had to guarantee that every record is processed and none are skipped.

First, I needed to split my large dataset into more manageable chunks for processing:

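# A hashtable is a reference type, so the script block below increments the same counter for every object
$counter = @{ Value = 0 }
$groupSize = 3000

# Objects 0-2999 get key 0, objects 3000-5999 get key 1, and so on
$splitInputDataset = $pendingDataset | Group-Object -Property { [math]::Floor($counter.Value++ / $groupSize) }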

The above code splits my dataset (15,342 objects) into groups of at most 3,000 objects each - which in this case is 6 groups: 5 groups of 3,000 and one group of 342.
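To sanity-check the chunking, the same technique can be run against synthetic data. This is only a sketch: the range of integers below is an assumed stand-in for the real records.

```powershell
# Synthetic stand-in for the real dataset: 15,342 integers
$pendingDataset = 1..15342

$counter = @{ Value = 0 }
$groupSize = 3000

$splitInputDataset = $pendingDataset |
    Group-Object -Property { [math]::Floor($counter.Value++ / $groupSize) }

# Show the key and size of each chunk
$splitInputDataset | Select-Object -Property Name, Count

# Total number of objects across all chunks (should match the input size)
($splitInputDataset | Measure-Object -Property Count -Sum).Sum
```

Checking that the chunk sizes add back up to the input size is a cheap way to confirm nothing was dropped before any threading is involved.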

Next was how to tackle the processing of each group. I knew that I wanted to process each group simultaneously, but I wasn’t sure how I could - then I came across synchronized hashtables.

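A synchronized hashtable is a thread-safe wrapper around a hashtable that allows access from multiple threads. Each individual write takes a lock on the hashtable, so competing writers wait until the current one has completed (similar to a ROWLOCK in SQL). The lock only covers a single operation, though: a compound update such as += is a read followed by a write, so it needs its own lock around both steps.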

$returnData = [hashtable]::Synchronized(@{})
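As a small illustration of where the built-in locking stops, here is a sketch that locks on the hashtable's SyncRoot for a read-modify-write update. The variable $shared and the keys Total and LastWriter are made up for this example.

```powershell
# Hypothetical shared state for illustration
$shared = [hashtable]::Synchronized(@{ Total = 0 })

# A single write is protected by the synchronized wrapper itself
$shared['LastWriter'] = 'main'

# A compound update (read, modify, write) needs an explicit lock,
# taken on the same SyncRoot object the wrapper uses internally
[System.Threading.Monitor]::Enter($shared.SyncRoot)
try {
    $shared.Total += 5
}
finally {
    # Always release the lock, even if the update throws
    [System.Threading.Monitor]::Exit($shared.SyncRoot)
}

$shared.IsSynchronized
$shared.Total
```

The try/finally matters here: a lock that is never released would leave every other thread waiting forever.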

Next was to create the runspaces necessary to process the groups in parallel:

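# Create the RunspacePool and open it
# @() guards against a single group, where .Count would return that group's size instead of 1
$maxRunspaces = @($splitInputDataset).Count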

$sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
$runspacePool = [RunspaceFactory]::CreateRunspacePool(1, $maxRunspaces, $sessionState, $host)
$runspacePool.Open()

Here I create a RunspacePool - a minimum of 1 Runspace, but a maximum of the total number of groups I have.
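To confirm the pool limits took effect, the pool can be asked for them directly. A minimal sketch, assuming six groups as in my example dataset:

```powershell
# Assumption: six groups, matching the example dataset
$maxRunspaces = 6

$sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
$runspacePool = [RunspaceFactory]::CreateRunspacePool(1, $maxRunspaces, $sessionState, $Host)
$runspacePool.Open()

# Read the limits and state back from the pool
$min   = $runspacePool.GetMinRunspaces()
$max   = $runspacePool.GetMaxRunspaces()
$state = $runspacePool.RunspacePoolStateInfo.State

'Min: {0}, Max: {1}, State: {2}' -f $min, $max, $state

# Clean up once the pool is no longer needed
$runspacePool.Close()
$runspacePool.Dispose()
```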

Now I process the groups across 6 threads in parallel:

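# Process the data
# Note: $script must already be assigned before this loop runs
$jobs = New-Object System.Collections.ArrayList

$splitInputDataset | ForEach-Object {
    $powerShell = [PowerShell]::Create()
    $powerShell.RunspacePool = $runspacePool
    $powerShell.AddScript($script).AddArgument($_.Group).AddArgument($returnData).AddArgument($linkElementValue) | Out-Null

    $jobObj = [PSCustomObject]@{
        # Async handle returned by BeginInvoke; its IsCompleted property reports when the thread has finished
        Runspace = $powerShell.BeginInvoke()
        # The PowerShell instance running the script, kept so it can be cleaned up later
        PowerShell = $powerShell
    }

    $jobs.Add($jobObj) | Out-Null
}
while (($jobs.Runspace.IsCompleted -contains $false)) {
    # Wait until all threads are complete
    Start-Sleep -Milliseconds 100
}

# Finish each invocation and release its PowerShell instance, then release the pool
$jobs | ForEach-Object {
    $_.PowerShell.EndInvoke($_.Runspace) | Out-Null
    $_.PowerShell.Dispose()
}
$runspacePool.Close()
$runspacePool.Dispose()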

This code block creates a PowerShell instance for each group, hands it to the RunspacePool, and processes that group with the PowerShell code stored in the variable $script (which has to be assigned before the loop runs). Here is an example of what is in $script:

$script = {
    Param($unProcessedData, $returnData, $linkElementValue)

    # Collect all objects in a local array first - cuts down on accessing the synchronized hashtable thousands of times.
    # This way, I only access the synchronized hashtable once per thread.
    $return = @($unProcessedData | ForEach-Object {
        $propertyNames = $_.PSObject.Properties.Name

        # Verifies each property that I need for SQLBulkCopy exists and if not - adds it with a $null value.
        if ($propertyNames -notcontains 'FirstName') {
            $_ | Add-Member -MemberType NoteProperty -Name 'FirstName' -Value $null
        }
        if ($propertyNames -notcontains 'LastName') {
            $_ | Add-Member -MemberType NoteProperty -Name 'LastName' -Value $null
        }
        if ($propertyNames -notcontains 'Email') {
            $_ | Add-Member -MemberType NoteProperty -Name 'Email' -Value $null
        }
        # If I need to link SQL tables together via PK/FK Relationship with an element from the parent table
        if ($propertyNames -notcontains 'LinkedProperty') {
            $_ | Add-Member -MemberType NoteProperty -Name 'LinkedProperty' -Value $linkElementValue
        }

        $_ | Select-Object -Property FirstName,LastName,Email,LinkedProperty
    })

    # Append the local array to the synchronized hashtable.
    # += is a read followed by a write, so hold the lock across both steps.
    [System.Threading.Monitor]::Enter($returnData.SyncRoot)
    try {
        $returnData.ProcessedRows += $return
    }
    finally {
        [System.Threading.Monitor]::Exit($returnData.SyncRoot)
    }
}
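
To see all of the pieces working together, here is a scaled-down, self-contained sketch of the same pattern. Everything in it is an assumption made for illustration: the ten synthetic records, the group size of 4, and the names $worker, $groups and Handle are not from my production script.

```powershell
# Synthetic input: 10 objects, half of them missing the Email property
$pendingDataset = 1..10 | ForEach-Object {
    if ($_ % 2) { [PSCustomObject]@{ FirstName = "First$_"; LastName = "Last$_" } }
    else        { [PSCustomObject]@{ FirstName = "First$_"; LastName = "Last$_"; Email = "user$_@example.com" } }
}

# Split into chunks of at most 4 objects (4 + 4 + 2)
$counter = @{ Value = 0 }
$groupSize = 4
$groups = @($pendingDataset | Group-Object -Property { [math]::Floor($counter.Value++ / $groupSize) })

# Shared, synchronized result store
$returnData = [hashtable]::Synchronized(@{ ProcessedRows = @() })

# Work performed on each thread
$worker = {
    param($rows, $returnData)

    # Normalise each object locally before touching shared state
    $local = @($rows | ForEach-Object {
        if ($_.PSObject.Properties.Name -notcontains 'Email') {
            $_ | Add-Member -MemberType NoteProperty -Name 'Email' -Value $null
        }
        $_ | Select-Object -Property FirstName, LastName, Email
    })

    # One locked append per thread
    [System.Threading.Monitor]::Enter($returnData.SyncRoot)
    try { $returnData.ProcessedRows += $local }
    finally { [System.Threading.Monitor]::Exit($returnData.SyncRoot) }
}

$runspacePool = [RunspaceFactory]::CreateRunspacePool(1, $groups.Count)
$runspacePool.Open()

# Start one PowerShell instance per chunk
$jobs = foreach ($group in $groups) {
    $ps = [PowerShell]::Create()
    $ps.RunspacePool = $runspacePool
    $null = $ps.AddScript($worker).AddArgument($group.Group).AddArgument($returnData)
    [PSCustomObject]@{ PowerShell = $ps; Handle = $ps.BeginInvoke() }
}

# EndInvoke blocks until the matching thread has finished
foreach ($job in $jobs) {
    $null = $job.PowerShell.EndInvoke($job.Handle)
    $job.PowerShell.Dispose()
}
$runspacePool.Close()
$runspacePool.Dispose()

# Number of rows collected from all threads
$returnData.ProcessedRows.Count
```

Comparing the final row count against the input count is the quickest check that no records were skipped. The order of the rows is not guaranteed, since it depends on which thread finishes first.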

Learning multi-threaded processing with PowerShell was extremely enlightening and has unlocked a new ability for me and my team!