You can share data with workers in the ThreadPoolExecutor using a function argument, a global variable, or a global variable defined in the worker thread initialization function.
In this tutorial, you will discover how to share data from the main thread with tasks executed by workers in the ThreadPoolExecutor.
Let’s get started.
Need to Share Data From Main Thread With Worker Threads
The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.
Tasks executed by the worker threads run concurrently. In the standard CPython interpreter, however, the global interpreter lock (GIL) prevents Python code from running in parallel across threads, which makes the ThreadPoolExecutor best suited to I/O-bound tasks.
A ThreadPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one by one via the submit() method or in batch via the map() method.
For example:
```python
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
```
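For comparison, here is a minimal sketch of creating the pool directly instead of through the context manager. The explicit shutdown(wait=True) call does what the context manager does on exit. The task() function here is a hypothetical placeholder.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical placeholder task used only for illustration
def task():
    return 'done'

# create the thread pool directly, without the context manager
tpe = ThreadPoolExecutor()
# issue a task and keep the returned Future
future = tpe.submit(task)
# block until the task is done and get its result
result = future.result()
# shut down the pool explicitly, waiting for all tasks to complete
tpe.shutdown(wait=True)
print(result)  # prints: done
```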
It is common to need to share data from the main thread with workers in the ThreadPoolExecutor.
For example, the main thread may load data or prepare data in some way that is needed in the execution of each task.
How can we share data in the main thread with each task executed in the ThreadPoolExecutor?
How to Share Data With Each Task in the ThreadPoolExecutor
There are three main ways to share data from the main thread with tasks in the ThreadPoolExecutor:
- Via an argument to the task function.
- Via a global variable defined in the main thread.
- Via a global variable defined in the worker initializer function.
Although these are the three main methods, we can conceive of additional ways of sharing data between the main thread and tasks executed in the ThreadPoolExecutor.
Perhaps another way that we can share data from the main thread with all workers in the ThreadPoolExecutor is to have the main thread store the data in some way (such as in a file or database), then have each worker independently load the data as needed.
- Via loading data from a shared resource like a file or database.
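The file-based approach can be sketched minimally as follows. It assumes a JSON file in a temporary location. The file format, the tempfile usage, and the task() signature are all illustrative choices. A database would follow the same pattern: the main thread writes, and each task reads independently.

```python
import json
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

# hypothetical task: each worker loads the shared data from the file itself
def task(path):
    with open(path) as handle:
        loaded = json.load(handle)
    return loaded['data']

# main thread: store the data in a temporary JSON file (an assumption)
shared = {'data': '12345'}
fd, path = tempfile.mkstemp(suffix='.json')
with os.fdopen(fd, 'w') as handle:
    json.dump(shared, handle)

try:
    with ThreadPoolExecutor() as tpe:
        # only the path is passed; each task reads the file independently
        results = list(tpe.map(task, [path] * 4))
finally:
    # clean up the temporary file
    os.remove(path)

print(results)
```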
Another way to share data is to first share a data structure, like a list or a queue, then have the main thread push data into the structure and have the tasks executed by the worker threads pull data from it.
- Via reading data from a shared data structure, like a queue.
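Here is a minimal sketch of the shared-structure approach. It uses queue.Queue, which is designed for exchanging data between threads. The task() function and the doubling "work" are hypothetical. Here, the main thread pushes items and each task pulls one.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

# a thread-safe queue shared by the main thread and the tasks
shared_queue = queue.Queue()

# hypothetical task: pull one item from the shared queue and process it
def task():
    item = shared_queue.get()
    # simulate processing the item
    return item * 2

# main thread pushes data into the shared structure
for value in range(5):
    shared_queue.put(value)

with ThreadPoolExecutor() as tpe:
    # issue one task per item placed on the queue
    futures = [tpe.submit(task) for _ in range(5)]
    results = sorted(future.result() for future in futures)

print(results)  # prints: [0, 2, 4, 6, 8]
```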
Now that we know about the main approaches to sharing data from the main thread with tasks executed in the ThreadPoolExecutor, let’s look at some worked examples.
Example of Sharing Data Using a Task Function Argument
We can explore an example of sharing data from the main thread with tasks in the ThreadPoolExecutor by passing it as an argument to the task function.
In this example, we will define a task that receives data as an argument and reports the data received before blocking for a moment. The main thread will define the data, create the thread pool and execute the task using workers in the pool.
This is the conventional approach to sharing data between the main thread and tasks executed in the ThreadPoolExecutor. It assumes that we have control over the task function, allowing us to specify any and all data needed by the task as arguments.
Firstly, we can define the task function. The function takes a data value as an argument, prints the value, then sleeps for a moment to simulate effort.
The task() function below implements this.
```python
# task executed in the thread pool
def task(value):
    # report the data
    print(f'Data in Task: {value}')
    # block for a moment
    sleep(1)
```
Next, the main thread can define the shared data and report its value, which we can use later for confirmation.
```python
...
# define the data
data = '12345'
# report data in the main thread
print(f'Data in Main: {data}')
```
Next, the main thread will create the ThreadPoolExecutor using the context manager interface with the default number of worker threads.
The task is then issued using the submit() method, and the returned Future object is ignored. The main thread then blocks until the task is complete and the ThreadPoolExecutor is shut down by the context manager.
```python
...
# create the thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task to the thread pool
    _ = tpe.submit(task, data)
    # wait for all tasks to complete
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of sharing data using a function argument
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# task executed in the thread pool
def task(value):
    # report the data
    print(f'Data in Task: {value}')
    # block for a moment
    sleep(1)

# define the data
data = '12345'
# report data in the main thread
print(f'Data in Main: {data}')
# create the thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task to the thread pool
    _ = tpe.submit(task, data)
    # wait for all tasks to complete
```
Running the example first defines the shared data and reports its value.
In this case, the shared data is a string of five digits.
Next, the thread pool is created, the task is issued, and the data is provided as an argument. The main thread then blocks, waiting for all tasks in the thread pool to complete and for the thread pool's resources to be released.
The task executes in the thread pool receiving the data as an argument. The data is reported and the task blocks for a moment.
We can see that the reported data matches that defined in the main thread.
The task resumes and the task ends. The thread pool closes and releases all threads, allowing the main thread to continue and terminate the program.
This highlights how we can share data from the main thread with tasks executed by workers in the ThreadPoolExecutor using a function argument.
```
Data in Main: 12345
Data in Task: 12345
```
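The same argument-based approach also works when many tasks are issued in batch with map(). Here is a minimal sketch that assumes each task also takes a per-task item, so the two-argument task() is hypothetical. The functools.partial call binds the shared data as the first argument of every call, and map() supplies the varying item.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

# hypothetical task that needs the shared data plus a per-task item
def task(shared, item):
    return f'{shared}-{item}'

# data defined in the main thread
data = '12345'

with ThreadPoolExecutor() as tpe:
    # bind the shared data as the first argument of every task
    results = list(tpe.map(partial(task, data), range(3)))

# map() returns results in the order the inputs were given
print(results)  # prints: ['12345-0', '12345-1', '12345-2']
```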
Example of Sharing Data Using a Global Variable Defined in Main
We can explore how to share data from the main thread with tasks executed in the ThreadPoolExecutor using a global variable.
This approach may be appropriate when we can define global variables and change the body of the task function, but not its signature (e.g. we cannot add function arguments).
In this case, we can update the above example so that the task function accesses the data defined as a global variable in the main thread. This can be achieved by declaring the “data” variable in the scope of the function using the global statement.
```python
...
# declare the global variable
global data
```
The updated task() function with this change is listed below.
```python
# task executed in the thread pool
def task():
    # declare the global variable
    global data
    # report the data
    print(f'Data in Task: {data}')
    # block for a moment
    sleep(1)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of sharing data using a global variable in main
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# task executed in the thread pool
def task():
    # declare the global variable
    global data
    # report the data
    print(f'Data in Task: {data}')
    # block for a moment
    sleep(1)

# define the data
data = '12345'
# report data in the main thread
print(f'Data in Main: {data}')
# create the thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task to the thread pool
    _ = tpe.submit(task)
    # wait for all tasks to complete
```
Running the example first defines the shared data as a global variable and reports its value.
In this case, the shared data is a string of five digits.
Next, the thread pool is created and the task is issued. The main thread then blocks, waiting for all tasks in the thread pool to complete and for the thread pool's resources to be released.
The task executes in the thread pool. The global variable is declared, allowing the task to access the data directly. The data is reported and the task blocks for a moment.
We can see that the reported data matches that defined in the main thread.
The task resumes and the task ends. The thread pool closes and releases all threads, allowing the main thread to continue and terminate the program.
This highlights how we can share data from the main thread with tasks executed by workers in the ThreadPoolExecutor using a shared global variable.
```
Data in Main: 12345
Data in Task: 12345
```
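As an aside, Python only requires the global declaration when a function assigns to the name. A function that only reads a module-level variable can access it without one. Because threads share the process's memory, the task also sees the very same object as the main thread rather than a copy. The following minimal sketch shows both points. The read_task() and rebind_task() names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# data defined at module level in the main thread
data = ['1', '2', '3', '4', '5']

# reading a module-level name needs no 'global' statement
def read_task():
    return data

# rebinding a module-level name does need the 'global' statement;
# without it, 'data' would be treated as a local variable here
def rebind_task():
    global data
    data = data + ['6']

with ThreadPoolExecutor() as tpe:
    seen = tpe.submit(read_task).result()
    # threads share memory, so the task saw the same object, not a copy
    print(seen is data)  # prints: True
    # wait for the rebinding task before inspecting the global again
    tpe.submit(rebind_task).result()

print(''.join(data))  # prints: 123456
```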
Example of Sharing Data Using Worker Initializer
We can explore the case of sharing data from the main thread with tasks executed in the ThreadPoolExecutor using a global variable defined by a worker thread initialization function.
This can be achieved using a custom worker thread initialization function that is executed by each worker thread created in the ThreadPoolExecutor.
We can define a worker thread initialization function that takes the shared data as a function argument. The function declares a global variable and stores the argument in it. Note that this is an ordinary module-level global. It is shared by all threads in the process, not private to the worker thread that assigned it.
The worker_init() function below implements this.
```python
# worker initialization function
def worker_init(value):
    # declare a global variable for the worker
    global worker_value
    # define the worker global variable
    worker_value = value
```
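Python globals are visible to every thread. If each worker thread genuinely needs its own private copy of the data, one option is to populate a threading.local object in the initializer instead. This is a swapped-in technique, not the approach this tutorial uses. Here is a minimal sketch in which the "local" object and the list copy are illustrative choices.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# each thread sees its own, independent attributes on this object
local = threading.local()

# initializer: give this worker thread its own private copy of the data
def worker_init(value):
    local.worker_value = list(value)

# task: read the copy that belongs to the worker thread running it
def task():
    return ''.join(local.worker_value)

data = '12345'

with ThreadPoolExecutor(initializer=worker_init, initargs=(data,)) as tpe:
    futures = [tpe.submit(task) for _ in range(4)]
    results = [future.result() for future in futures]

print(results)
# the attribute was only set in worker threads, not in the main thread
print(hasattr(local, 'worker_value'))  # prints: False
```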
The task function is then able to access the shared global variable “worker_value” directly.
The task() function below implements this.
```python
# task executed in the thread pool
def task():
    # report the data
    print(f'Data in Task: {worker_value}')
    # block for a moment
    sleep(1)
```
Finally, the main thread must configure the ThreadPoolExecutor to call the custom initialization function with an argument for each new worker thread that is created.
This can be achieved using the “initializer” and “initargs” arguments to the ThreadPoolExecutor constructor.
```python
...
# create the thread pool
with ThreadPoolExecutor(initializer=worker_init, initargs=(data,)) as tpe:
    # issue a task to the thread pool
    _ = tpe.submit(task)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of sharing data using a global variable defined in worker init
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# worker initialization function
def worker_init(value):
    # declare a global variable for the worker
    global worker_value
    # define the worker global variable
    worker_value = value

# task executed in the thread pool
def task():
    # report the data
    print(f'Data in Task: {worker_value}')
    # block for a moment
    sleep(1)

# define the data
data = '12345'
# report data in the main thread
print(f'Data in Main: {data}')
# create the thread pool
with ThreadPoolExecutor(initializer=worker_init, initargs=(data,)) as tpe:
    # issue a task to the thread pool
    _ = tpe.submit(task)
    # wait for all tasks to complete
```
Running the example first defines the shared data as a global variable and reports its value.
In this case, the shared data is a string of five digits.
Next, the thread pool is created and configured to execute the custom worker thread initialization function, with the data as an argument, for each worker thread that is created in the pool.
The task is issued to the pool. The main thread then blocks, waiting for all tasks in the thread pool to be complete and the resources of the thread pool to be released.
The task executes in the thread pool. In this case, a new worker thread is created (worker threads are created on demand), which first executes the custom initialization function. The global variable is declared and the argument data is stored in it.
The task then executes, reporting the value of the global variable and blocking for a moment to simulate effort.
We can see that the reported data matches that defined in the main thread.
The task resumes and the task ends. The thread pool closes and releases all threads, allowing the main thread to continue and terminate the program.
This highlights how we can share data from the main thread with tasks executed by workers in the ThreadPoolExecutor using a shared global variable defined in the worker thread initialization function.
```
Data in Main: 12345
Data in Task: 12345
```
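To see how often the initializer actually runs, here is a minimal sketch that counts its calls. The counter, the lock that guards it, max_workers=4, and the 0.1-second sleep are all illustrative assumptions. The sleep keeps tasks busy so that several worker threads are likely to be created. The initializer runs at most once per worker thread, so the count should not exceed the number of workers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical counter of initializer calls, guarded by a lock
init_calls = 0
init_lock = threading.Lock()

# initializer: count the call, then define the shared global
def worker_init(value):
    global init_calls, worker_value
    with init_lock:
        init_calls += 1
    worker_value = value

def task():
    # block briefly so that several worker threads get created
    sleep(0.1)
    return worker_value

with ThreadPoolExecutor(max_workers=4, initializer=worker_init, initargs=('12345',)) as tpe:
    futures = [tpe.submit(task) for _ in range(8)]
    results = [future.result() for future in futures]

# number of tasks that saw the data, and number of initializer runs
print(results.count('12345'), init_calls)
```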
A weakness of this approach is that each new thread created in the ThreadPoolExecutor will re-declare and redefine the global variable.
This is not needed.
Instead, we can have the first thread that is created define a global variable that can then be accessed by all worker threads in the pool.
This can be achieved by first checking whether the global variable already exists in the module's global namespace (the dictionary returned by globals()), and returning early if it does.
The updated worker_init() function with this change is listed below.
```python
# worker initialization function
def worker_init(value):
    # check if the variable has already been defined
    if 'worker_value' in globals():
        # do not define the variable again
        return
    # declare a global variable for the worker
    global worker_value
    # define the worker global variable
    worker_value = value
    # report that the global variable has been defined once
    print('Variable defined')
```
We can update the task function to report the worker thread name along with the shared data, to confirm that multiple worker threads were created and are being used to execute tasks.
```python
# task executed in the thread pool
def task():
    # report the data
    print(f'Data in Task: {worker_value} [thread={current_thread().name}]')
    # block for a moment
    sleep(1)
```
We can then issue many tasks to the thread pool for execution to test whether the global variable is only defined once, and is still accessible to all tasks executed by all worker threads.
```python
...
# issue many tasks to the thread pool
_ = [tpe.submit(task) for _ in range(10)]
```
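The snippet above discards the Future objects. Keeping them lets the main thread collect what each task saw. Here is a minimal sketch in which the task returns the data together with the worker thread's name and the main thread gathers results with as_completed(). For brevity, worker_value is assigned directly as a module-level global rather than via an initializer.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import current_thread
from time import sleep

# shared data, defined directly here for brevity (an assumption)
worker_value = '12345'

def task():
    sleep(0.1)
    # return the data together with the name of the worker that ran the task
    return worker_value, current_thread().name

with ThreadPoolExecutor() as tpe:
    # keep the Future objects instead of discarding them
    futures = [tpe.submit(task) for _ in range(10)]
    # collect results in the order the tasks finish
    results = [future.result() for future in as_completed(futures)]

# distinct worker thread names that executed tasks
names = {name for _, name in results}
print(len(results), len(names))
```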
Tying this together, the complete example with these changes is listed below.
```python
# SuperFastPython.com
# example of sharing data using a global variable defined in worker init
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from time import sleep

# worker initialization function
def worker_init(value):
    # check if the variable has already been defined
    if 'worker_value' in globals():
        # do not define the variable again
        return
    # declare a global variable for the worker
    global worker_value
    # define the worker global variable
    worker_value = value
    # report that the global variable has been defined once
    print('Variable defined')

# task executed in the thread pool
def task():
    # report the data and the name of the worker thread
    print(f'Data in Task: {worker_value} [thread={current_thread().name}]')
    # block for a moment
    sleep(1)

# define the data
data = '12345'
# report data in the main thread
print(f'Data in Main: {data}')
# create the thread pool
with ThreadPoolExecutor(initializer=worker_init, initargs=(data,)) as tpe:
    # issue many tasks to the thread pool
    _ = [tpe.submit(task) for _ in range(10)]
    # wait for all tasks to complete
```
Running the example, we can see that the “Variable defined” message is only reported once. This means that the global variable is only defined once in the initialization function, even though many worker threads were created.
Next, we can see that all 10 tasks are able to access the shared global variable, regardless of which worker thread executes each one.
```
Data in Main: 12345
Variable defined
Data in Task: 12345 [thread=ThreadPoolExecutor-0_0]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_1]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_2]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_3]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_4]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_5]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_6]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_7]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_8]
Data in Task: 12345 [thread=ThreadPoolExecutor-0_9]
```
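One caveat with the globals() check is that the check and the assignment are two separate steps. Two workers initializing at the same moment could, in principle, both pass the check. Here both would store the same value, so the outcome is harmless. If the one-time setup were expensive or had side effects, however, a lock could make the check-then-define step atomic. The following minimal sketch assumes a module-level threading.Lock, which is not part of the original example. The define_count counter is hypothetical and is added only to show that the definition happens exactly once.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# lock that makes the check-then-define step atomic across worker threads
_init_lock = threading.Lock()
# hypothetical counter of how many times the variable was defined
define_count = 0

def worker_init(value):
    global worker_value, define_count
    with _init_lock:
        # check if the variable has already been defined
        if 'worker_value' in globals():
            return
        # define the variable exactly once
        worker_value = value
        define_count += 1

def task():
    return worker_value

with ThreadPoolExecutor(initializer=worker_init, initargs=('12345',)) as tpe:
    futures = [tpe.submit(task) for _ in range(10)]
    results = [future.result() for future in futures]

print(define_count, set(results))  # prints: 1 {'12345'}
```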
Takeaways
You now know how to share data from the main thread with tasks executed by workers in the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.