The task is to combine two arrays row by row (construct the permutations) based on the resulting multiplication of two corresponding vectors. Such as:
The result should be: Row1_A_Row1_B, Row1_A_Row2_B, Row1_A_Row3_B, Row2_A_Row1_B, etc..
Given the following initial arrays:
n_rows = 1000 A = np.random.randint(10, size=(n_rows, 5)) B = np.random.randint(10, size=(n_rows, 5)) P_A = np.random.rand(n_rows, 1) P_B = np.random.rand(n_rows, 1)
Arrays P_A and P_B are corresponding vectors to the individual arrays, which contain a float. The combined rows should only appear in the final array if the multiplication surpasses a certain threshold, for example:
lim = 0.8
I have thought of the following functions or ways to solve this problem, but I would be interested in faster solutions. I am open to using numba or other libraries, but ideally I would like to improve the vectorized solution using numpy.
def concatenate_per_row(A, B): m1,n1 = A.shape m2,n2 = B.shape out = np.zeros((m1,m2,n1+n2),dtype=A.dtype) out[:,:,:n1] = A[:,None,:] out[:,:,n1:] = B return out.reshape(m1*m2,-1) %%timeit A_B = concatenate_per_row(A, B) P_A_B = (P_A[:, None]*P_B[None, :]) P_A_B = P_A_B.flatten() idx = P_A_B > lim A_B = A_B[idx, :] P_A_B = P_A_B[idx]
37.8 ms ± 660 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit A_B =  P_A_B =  for i in range(len(P_A)): P_A_B_i = P_A[i]*P_B idx = np.where(P_A_B_i > lim) if len(idx) > 0: P_A_B.append(P_A_B_i[idx]) A_B_i = np.zeros((len(idx), A.shape + B.shape), dtype='int') A_B_i[:, :A.shape] = A[i] A_B_i[:, A.shape:] = B[idx, :] A_B.append(A_B_i) A_B = np.concatenate(A_B) P_A_B = np.concatenate(P_A_B)
9.65 ms ± 291 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
First of all, there is a more efficient algorithm. Indeed, you can pre-compute the size of the output array so the values can be directly written in the final output arrays rather than stored temporary in lists. To find the size efficiently, you can sort the array
P_B and then do a binary search so to find the number of value greater than
lim/P_A[i,0] for all possible
P_B*P_A[i,0] > lim is equivalent to
P_B > lim/P_A[i,0]). The number of item filtered for each
i can be temporary stored so to quickly loop over the filtered items.
Moreover, you can use Numba to significantly speed the computation of the main loop up.
Here is the resulting code:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])') def compute(A, B, P_A, P_B): assert P_A.shape == 1 assert P_B.shape == 1 P_B_sorted = np.sort(P_B.reshape(P_B.size)) counts = len(P_B) - np.searchsorted(P_B_sorted, lim/P_A[:,0], side='right') n = np.sum(counts) mA, mB = A.shape, B.shape m = mA + mB A_B = np.empty((n, m), dtype=np.int_) P_A_B = np.empty((n, 1), dtype=np.float64) k = 0 for i in range(P_A.shape): if counts[i] > 0: idx = np.where(P_B > lim/P_A[i, 0]) assert counts[i] == len(idx) start, end = k, k + counts[i] A_B[start:end, :mA] = A[i, :] A_B[start:end, mA:] = B[idx, :] P_A_B[start:end, :] = P_B[idx, :] * P_A[i, 0] k += counts[i] return A_B, P_A_B
Here are performance results on my machine:
Original: 35.6 ms Optimized original: 18.2 ms Proposed (with order): 0.9 ms Proposed (no ordering): 0.3 ms
The algorithm proposed above is 20 times faster than the original optimized algorithm. It can be made even faster. Indeed, if the order of items do not matter you can use an
argsort so to reorder both
P_B. This enable you not to compute
idx every time in the hot loop and select directly the last elements from
P_B (that are guaranteed to be higher than the threshold but not in the same order than the original code). Because the selected items are stored contiguously in memory, this implementation is much faster. In the end, this last implementation is about 60 times faster than the original optimized algorithm. Note that the proposed implementations are significantly faster than the original ones even without Numba.
Here is the implementation that do not care about the order of the items:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])') def compute(A, B, P_A, P_B): assert P_A.shape == 1 assert P_B.shape == 1 nA, mA = A.shape nB, mB = B.shape m = mA + mB order = np.argsort(P_B.reshape(nB)) P_B_sorted = P_B[order, :] B_sorted = B[order, :] counts = nB - np.searchsorted(P_B_sorted.reshape(nB), lim/P_A[:,0], side='right') nRes = np.sum(counts) A_B = np.empty((nRes, m), dtype=np.int_) P_A_B = np.empty((nRes, 1), dtype=np.float64) k = 0 for i in range(P_A.shape): if counts[i] > 0: start, end = k, k + counts[i] A_B[start:end, :mA] = A[i, :] A_B[start:end, mA:] = B_sorted[nB-counts[i]:, :] P_A_B[start:end, :] = P_B_sorted[nB-counts[i]:, :] * P_A[i, 0] k += counts[i] return A_B, P_A_B
Answered By – Jérôme Richard
Answer Checked By – Timothy Miller (BugsFixing Admin)