Simple Examples of Parallel Computing with Cython

During a molecular simulation, it is often more convenient to calculate properties on the fly, by accessing data through the API of the simulation program, than to dump all the coordinates and post-process them after the whole run. This is especially true for strongly fluctuating properties such as the virial or the RDF, which require a large number of frames; dumping frames densely enough is expensive. For Python-friendly molecular simulation packages, e.g., lammps, hoomd-blue, galamost, etc., it is easy to embed customized Python functions into the simulation control scripts. However, pure Python performs poorly in heavy numerical calculations. numba.cuda.jit does boost the program dramatically, but for reasons I have not been able to identify, GPU-accelerated packages such as hoomd-blue and galamost produce errors during simulations when numba.cuda.jit-compiled functions are used. Therefore, I use Cython to generate C functions to accelerate the calculations. Below is a simple example of a pairwise property calculation (the RDF) written with Cython's parallel facilities; it is also a learning note of mine:

# cython: language_level=3, boundscheck=False, wraparound=False, cdivision=True
import numpy as np
cimport numpy as np
import cython
from cython.parallel import prange, parallel
cimport openmp
from libc.math cimport floor,sqrt,pow
from libc.stdlib cimport malloc, free
import multiprocessing


cdef long * unravel_index_f(long i, long[:] dim) nogil:
    # unravel a flat index i into len(dim) indices in the Fortran (column-major) way;
    # the returned array is malloc'ed and must be freed by the caller
    cdef long k, d
    d = dim.shape[0]
    cdef long * ret = <long *> malloc(d * sizeof(long))
    for k in range(d):
        ret[k] = i % dim[k]
        i = (i - ret[k]) / dim[k]
    return ret


cdef long ravel_index_f(long * vec, long[:] dim) nogil:
    # ravel len(dim) indices back into a flat index (Fortran order),
    # wrapping each component periodically into [0, dim[k])
    cdef long ret, d, tmp, k
    d = dim.shape[0]
    ret = (vec[0] + dim[0]) % dim[0]
    tmp = dim[0]
    for k in range(1,d):
        ret += ((vec[k] + dim[k]) % dim[k]) * tmp
        tmp *= dim[k]
    return ret


cdef long jth_neighbour(long * veci, long * vecj, long[:] dim) nogil:
    # flat index of the neighbouring cell reached from cell veci by the offset vecj
    cdef long ret, d, tmp, k
    d = dim.shape[0]
    ret = (veci[0] + vecj[0] - 1 + dim[0]) % dim[0]
    # re-ravel veci + vecj - 1 into a flat index; the -1 shifts the neighbour
    # offsets from 0..2 to -1..1 in each dimension
    tmp = dim[0]
    for k in range(1, d):
        ret += ((veci[k] + vecj[k] - 1 + dim[k]) % dim[k]) * tmp
        tmp *= dim[k]
    return ret


cdef double pbc_dist(double[:] x, double[:] y, double[:] b) nogil:
    # minimum-image distance between x and y in an orthorhombic periodic box b
    cdef long i, d
    cdef double tmp=0, r=0
    d = b.shape[0]
    for i in range(d):
        tmp = x[i]-y[i]
        tmp = tmp - b[i] * floor(tmp/b[i]+0.5)
        r = r + pow(tmp, 2)
    return sqrt(r)


cdef long cell_id(double[:] p, double[:] box, long[:] ibox) nogil:
    # flat cell index of position p, raveled in the Fortran way;
    # assumes coordinates are wrapped into [-box/2, box/2)
    cdef long ret, tmp, i, n
    n = p.shape[0]
    ret = <long> floor((p[0] / box[0] + 0.5) * ibox[0])
    tmp = ibox[0]
    for i in range(1, n):
        ret = ret + tmp * <long> floor((p[i] / box[i] + 0.5) * ibox[i])
        tmp = tmp * ibox[i]
    return ret


cdef void linked_cl(double[:, :] pos, double[:] box, long[:] ibox, long[:] head, long[:] body) nogil:
    # build the linked cell list: head[c] holds the last particle added to cell c,
    # body[i] points to the previous particle in the same cell (-1 terminates the chain)
    cdef long i, n, ic
    n = pos.shape[0]
    for i in range(n):
        ic = cell_id(pos[i], box, ibox)
        body[i] = head[ic]
        head[ic] = i


def rdf(double[:,:] x, double[:,:] y, double[:] box, double bs, long nbins):
    cdef long i, j, k, l, n, m, d3, d, ic, jc
    cdef np.ndarray[np.double_t, ndim=2] ret
    cdef long[:] head, body, ibox, dim
    cdef double r, r_cut
    cdef long ** j_vecs
    cdef long * veci
    cdef int num_threads, thread_num
    num_threads = multiprocessing.cpu_count()
    r_cut = nbins * bs
    n = x.shape[0]
    d = x.shape[1]
    m = y.shape[0]
    d3 = 3 ** d
    ibox = np.zeros((d,), dtype=np.int64)
    for i in range(d):
        ibox[i] = <long> floor(box[i] / r_cut + 0.5)
    head = np.zeros(np.multiply.reduce(ibox), dtype=np.int64) - 1
    body = np.zeros((m,), dtype=np.int64) - 1
    linked_cl(y, box, ibox, head, body)
    ret = np.zeros((num_threads, nbins), dtype=np.float64)
    dim = np.zeros((d,), dtype=np.int64) + 3
    j_vecs = <long **> malloc(sizeof(long *) * d3)
    for i in range(d3):
        j_vecs[i] = unravel_index_f(i, dim)  # the 3**d neighbour offset vectors, 0..2 in each dimension
    with nogil, parallel(num_threads=num_threads):
        for i in prange(n, schedule='dynamic'):
            ic = cell_id(x[i], box, ibox)
            thread_num = openmp.omp_get_thread_num()
            veci = unravel_index_f(ic, ibox)
            for j in range(d3):
                jc = jth_neighbour(veci, j_vecs[j], ibox)
                k = head[jc]
                while k != -1:
                    r = pbc_dist(x[i], y[k], box)
                    if r < r_cut:
                        l = <long> (r/bs)
                        ret[thread_num, l]+=1
                    k = body[k]
            free(veci)  # arrays created by malloc should be freed. (in unravel_index_f)
    for i in range(d3):
        free(j_vecs[i])  # each offset vector was malloc'ed in unravel_index_f
    free(j_vecs)  # without calling free(), a memory leak would happen.
    return np.sum(ret, axis=0)
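
To compile a prange-based module, OpenMP has to be enabled explicitly. Below is a minimal setup.py sketch, assuming the code above is saved as cy_rdf.pyx (the file and module names are my own choice) and a GCC-like compiler is used:

# setup.py
from setuptools import Extension, setup
from Cython.Build import cythonize
import numpy as np

ext = Extension(
    "cy_rdf",
    sources=["cy_rdf.pyx"],
    include_dirs=[np.get_include()],  # for cimport numpy
    extra_compile_args=["-fopenmp"],  # enable OpenMP for prange (/openmp with MSVC)
    extra_link_args=["-fopenmp"],
)

setup(ext_modules=cythonize(ext))

Build it in place with python setup.py build_ext --inplace and import cy_rdf from the analysis or control script.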

Notes:

  1. def declares a Python-callable function and cdef declares a C function;
  2. Arrays declared as double[:] etc. are typed memoryviews; they can be conveniently initialized from np.zeros or bound to an np.ndarray passed in as a function parameter;
  3. Without the GIL, calling any Python function is forbidden, so arrays must be declared with C types and allocated (e.g. by np.zeros) before entering the nogil block, or with malloc inside it;
  4. In a prange loop, variables assigned inside the loop are thread-local (see the generated .c file, in the #pragma omp parallel section). There is no explicit way to issue an atomic operation in Cython, and using with gil is very inefficient, so the result array is allocated such that threads never write to the same row: either as ret = np.zeros((n, nbins), ...), where iteration i writes only to ret[i], or as ret = np.zeros((num_threads, ...)), where each thread writes to its own row obtained from openmp.omp_get_thread_num(); the rows are summed after the parallel region (see the sketch after this list);
  5. <long> is a C-style type cast.
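
The per-thread accumulation pattern of note 4 can be isolated into a small self-contained module. The following is a minimal sketch (parallel_bincount is a hypothetical function of my own, not part of the RDF code above), assuming it is compiled with OpenMP as in the setup.py sketch:

# cython: language_level=3, boundscheck=False, wraparound=False
import numpy as np
cimport openmp
from cython.parallel import prange, parallel
import multiprocessing


def parallel_bincount(long[:] labels, long nbins):
    # count occurrences of each label in parallel; assumes 0 <= labels[i] < nbins
    cdef long i
    cdef long n = labels.shape[0]
    cdef int tid
    cdef int num_threads = multiprocessing.cpu_count()
    cdef double[:, :] acc = np.zeros((num_threads, nbins))
    with nogil, parallel(num_threads=num_threads):
        for i in prange(n, schedule='static'):
            tid = openmp.omp_get_thread_num()
            acc[tid, labels[i]] += 1  # each thread touches only its own row
    return np.sum(acc, axis=0)  # reduce the per-thread rows back in Python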

This RDF function uses a linked cell list algorithm to reduce the cost from $O(n^2)$ to $O(n)$. Only the short-ranged part of the RDF ($r<r_\mathrm{cut}$) is calculated here; the RDF for $r\ge r_\mathrm{cut}$ can be obtained with an FFT-based algorithm by setting $r_\mathrm{bin}<\frac{r_\mathrm{cut}}{2}$.
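
As a usage sketch (assuming the module name cy_rdf from the setup.py above, three-dimensional coordinates already wrapped into [-L/2, L/2) as cell_id expects, and two distinct particle sets x and y), the raw counts can be normalized to g(r) in the usual way:

import numpy as np
from cy_rdf import rdf

box = np.array([10.0, 10.0, 10.0])
x = (np.random.random((1000, 3)) - 0.5) * box  # positions wrapped into [-L/2, L/2)
y = (np.random.random((1000, 3)) - 0.5) * box
bs, nbins = 0.05, 40                           # bin width, r_cut = bs * nbins = 2.0
counts = rdf(x, y, box, bs, nbins)             # raw pair counts per bin
r = (np.arange(nbins) + 0.5) * bs              # bin centres
rho = y.shape[0] / np.prod(box)                # number density of the y particles
g_r = counts / (x.shape[0] * rho * 4 * np.pi * r**2 * bs)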

The second example is a histogram of an N-dimensional array over the modulus of its indices, i.e. $\int f(\mathbf{r})\,\delta(|\mathbf{r}|-r)\,\mathrm{d}\mathbf{r}$:

# cython: language_level=3, boundscheck=False, wraparound=False, cdivision=True
import numpy as np
cimport numpy as np
import cython
from cython.parallel import prange, parallel
cimport openmp
from libc.math cimport floor,sqrt,pow
from libc.stdlib cimport malloc, free
import multiprocessing


cdef double unravel_index_f_r(long i, long[:] dim) nogil:
    # unravel flat index i in the Fortran way and return the Euclidean norm of the indices
    cdef long k, d, tmp
    cdef double r = 0
    d = dim.shape[0]
    for k in range(d):
        tmp = i % dim[k]
        r += <double> (tmp)**2
        i = (i - tmp) / dim[k]
    return sqrt(r)



def hist_to_r(double[:] x, long[:] shape, double dr, double bs, double rc):
    # bin the values of x by |index| * dr, where the index is unraveled in Fortran order
    cdef long i, n, j, n_bins
    cdef np.ndarray[np.double_t, ndim=2] ret, count
    cdef int num_threads, thread_num
    n_bins = <long> (rc / bs)
    num_threads = multiprocessing.cpu_count()
    ret = np.zeros((num_threads, n_bins), dtype=np.float64)
    count = np.zeros((num_threads, n_bins), dtype=np.float64)
    n = x.shape[0]
    with nogil, parallel(num_threads=num_threads):
        for i in prange(n, schedule='dynamic'):
            j = <long> (unravel_index_f_r(i, shape)*dr/bs)
            thread_num = openmp.omp_get_thread_num()
            if j < n_bins:
                ret[thread_num, j] += x[i]
                count[thread_num, j] += 1
    return np.sum(ret,axis=0), np.sum(count,axis=0)

In unravel_index_f_r, the index is unraveled in the Fortran way; therefore hist_to_r must be called on x.ravel(order='F'), as in the sketch below.
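
A usage sketch (the module name hist_cy and the grid quantity are my own assumptions), e.g. shell-averaging a quantity defined on a 3-D grid:

import numpy as np
from hist_cy import hist_to_r

f = np.random.random((64, 64, 64))         # some quantity f defined on a grid
shape = np.array(f.shape, dtype=np.int64)
dr = 0.1                                   # grid spacing: |r| of index (i, j, k) is dr*sqrt(i*i + j*j + k*k)
bs, rc = 0.2, 3.0                          # bin width and maximum radius
total, count = hist_to_r(f.ravel(order='F'), shape, dr, bs, rc)
profile = total / np.maximum(count, 1)     # shell-averaged f as a function of |r|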
