Background

I am currently working on my bachelor thesis and basically my task is to optimise a given code in Go, i.e. make it run as fast as possible. First, I optimised the serial function and then tried to introduce parallelism via goroutines. After researching on the internet I now understand the difference between concurrency and parallelism thanks to the following slides from talks.golang. I visited some parallel programming courses where we parallelised a c/c++ code with help of pthread/openmp, thus I tried to apply these paradigms in Go. That said, in this particular case I am optimising a function which computes the moving average of a slice with length `len:=n+(window_size-1)` (it equals either 9393 or 10175), hence we have `n` windows of which we compute the corresponding arithmetic average and save that properly in the output slice.

Note that this task is inherently embarrassing parallel.

My optimisation attempts and results

In `moving_avg_concurrent2` I split up the slice into `num_goroutines` smaller pieces and ran each with one goroutine. This function performed with one goroutine, out of some reason (could not find out why yet, but we are getting tangent here), better than `moving_avg_serial4` but with more than one goroutine it started to perform worse than `moving_avg_serial4`.
In `moving_avg_concurrent3` I adopted the master/worker paradigm. The performance was worse than `moving_avg_serial4` when using one goroutine. Here we at least I got a better performance when increasing `num_goroutines` but still not better than `moving_avg_serial4`.
To compare the performances of `moving_avg_serial4`, `moving_avg_concurrent2` and `moving_avg_concurrent3` I wrote a benchmark and I tabulated the results:

``````fct & num_goroutines | timing in ns/op | percentage
---------------------------------------------------------------------
serial4    |         4357893 |   100.00%
concur2_1  |         5174818 |   118.75%
concur2_4  |         9986386 |   229.16%
concur2_8  |        18973443 |   435.38%
concur2_32 |        75602438 |  1734.84%
concur3_1  |        32423150 |   744.01%
concur3_4  |        21083897 |   483.81%
concur3_8  |        16427430 |   376.96%
concur3_32 |        15157314 |   347.81%
``````

Question

Since as mentioned above this problem is embarrassingly parallel I was expecting to see a tremendous performance increase but that was not the case.

Why does `moving_avg_concurrent2` not scale at all?

And why is `moving_avg_concurrent3` that much slower than `moving_avg_serial4`?

I know that goroutines are cheap but still are not free, but is it possible that this generates that much overhead such that we are even slower than `moving_avg_serial4`?

Code

Functions:

``````// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
first_time := true
var output = make([]float64, len(input))
if len(input) > 0 {
var buffer = make([]float64, window_size)
// initialise buffer with NaN
for i := range buffer {
buffer[i] = math.NaN()
}
for i, val := range input {
old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
buffer[int((math.Mod(float64(i), float64(window_size))))] = val
if !NaN_in_slice(buffer) && first_time {
sum := 0.0
for _, entry := range buffer {
sum += entry
}
output[i] = sum / float64(window_size)
first_time = false
} else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) {
output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop
} else {
output[i] = math.NaN()
}
}
} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}

// returns a slice containing the moving average of the input
// reordering the control structures to exploid the short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
first_time := true
var output = make([]float64, len(input))
if len(input) > 0 {
var buffer = make([]float64, window_size)
// initialise buffer with NaN
for i := range buffer {
buffer[i] = math.NaN()
}
for i := range input {
//            fmt.Printf("in mvg_avg4: i=%v\n", i)
old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
if first_time && !NaN_in_slice(buffer) {
sum := 0.0
for j := range buffer {
sum += buffer[j]
}
output[i] = sum / float64(window_size)
first_time = false
} else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer)*/ {
output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop
} else {
output[i] = math.NaN()
}
}
} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}

