[SOLVED] using numba with np.concatenate is not efficient in parallel?

Issue

I’m having some trouble getting np.concatenate to parallelize efficiently. Here is a minimal working example. (I know that here I could sum a and b separately, but I am focussing on parallelizing the concatenate operation since this is what I need to do in my project. I would then do further operations on the concatenated array, such as sorting.)

However many cores I run this on, it always seems to take the same time (~10 seconds). If anything, it is slower on more cores. I tried releasing the GIL on cc using nogil=True in the decorator, but to no avail. Note that all cores are clearly in use during the calculation, even though there is no speed-up.

Can anybody help me?

Many thanks

from numba import prange, njit
import numpy as np


@njit()
def cc():

    r = np.random.rand(20)
    a = r[r < 0.5]
    b = r[r > 0.7]

    c = np.concatenate((a, b))

    return np.sum(c)


@njit(parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)
    for i in prange(n):
        result[i] = cc()

    return result

cc_wrap()

Solution

The main issue comes from contention in the memory allocator.

The cc function creates many implicit small temporary arrays. For example, np.random.rand does so, as do r < 0.5 and even a = r[condition], not to mention np.concatenate. Temporary arrays generally need to be allocated on the heap using a given allocator. The default allocator provided by the standard library is not guaranteed to scale well across multiple threads. Allocations do not scale perfectly because an expensive implicit synchronisation is needed between the threads allocating data. For example, a thread can allocate an array that is deleted by another one. In the worst case, the allocator can serialize the allocations/deallocations. Because the cost of allocating the data is huge compared to the work performed on it, the synchronization overhead is predominant and the overall execution is serialized. Actually, it is even worse, since the sequential time is already dominated by these overheads.
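To make the hidden allocations concrete, here is the body of cc unrolled in plain NumPy (outside Numba), with each implicit temporary marked. The names mask_a and mask_b are only for illustration; inside the original one-liners these masks exist as unnamed temporaries:

```python
import numpy as np

def cc_allocations():
    # Each line below creates a fresh heap-allocated array:
    r = np.random.rand(20)        # allocation 1: the random array
    mask_a = r < 0.5              # allocation 2: boolean mask (implicit in the original)
    a = r[mask_a]                 # allocation 3: filtered copy (fancy indexing copies)
    mask_b = r > 0.7              # allocation 4: boolean mask (implicit in the original)
    b = r[mask_b]                 # allocation 5: filtered copy
    c = np.concatenate((a, b))    # allocation 6: concatenated copy
    return np.sum(c)
```

Six heap allocations per call, each holding at most 20 floats, repeated 10^7 times across all threads: that is why the allocator becomes the bottleneck.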

Note that an aggressively optimizing compiler could allocate the arrays on the stack because they do not escape the function. However, this is unfortunately not done by Numba. Moreover, the allocator could be tuned to scale very well using per-thread pools, assuming Numba threads never delete data allocated by other threads (which is probably the case, although I am not totally sure). Still, the memory pool itself needs to be requested from the operating system, which generally does not scale well either (especially on Windows, AFAIK).

The best solution is simply to avoid creating implicit temporary arrays. This is possible using a per-worker local temporary array combined with a partitioning algorithm. Note that the functions can be compiled ahead of time by specifying types to Numba.

Here is the resulting implementation:

import numba as nb
import numpy as np
import random

@nb.njit('float64(float64[::1])')
def cc(tempBuffer):
    assert tempBuffer.size >= 20

    # View to the temporary scratch buffer
    r = tempBuffer[0:20]

    # Generate 20 random numbers without any allocation
    for i in range(20):
        r[i] = random.random()

    j = 0

    # Partition the array so to put values smaller than
    # a condition in the beginning.
    # After the loop, r[0:split] contains the filtered values.
    for i in range(r.size):
        if r[i] < 0.5:
            r[i], r[j] = r[j], r[i]
            j += 1

    split = j

    # Partition the rest of the array.
    # After the loop, r[split:j] contains the other filtered values.
    for i in range(j, r.size):
        if r[i] > 0.7:
            r[i], r[j] = r[j], r[i]
            j += 1

    # Note that extracting contiguous views is cheap as
    # it does not create a new temporary array
    # a = r[0:split]
    # b = r[split:j]
    c = r[0:j]

    return np.sum(c)

@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)

    # Preallocate some space once for all threads
    globalTempBuffer = np.empty((nb.get_num_threads(), 64), dtype=np.float64)

    for i in nb.prange(n):
        # _get_thread_id is an internal Numba API; there is no public equivalent here
        threadId = nb.np.ufunc.parallel._get_thread_id()
        threadLocalBuffer = globalTempBuffer[threadId]
        result[i] = cc(threadLocalBuffer)

    return result

cc_wrap()
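The partitioning trick can be checked outside Numba. Here is a plain-NumPy sketch (the helper name partition_sum is mine, not from the answer) showing that the two in-place passes select the same values as the mask-and-concatenate version:

```python
import numpy as np

def partition_sum(r):
    # Same two-pass in-place partitioning as the Numba version,
    # written in plain NumPy for illustration.
    r = r.copy()  # work on a copy so the caller's array is untouched
    j = 0
    # First pass: move values < 0.5 to the front
    for i in range(r.size):
        if r[i] < 0.5:
            r[i], r[j] = r[j], r[i]
            j += 1
    # Second pass: move values > 0.7 right after them
    for i in range(j, r.size):
        if r[i] > 0.7:
            r[i], r[j] = r[j], r[i]
            j += 1
    return np.sum(r[:j])

rng = np.random.default_rng(0)
r = rng.random(20)
expected = np.sum(np.concatenate((r[r < 0.5], r[r > 0.7])))
assert np.isclose(partition_sum(r), expected)
```

The element order inside each partition differs from the concatenate version, but the selected set of values, and hence the sum, is identical.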

Note that the thread-local manipulation is a bit tricky and often not needed. In this case, there is a visible speed-up just from using the partitioning algorithm to reduce allocations. However, the allocation overhead is still significant due to the very small size of the temporary array and the sheer number of allocations.

Note also that the r array is not strictly required in this code, since the random numbers can be summed on the fly. This may not fit your needs regarding your real-world use cases. Here is a much simpler implementation:

@nb.njit('float64()')
def cc():
    s = 0.0
    for i in range(20):
        e = random.random()
        if e < 0.5 or e > 0.7:
            s += e
    return s

@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
    n = 10 ** 7
    result = np.empty(n)
    for i in nb.prange(n):
        result[i] = cc()
    return result

cc_wrap()

Here are timings on my 6-core machine:

# Initial (sequential):      8.1 s
# Initial (parallel):        9.0 s
# Array-based (sequential):  2.50 s
# Array-based (parallel):    0.41 s
# In-place (sequential):     1.09 s
# In-place (parallel):       0.19 s

In the end, the fastest parallel version is 47 times faster than the original one (and scales almost perfectly).

Answered By – Jérôme Richard

Answer Checked By – Willingham (BugsFixing Volunteer)
