Implementing multithreading or multiprocessing in Python is straightforward. However, running a function in a thread and collecting its return value requires a trick. There are multiple solutions to do so; this post shows just one of many possible ways.

Choosing between multithreading and multiprocessing depends on your application. The threading module uses threads, the multiprocessing module uses processes. The difference is that threads run in the same memory space, while each process has its own memory. In CPython, the global interpreter lock (GIL) also means that the threads of one process effectively execute Python bytecode on a single core at a time, whereas processes can run on multiple cores in parallel. The separate memory makes it a bit harder to share objects between processes with multiprocessing. Since threads share memory, precautions have to be taken, or two threads may write to the same memory at the same time. If you want to use multiple cores for parallel processing, you should choose multiprocessing.
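To illustrate the kind of precaution mentioned above, here is a minimal sketch (with illustrative names, not part of the original example) that uses a threading.Lock to protect a shared counter from concurrent writes:

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # Without the lock, two threads could read and write
        # counter at the same time and lose updates.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000
```

With the lock in place, the four threads always produce the exact total; without it, the result may come out lower on some runs.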

First let’s define our original (dummy) function that we want to process in parallel:

import time
from threading import Thread
from multiprocessing import Process
from multiprocessing import Queue

def some_function(n_loops, t):
    y = (t + 1)*1000
    for x in range(n_loops):
        y += x
        print('thread: {}, x: {}, y: {}'.format(t, x, y))
        time.sleep(1)
    return y


Multithreading

Let's start with multithreading. To get the function output, we have to turn the return value into a function input argument, similar to passing an output pointer in other languages. This can be achieved by giving a list as a function argument and storing the return value in it. Define a function wrapper that stores the return value in a result list:

def mt_worker(results, idx, f, *args): 
    results[idx] = f(*args)

Then, init the Threads, run and join:

def mt_run(n_loops, n_threads):
    threads = [None]*n_threads
    results = [None]*n_threads
    
    # Init and start threads
    for t in range(n_threads):
        threads[t] = Thread(target=mt_worker, args=(results, t, some_function, n_loops, t))
        threads[t].start()
    
    # Wait for the threads to finish
    for i in range(n_threads):
        threads[i].join()
        
    print('Results: ', results)

Et voila, all function outputs of the threads are collected and stored in the results list.
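As noted at the start, this is just one of several possible approaches. Another common one (a sketch with illustrative names, not from the original example) is to subclass Thread and store the return value on the instance:

```python
from threading import Thread

class ReturnThread(Thread):
    """Thread subclass that stores the target function's return value."""

    def __init__(self, target, args=()):
        super().__init__()
        self._target_fn = target
        self._args = args
        self.result = None

    def run(self):
        # run() executes in the new thread; save the return value on self
        self.result = self._target_fn(*self._args)

threads = [ReturnThread(target=lambda x: x * x, args=(i,)) for i in range(4)]
for th in threads:
    th.start()
for th in threads:
    th.join()

print([th.result for th in threads])  # [0, 1, 4, 9]
```

After join() returns, each thread's result attribute holds the value its target function returned.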

Multiprocessing

With multiprocessing, the idea is similar, but we now store the output in a multiprocessing.Queue. Let's first define the wrapper function that catches the original function's return value:

def mp_worker(queue, f, *argv):
    queue.put(f(*argv))

Then create the queue and the processes, start the processes, collect the function outputs from the queue, and wait for the processes to finish:

def mp_run(n_loops, n_threads):
    
    processes = [None]*n_threads
    results = [None]*n_threads
    queue = Queue()
    
    # Init and start processes
    for t in range(n_threads):
        p = Process(target=mp_worker, args=(queue, some_function, n_loops, t))
        processes[t] = p
        p.start()
    
    # Collect process output from the queue
    for t in range(n_threads):
        results[t] = queue.get()
    
    # Wait for the processes to finish
    for p in processes:
        p.join()
    
    print('Results: ', results)

Results

All is set up, let’s run the code.

n_threads = 4
n_loops = 5

# Multithreading
mt_run(n_loops, n_threads)

# Multiprocessing
mp_run(n_loops, n_threads)

The output should look something like this:

thread: 0, x: 0, y: 1000
thread: 1, x: 0, y: 2000
thread: 2, x: 0, y: 3000
thread: 3, x: 0, y: 4000
thread: 0, x: 1, y: 1001
thread: 1, x: 1, y: 2001
thread: 2, x: 1, y: 3001
thread: 3, x: 1, y: 4001
thread: 0, x: 2, y: 1003
thread: 1, x: 2, y: 2003
thread: 2, x: 2, y: 3003
thread: 3, x: 2, y: 4003
thread: 0, x: 3, y: 1006
thread: 1, x: 3, y: 2006
thread: 2, x: 3, y: 3006
thread: 3, x: 3, y: 4006
thread: 0, x: 4, y: 1010
thread: 1, x: 4, y: 2010
thread: 2, x: 4, y: 3010
thread: 3, x: 4, y: 4010
Results:  [2010, 1010, 3010, 4010]

Note the ordering: the multithreading version stores each result at its thread's index, so its results list is in thread order. The multiprocessing version reads results from the queue in completion order, which is why the list above is not sorted by process index.

concurrent.futures

The concurrent.futures module does the bookkeeping described above for you and makes multithreading and multiprocessing even easier to implement. For example, a thread pool is allocated and initiated with

pool = ThreadPoolExecutor(max_workers=8)

followed by

a = pool.submit(some_function, n_loops=10, t=i)

and the returned Future object stores the return value of your function, which you can retrieve with its result() method. Below is an example:

import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor


def some_function(n_loops, t):
    y = 0
    for x in range(n_loops):
        y = (t + 1) * 1000 + x
        print('process: {}, y: {}'.format(t, y))
        time.sleep(1)
    return y


def multi_threading():
    # The with-statement shuts the pool down cleanly when done
    with ThreadPoolExecutor(max_workers=8) as pool:
        # submit() returns a Future; result() blocks until the call finishes
        futures = [pool.submit(some_function, n_loops=10, t=i) for i in range(10)]
        for future in futures:
            print(future.result())

def multi_processing():
    n_cpus = multiprocessing.cpu_count()
    with ProcessPoolExecutor(max_workers=n_cpus) as pool:
        futures = [pool.submit(some_function, n_loops=10, t=i) for i in range(10)]
        for future in futures:
            print(future.result())


if __name__ == '__main__':
    multi_threading()
    multi_processing()
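For the common case where you simply want the results in submission order, Executor.map is even more compact (a sketch with an illustrative function, not part of the original example):

```python
from concurrent.futures import ThreadPoolExecutor

def cube(x):
    return x ** 3

with ThreadPoolExecutor(max_workers=4) as pool:
    # map() yields results in the order the inputs were submitted,
    # even if the individual calls finish in a different order
    results = list(pool.map(cube, range(5)))

print(results)  # [0, 1, 8, 27, 64]
```

The same call works unchanged with ProcessPoolExecutor, provided the mapped function is defined at module level so it can be pickled.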