// returns a slice containing the moving average of the input
// splitting up slice into smaller pieces for the goroutines but without using the serial version, i.e. we only have NaN's in the beginning, thus hope to reduce some overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
var output = make([]float64, window_size-1, len(input))
for i := 0; i < window_size-1; i++ {
output[i] = math.NaN()
}
if len(input) > 0 {
num_items := len(input) - (window_size - 1)
var barrier_wg sync.WaitGroup
n := num_items / num_goroutines
go_avg := make([][]float64, num_goroutines)
for i := 0; i < num_goroutines; i++ {
go_avg[i] = make([]float64, 0, num_goroutines)
}

for i := 0; i < num_goroutines; i++ {
go func(go_id int) {
defer barrier_wg.Done()

// computing boundaries
var start, stop int
start = go_id*int(n) + (window_size - 1) // starting index
// ending index
if go_id != (num_goroutines - 1) {
stop = start + n // Ending index
} else {
stop = num_items + (window_size - 1) // Ending index
}

loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

loc_avg = make([]float64, stop-start)
current_sum := 0.0
for i := start - (window_size - 1); i < start+1; i++ {
current_sum += input[i]
}
loc_avg[0] = current_sum / float64(window_size)
idx := 1

for i := start + 1; i < stop; i++ {
loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
idx++
}

go_avg[go_id] = append(go_avg[go_id], loc_avg...)

}(i)
}
barrier_wg.Wait()

for i := 0; i < num_goroutines; i++ {
output = append(output, go_avg[i]...)
}

} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}

// returns a slice containing the moving average of the input
// change of paradigm, we opt for a master worker pattern and spawn all windows which each will be computed by a goroutine
func compute_window_avg(input, output []float64, start, end int) {
sum := 0.0
size := end - start
for _, val := range input[start:end] {
sum += val
}
output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
var output = make([]float64, window_size-1, len(input))
for i := 0; i < window_size-1; i++ {
output[i] = math.NaN()
}
if len(input) > 0 {
num_windows := len(input) - (window_size - 1)
var output = make([]float64, len(input))
for i := 0; i < window_size-1; i++ {
output[i] = math.NaN()
}

pending := make(chan *Work)
done := make(chan *Work)

// creating work
go func() {
for i := 0; i < num_windows; i++ {
pending <- NewWork(compute_window_avg, input, output, i, i+window_size)
}
}()

// start goroutines which work through pending till there is nothing left
for i := 0; i < num_goroutines; i++ {
go func() {
Worker(pending, done)
}()
}

// wait till every work is done
for i := 0; i < num_windows; i++ {
<-done
}

return output

} else { // empty input
fmt.Println("moving_avg is panicking!")
panic(fmt.Sprintf("%v", input))
}
return output
}
``````

Benchmarks:

``````//############### BENCHMARKS ###############
var import_data_res11 []float64
func benchmarkMoving_avg_serial(b *testing.B, window int) {
var r []float64
for n := 0; n < b.N; n++ {
}
import_data_res11 = r
}

var import_data_res14 []float64
func benchmarkMoving_avg_serial4(b *testing.B, window int) {
var r []float64
for n := 0; n < b.N; n++ {
}
import_data_res14 = r
}

var import_data_res16 []float64
func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
var r []float64
for n := 0; n < b.N; n++ {
r = moving_avg_concurrent2(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
}
import_data_res16 = r
}

var import_data_res17 []float64
func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
var r []float64
for n := 0; n < b.N; n++ {
r = moving_avg_concurrent3(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
}
import_data_res17 = r
}

func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
benchmarkMoving_avg_serial(b, 261*10)
}

func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
benchmarkMoving_avg_serial4(b, 261*10)
}

func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}

func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}
//############### BENCHMARKS end ###############
``````

Remarks:
This is my very first post, I am still learning, so any constructive criticism is also welcome.

Fact #0: Pre-mature optimisation efforts often have negative yields showing they are just a waste of time & efforts

Why?
A single “wrong” SLOC may devastate performance into more than about +37%
or may improve performance to spend less than -57% of the baseline processing time

``````51.151µs on MA(200) [10000]float64    ~ 22.017µs on MA(200) [10000]int
70.325µs on MA(200) [10000]float64
``````

Why `[]int`-s?
You see it on your own above – this is the bread and butter for HPC/fintech efficient sub-[us] processing strategies ( and we still speak in terms of just `[SERIAL]` process scheduling ).

