Parallel processing of CSV in Elixir and Golang

Today's coding kata is parallel processing of streamed CSV data from a file. I'll try it in Elixir and Go, for fun and learning.

TL;DR: In Elixir, parallel_stream wins over Task.async and Poolboy for simplicity. In Go it's straightforward goroutines and sync.WaitGroup. Both run fast and safely. And it's a lot of fun.

Elixir

First approach: one process per row with Task.async

For each row we'll spawn a separate concurrent process that does the heavy lifting (in our case it just waits a little). Consequence: with 10,000 rows we get 10,000 processes. Normally this isn't a problem as long as we have enough RAM, but memory usage grows with the number of rows. So the next requirement is: limit the maximum RAM usage. Drop this idea.
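
For reference, a minimal sketch of what this approach would look like, reusing the split_row/1 and store_row/1 helpers from the parallel_stream example below:

File.stream!(file)
|> Stream.map(&split_row/1)
|> Enum.map(fn row ->
  Task.async(fn -> store_row(row) end)  # one process per row
end)
|> Enum.map(&Task.await/1)              # wait for every row to finish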

Second approach: worker pool with Poolboy

Poolboy manages a pool of workers. One worker processes one row of our CSV. If no free workers remain, reading from the CSV is paused. Consequence: steady RAM usage and at most n processes, depending on the pool configuration. But: Poolboy is a service and doesn't wait until all workers are finished, so I guess this approach won't work for CLI scripts. Any ideas? Drop this idea.
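
For illustration, a minimal sketch of how such a pool could be wired up. The RowWorker module, the :row_pool name, and the pool sizes are hypothetical, not from the original code; the worker checks itself back into the pool when it is done:

defmodule RowWorker do
  use GenServer

  # Poolboy calls start_link/1 for each worker in the pool
  def start_link(_args), do: GenServer.start_link(__MODULE__, nil)

  def init(state), do: {:ok, state}

  def handle_cast({:store_row, row}, state) do
    store_row(row)                        # the importer's heavy work
    :poolboy.checkin(:row_pool, self())   # hand the worker back to the pool
    {:noreply, state}
  end
end

pool_config = [
  name: {:local, :row_pool},
  worker_module: RowWorker,
  size: 10,          # at most 10 concurrent workers -> steady RAM usage
  max_overflow: 0
]

{:ok, _pool} = :poolboy.start_link(pool_config, [])

File.stream!(file)
|> Stream.map(&split_row/1)
|> Enum.each(fn row ->
  # checkout blocks while all workers are busy, pausing the CSV read
  worker = :poolboy.checkout(:row_pool)
  GenServer.cast(worker, {:store_row, row})
end)
# nothing here waits for the workers -- the CLI problem described above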

Third approach: hex.pm package parallel_stream

This package provides functions for parallel processing, e.g. map. It is conceptually designed as a stream that runs on n pipes (2x the number of CPU cores). So parallelism takes just one line of code: instead of Stream.map we simply use ParallelStream.map.

File.stream!(file)
|> Stream.map(&split_row/1)
|> ParallelStream.map(&store_row/1)  # parallel processing
# |> Stream.map(&store_row/1)  # synchronous processing
|> Enum.into([])
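
The number of parallel pipes can also be tuned. A sketch, assuming the package's num_workers option (not used in the original code):

File.stream!(file)
|> Stream.map(&split_row/1)
|> ParallelStream.map(&store_row/1, num_workers: 16)  # 16 parallel pipes
|> Enum.into([])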

Test scenario: timing to verify parallelism

  • file with 1000 test rows (generated with mix generate test1000.csv 1000)
  • each row simulates an INSERT SQL statement by waiting 10 ms
  • running mix import test1000.csv with Stream.map -> 12 sec.
  • running with ParallelStream.map -> 1.2 sec.

See the code here: ronnyhartenstein/elixir-parallel-csv-importer

Go

I won't do timing tests, because goroutines just work.

First approach: simple goroutines

The simplest thing is to use plain goroutines.

Code here

func main() {
	// [.. start timing, read CSV ..]
	i := 0
	ch := make(chan []string)
	for {
		record, err := reader.Read()
		// [.. error handling, break on io.EOF ..]
		i++
		go func(r []string) {
			processData(r)
			ch <- r
		}(record)
		fmt.Printf("\rgo %d", i)
	}
	// drain the channel: one receive per started goroutine
	for ; i > 0; i-- {
		<-ch
		fmt.Printf("\r\t\t\t\t| <- %d", i)
	}
	fmt.Printf("\n%.2fs", time.Since(start).Seconds())
}

func processData(r []string) {
	// simulate the heavy work, e.g. an INSERT statement
	time.Sleep(1000 * time.Millisecond)
}

So this runs quite well. But counting the number of goroutines by hand probably shouldn't be the final solution, I think.

Second approach: sync.WaitGroup

We'll use sync.WaitGroup to encapsulate the counting of goroutines, plus a bit more.

Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication. (sync package documentation)

Code here

func main() {
	// [.. start timing, read CSV ..]
	i := 0
	ch := make(chan []string)
	var wg sync.WaitGroup
	for {
		record, err := reader.Read()
		// [.. error handling, break on io.EOF ..]
		i++
		wg.Add(1)
		go func(r []string, i int) {
			defer wg.Done()
			processData(i, r)
			ch <- r
		}(record, i)
		fmt.Printf("\rgo %d", i)
	}
	// closer
	go func() {
		wg.Wait()
		close(ch)
	}()
	// drain channel results (necessary to keep the program from exiting early)
	j := 0
	for range ch {
		j++
		fmt.Printf("\r\t\t\t\t | done %d", j)
	}
	fmt.Printf("\n%.2fs", time.Since(start).Seconds())
}
func processData(i int, r []string) {
	time.Sleep(10 * time.Millisecond)
	fmt.Printf("\r\t\t| proc %d", i)
}

So it works in much the same way; it's just another approach, with some benefits. Read more in the blog posts here and here, and of course in the great book "The Go Programming Language", chapter 9.8, "Goroutines and Threads".

See the code here: ronnyhartenstein/golang-csv-parallel-processing

Boilerplate for parallel processing in Go

Here is a small boilerplate using Go's sync.WaitGroup to wait for all goroutines to finish.

package main

import (
	"sync"
)

func main() {
	tasks := []string{"a", "b", "c"}

	ch := make(chan bool)
	// count goroutines with a WaitGroup
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			ok := processData(t)
			ch <- ok
		}(task)
	}
	// closer
	go func() {
		wg.Wait()
		close(ch)
	}()
	// fetch all channel results (necessary to keep the program from exiting early)
	for range ch {
		// TODO if you are interested in channel responses
	}
}

func processData(task string) bool {
	// TODO do things in this goroutine
	return true
}

Conclusion

With a plain PHP, Python or Ruby script you'll process the CSV serially. But in languages like Elixir and Go, with concurrency and threads, you can do it in parallel and use all CPU cores. Okay, you have to deal with some things like mutual exclusion, but it's faster than serial processing.