[SOLVED] Conditional combination of arrays row by row

Issue

The task is to combine two arrays row by row (construct the permutations) based on the resulting multiplication of two corresponding vectors. Such as:

Row1_A,
Row2_A,
Row3_A,

Row1_B,
Row2_B,
Row3_B,

The result should be: Row1_A_Row1_B, Row1_A_Row2_B, Row1_A_Row3_B, Row2_A_Row1_B, etc..

Given the following initial arrays:

n_rows = 1000
A = np.random.randint(10, size=(n_rows, 5))
B = np.random.randint(10, size=(n_rows, 5))
P_A = np.random.rand(n_rows, 1)
P_B = np.random.rand(n_rows, 1)

Arrays P_A and P_B are corresponding vectors to the individual arrays, which contain a float. The combined rows should only appear in the final array if the multiplication surpasses a certain threshold, for example:

lim = 0.8

I have thought of the following functions or ways to solve this problem, but I would be interested in faster solutions. I am open to using numba or other libraries, but ideally I would like to improve the vectorized solution using numpy.

Method A

def concatenate_per_row(A, B):
    m1,n1 = A.shape
    m2,n2 = B.shape

    out = np.zeros((m1,m2,n1+n2),dtype=A.dtype)
    out[:,:,:n1] = A[:,None,:]
    out[:,:,n1:] = B
    return out.reshape(m1*m2,-1)

%%timeit

A_B = concatenate_per_row(A, B)
P_A_B = (P_A[:, None]*P_B[None, :])
P_A_B = P_A_B.flatten()

idx = P_A_B > lim

A_B = A_B[idx, :]
P_A_B = P_A_B[idx]

37.8 ms ± 660 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Method B

%%timeit

A_B = []
P_A_B = []

for i in range(len(P_A)):
    P_A_B_i = P_A[i]*P_B
    idx = np.where(P_A_B_i > lim)[0]
    
    if len(idx) > 0:
        P_A_B.append(P_A_B_i[idx])

        A_B_i = np.zeros((len(idx), A.shape[1] + B.shape[1]), dtype='int')
        A_B_i[:, :A.shape[1]] = A[i]
        A_B_i[:, A.shape[1]:] = B[idx, :]
        A_B.append(A_B_i)

A_B = np.concatenate(A_B)
P_A_B = np.concatenate(P_A_B)

9.65 ms ± 291 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Solution

First of all, there is a more efficient algorithm. Indeed, you can pre-compute the size of the output array so the values can be directly written in the final output arrays rather than stored temporary in lists. To find the size efficiently, you can sort the array P_B and then do a binary search so to find the number of value greater than lim/P_A[i,0] for all possible i (P_B*P_A[i,0] > lim is equivalent to P_B > lim/P_A[i,0]). The number of item filtered for each i can be temporary stored so to quickly loop over the filtered items.

Moreover, you can use Numba to significantly speed the computation of the main loop up.

Here is the resulting code:

@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
    assert P_A.shape[1] == 1
    assert P_B.shape[1] == 1

    P_B_sorted = np.sort(P_B.reshape(P_B.size))
    counts = len(P_B) - np.searchsorted(P_B_sorted, lim/P_A[:,0], side='right')
    n = np.sum(counts)
    mA, mB = A.shape[1], B.shape[1]
    m = mA + mB

    A_B = np.empty((n, m), dtype=np.int_)
    P_A_B = np.empty((n, 1), dtype=np.float64)
    k = 0

    for i in range(P_A.shape[0]):
        if counts[i] > 0:
            idx = np.where(P_B > lim/P_A[i, 0])[0]
            assert counts[i] == len(idx)

            start, end = k, k + counts[i]
            A_B[start:end, :mA] = A[i, :]
            A_B[start:end, mA:] = B[idx, :]
            P_A_B[start:end, :] = P_B[idx, :] * P_A[i, 0]
            k += counts[i]

    return A_B, P_A_B

Here are performance results on my machine:

Original:                35.6 ms
Optimized original:      18.2 ms
Proposed (with order):    0.9 ms
Proposed (no ordering):   0.3 ms

The algorithm proposed above is 20 times faster than the original optimized algorithm. It can be made even faster. Indeed, if the order of items do not matter you can use an argsort so to reorder both B and P_B. This enable you not to compute idx every time in the hot loop and select directly the last elements from B and P_B (that are guaranteed to be higher than the threshold but not in the same order than the original code). Because the selected items are stored contiguously in memory, this implementation is much faster. In the end, this last implementation is about 60 times faster than the original optimized algorithm. Note that the proposed implementations are significantly faster than the original ones even without Numba.

Here is the implementation that do not care about the order of the items:

@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
    assert P_A.shape[1] == 1
    assert P_B.shape[1] == 1

    nA, mA = A.shape
    nB, mB = B.shape
    m = mA + mB

    order = np.argsort(P_B.reshape(nB))
    P_B_sorted = P_B[order, :]
    B_sorted = B[order, :]
    counts = nB - np.searchsorted(P_B_sorted.reshape(nB), lim/P_A[:,0], side='right')
    nRes = np.sum(counts)

    A_B = np.empty((nRes, m), dtype=np.int_)
    P_A_B = np.empty((nRes, 1), dtype=np.float64)
    k = 0

    for i in range(P_A.shape[0]):
        if counts[i] > 0:
            start, end = k, k + counts[i]
            A_B[start:end, :mA] = A[i, :]
            A_B[start:end, mA:] = B_sorted[nB-counts[i]:, :]
            P_A_B[start:end, :] = P_B_sorted[nB-counts[i]:, :] * P_A[i, 0]
            k += counts[i]

    return A_B, P_A_B

Answered By – Jérôme Richard

Answer Checked By – Timothy Miller (BugsFixing Admin)

Leave a Reply

Your email address will not be published. Required fields are marked *