This one may test on any scale – but rather test first ( here ) your own implementations, on the very the same scale – `MA(200) [10000]float64` setup – and post your baseline durations in `[us]` to view the initial process performance and to compare apples-to-apples, having the posted `51.2 [us]` threshold to compare against.

Next comes the harder part:

Fact #1: This task is NOT embarrasingly parallel

Yes, one may go and implement a Moving Average calculation, so that it indeed proceeds through the heaps of data using some intentionally indoctrinated “just”-`[CONCURRENT]` processing approach ( irrespective whether being due to some kind of error, some authority’s “advice”, professional blindness or just from a dual-Socrates-fair ignorance ) which obviously does not mean that the nature of the convolutional stream-processing, present inside the Moving Average mathematical formulation, has forgotten to be a pure `[SERIAL]` process, just due to an attempt to enforce it get calculated inside some degree of “just”-`[CONCURRENT]` processing.

( Btw. The Hard Computer-Scientists and dual-domain nerds will also object here, that Go-language is by design using best Rob Pike’s skills into having a framework of concurrent coroutines, not any true-`[PARALLEL]` process-scheduling, even though Hoare’s CSP-tools, available in the language concept, may add some salt and pepper and introduce a stop-block type of inter-process communication tools, that will block “just”-`[CONCURRENT]` code sections into some hardwired CSP-p2p-synchronisation. )

Fact #2: Go distributed ( for any kind of speedup ) only AT THE END

Having a poor level of performance in `[SERIAL]` does not set any yardstick. Having a reasonable amount of performance tuning in single-thread, only then one may benefit from going distributed ( still having to pay additional serial costs, which makes Amdahl Law ( rather Overhead-strict-Amdahl Law ) get into the game ).

If one can introduce such low level of additional setup-overheads and still achieve any remarkable parallelism, scaled into the non-`[SEQ]` part of the processing, there and only there comes a chance to increase the process effective-performance.

It is not hard to loose much more than to gain in this, so always benchmark the pure-`[SEQ]` against the potential tradeoffs between a `non-[SEQ] / N[PAR]_processes` theoretical, overhead-naive speedup, for which one will pay a cost of a sum of all add-on-`[SEQ]`-overheads, so if and only if :

``````(         pure-[SEQ]_processing      [ns]
+        ( non-[SEQ]_processing      [ns] / N[PAR]_processes )
) << (  pure-[SEQ]_processing      [ns]
+ ( non-[SEQ]_processing      [ns] / 1 )
)
``````

Not having this jet-fighters advantage of both the surplus height and Sun behind you, never attempt going into any kind of HPC / parallelisation attempts – they will never pay for themselves not being remarkably `<<` better, than a smart `[SEQ]`-process.

Epilogue: on overhead-strict Amdahl’s Law interactive experiment UI

One animation is worth million words.

So,
assume a process-under-test, which has both a `[SERIAL]` and a `[PARALLEL]` part of the process schedule.

Let `p` be the `[PARALLEL]` fraction of the process duration `~ ( 0.0 .. 1.0 )` thus the `[SERIAL]` part does not last longer than `( 1 - p )`, right?

So, let’s start interactive experimentation from such a test-case, where the `p == 1.0`, meaning all such process duration is spent in just a `[PARALLEL]` part, and the both initial serial and the terminating parts of the process-flow ( that principally are always `[SERIAL]` ) have zero-durations `( ( 1 - p ) == 0. )`

Assume the system does no particular magic and thus needs to spend some real steps on intialisation of each of the `[PARALLEL]` part, so as to run it on a different processor `( (1), 2, .., N )`, so let’s add some overheads, if asked to re-organise the process flow and to marshal + distribute + un-marshal all the necessary instructions and data, so as the intended process now can start and run on `N` processors in parallel.

These costs are called `o` ( here initially assumed for simplicity to be just constant and invariant to `N`, which is not always the case in real, on silicon / on NUMA / on distributed infrastructures ).

By clicking on the Epilogue headline above, an interactive environment opens and is free for one’s own experimentation.

With `p == 1. && o == 0. && N > 1` the performance is steeply growing to current achievable `[PARALLEL]`-hardware O/S limits for a still monolytical O/S code-execution ( where still no additional distribution costs for MPI- and similar depeche-mode distributions of work-units ( where one would immediately have to add indeed a big number of `[ms]`, while our so far best just `[SERIAL]` implementation has obviously did the whole job in less than just ~ 22.1 [us] ) ).

But except such artificially optimistic case, the job does not look so cheap to get efficiently parallelised.

• Try having not a zero, but just about ~ 0.01% the setup overhead costs of `o`, and the line starts to show some very different nature of the overhead-aware scaling for even the utmost extreme `[PARALLEL]` case ( having still `p == 1.0` ), and having the potential speedup somewhere about just near the half of the initially super-idealistic linear speedup case.

• Now, turn the `p` to something closer to reality, somewhere less artificially set than the initial super-idealistic case of `== 1.00` `--> { 0.99, 0.98, 0.95 }` and … bingo, this is the reality, where process-scheduling ought be tested and pre-validated.

What does that mean?

As an example, if an overhead ( of launching + final joining a pool of coroutines ) would take more than ~ `0.1%` of the actual `[PARALLEL]` processing section duration, there would be not bigger speedup of 4x ( about a 1/4 of the original duration in time ) for 5 coroutines ( having p ~ 0.95 ), not more than 10x ( a 10-times faster duration ) for 20 coroutines ( all assuming that a system has 5-CPU-cores, resp. 20-CPU-cores free & available and ready ( best with O/S-level CPU-core-affinity mapped processes / threads ) for uninterrupted serving all those coroutines during their whole life-span, so as to achieve any above expected speedups.

Not having such amount of hardware resources free and ready for all of those task-units, intended for implementing the `[PARALLEL]`-part of the process-schedule, the blocking/waiting states will introduce additional absolute wait-states and the resulting-performance adds these new-`[SERIAL]`-blocking/waiting sections to the overall process-duration and the initially wished-to-have speedups suddenly cease to exist and performance factor falls well under `<< 1.00` ( meaning that the effective run-time was due to the blocking-states way slower, than the non-parallelised just-`[SERIAL]` workflow ).

This may sound complicated for new keen experimentators, however we may put it in a reversed perspective. Given the whole process of distribution the intended `[PARALLEL]` pool-of-tasks is known not to be shorter than, say, about a `10 [us]`, the overhead-strict graphs show, there needs to be at least about `1000 x 10 [us]` of non-blocking computing intensive processing inside the `[PARALLEL]` section so as not to devastate the efficiency of the parallelised-processing.

If there is not a sufficiently “fat”-piece of processing, the overhead-costs ( going remarkably above the above cited threshold of `~ 0.1%` ) then brutally devastate the net-efficiency of the successfully parallellised-processing ( but having performed at such unjustifiably high relative costs of the setup vs the limited net effects of any-`N`-processors, as was demonstrated in the available live-graphs ).

There is no surprise for distributed-computing nerds, that the overhead `o` comes with also additional dependencies – on `N` ( the more processes, the more efforts are to be spent to distribute work-packages ), on marshalled data-BLOBs’ sizes ( the larger the BLOBs, the longer the MEM-/IO-devices remain blocked, before serving the next process to receive a distributed BLOB across such device/resource for each of the target `2..N`-th receiving process ), on avoided / CSP-signalled, channel-mediated inter-process coordinations ( call it additional per-incident blocking, reducing the `p` further and further below the ultimately nice ideal of `1.` ).

So, the real-world reality is rather very far from the initially idealised, nice and promising `p` `== 1.0`, `( 1 -` `p` `) == 0.0` and `o` `== 0.0`

As obvious from the very beginning, try to beat rather the `22.1 [us]` `[SERIAL]` threshold, than trying to beat this, while getting worse and worse, if going `[PARALLEL]` where realistic overheads and scaling, using already under-performing approaches, does not help a single bit.