How to Make Your Data Processing Faster - Parallel Processing and JIT in Data Science

A presentation at CONNECT Asia 2019 in August 2019 in Singapore by Ong Chin Hwee

Slide 1

How to Make Your Data Processing Faster - Parallel Processing and JIT in Data Science

Presented by: Ong Chin Hwee (@ongchinhwee)
31 August 2019
Women Who Code Connect Asia, Singapore

Slide 2

About me

● Current role: Data Engineer at ST Engineering
● Background in aerospace engineering and computational modelling
● Experience working on aerospace-related projects in collaboration with academia and industry partners
● Find me if you would like to chat about Industry 4.0 and flight + travel!

Slide 3

Scope of Talk

I will talk about:
1. Bottlenecks in a data science project
2. What is parallel processing?
3. When should you go for parallelism?
4. Parallel processing in data science
5. JIT in data science

Slide 4

A typical data science workflow

1. Define problem objective
2. Data collection and pipeline
3. Data parsing/preprocessing and Exploratory Data Analysis (EDA)
4. Feature engineering
5. Model training
6. Model evaluation
7. Visualization and Reporting
8. Model deployment

Slide 5

What do you think are some of the bottlenecks in a data science project?

Slide 6

Bottlenecks in a data science project

● Lack of data / Poor quality data
● Data Preprocessing
  ○ The 80/20 data science dilemma
    ■ In reality, it’s closer to 90/10
● The organization itself

Slide 7

Bottlenecks in a data science project

● Data Preprocessing
  ○ pandas runs into low performance and long runtimes when dealing with large datasets (> 1 GB)

Slide 8

Bottlenecks in a data science project

● Data Preprocessing
  ○ pandas runs into low performance and long runtimes when dealing with large datasets (> 1 GB)
  ○ Slow loops in Python
    ■ Loops run on the interpreter instead of being compiled (unlike loops in C)

Slide 9

Bottlenecks in a data science project

● Data Preprocessing
  ○ pandas runs into low performance and long runtimes when dealing with large datasets (> 1 GB)
  ○ Slow loops in Python
    ■ Loops run on the interpreter instead of being compiled (unlike loops in C)
  ○ Not every data science team has extremely large volumes of data to justify using a Spark cluster

Slide 10

What is parallel processing?

Slide 11

Let’s imagine I own a bakery cafe.

Slide 12

Task 1: Toast 100 slices of bread

Assumptions:
1. I’m using single-slice toasters. (Yes, they actually exist.)
2. Each slice of toast takes 2 minutes to make.
3. No overhead time.

Image taken from: https://www.mitsubishielectric.co.jp/home/breadoven/product/to-st1-t/feature/index.html

Slide 13

Sequential Processing

[Image: one group = 25 bread slices]

Slide 14

Sequential Processing

Processor/Worker: Toaster
[Image: one group = 25 bread slices]

Slide 15

Sequential Processing

Processor/Worker: Toaster
[Image: 25 bread slices in, 25 toasts out]

Slide 16

Sequential Processing

Execution Time = 100 toasts × 2 minutes/toast = 200 minutes

Slide 17

Parallel Processing

[Image: one group = 25 bread slices]

Slide 18

Parallel Processing

Slide 19

Parallel Processing

Processor (Core): Toaster

Slide 20

Parallel Processing

Processor (Core): Toaster

The task is executed using a pool of 4 toaster subprocesses. Each toasting subprocess runs in parallel and independently of the others.

Slide 21

Parallel Processing

Processor (Core): Toaster

The output of each toasting process is consolidated and returned as an overall output (which may or may not be ordered).

Slide 22

Parallel Processing

Execution Time = 100 toasts × 2 minutes/toast ÷ 4 toasters = 50 minutes

Speedup = 4 times
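To see the 4-toaster arithmetic in running code, here is a toy simulation (my sketch, not from the slides): time.sleep stands in for the 2-minute toasting time, scaled down to 0.1 seconds per slice, and a ThreadPoolExecutor plays the pool of 4 toasters.

import time
from concurrent.futures import ThreadPoolExecutor

TOAST_TIME = 0.1  # stand-in for the 2 minutes per slice, scaled down

def toast(slice_id):
    """Simulate toasting one slice of bread."""
    time.sleep(TOAST_TIME)
    return f"toast #{slice_id}"

start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:  # pool of 4 "toasters"
    toasts = list(executor.map(toast, range(100)))
print(f"100 slices in {time.time() - start:.1f}s")  # ~100/4 × 0.1 = 2.5s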

Slide 23

Synchronous vs Asynchronous Execution

Slide 24

What do you mean by “Asynchronous”?

Slide 25

Let’s get some ideas from the Kopi.JS folks. (since they do async programming more than the data folks)

Slide 26

Slide 27

me: One kopi pls

(promise)
Uncle: Ok, take this number and sit down, send to you when ready
me: [sits down, surfs twitter]
uncle: [walks over] order #23, here you go

(async/await)
uncle: Ok [makes kopi]
me: [wait in place, surfs twitter]
uncle: [kopi done] Here you go

(Credits to: @sheldytox)

Slide 28

me: One kopi pls

(promise)
Uncle: Ok, take this number and sit down, send to you when ready
me: [sits down, surfs twitter]
uncle: [walks over] order #23, here you go

(async/await)
uncle: Ok [makes kopi]
me: [wait in place, surfs twitter]
uncle: [kopi done] Here you go

(Credits to: @sheldytox)

Another scenario: you wake up and order coffee via a delivery app. Do you wait by the phone for the coffee to arrive, or do you go and do other things (while “awaiting” the coffee’s arrival)?

(Credits to: @yingkh_tweets)

Slide 29

Task 2: Brew coffee

Assumptions:
1. I can do other stuff while making coffee.
2. One coffee maker makes one cup of coffee at a time.
3. Each cup of coffee takes 5 minutes to make.

Image taken from: https://www.crateandbarrel.com/breville-barista-espresso-machine/s267619

Slide 30

Synchronous Execution

Process 2: Brew a cup of coffee on the coffee machine
Duration: 5 minutes

Slide 31

Synchronous Execution

Process 1: Toast a slice of bread on the single-slice toaster after Process 2 is completed
Duration: 2 minutes

Process 2: Brew a cup of coffee on the coffee machine
Duration: 5 minutes

Slide 32

Synchronous Execution

Process 1: Toast a slice of bread on the single-slice toaster after Process 2 is completed
Duration: 2 minutes

Process 2: Brew a cup of coffee on the coffee machine
Duration: 5 minutes

Output: 1 toast + 1 coffee
Total Execution Time = 5 minutes + 2 minutes = 7 minutes

Slide 33

Asynchronous Execution

While brewing coffee: make some toasts.

Slide 34

Asynchronous Execution

Output: 2 toasts + 1 coffee (1 more toast!)
Total Execution Time = 5 minutes
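Here is a minimal asyncio sketch of this breakfast (my illustration, not code from the talk), with minutes scaled down to seconds:

import asyncio

async def brew_coffee():
    await asyncio.sleep(5)   # 5 "minutes", scaled to seconds
    return "coffee"

async def make_toast(n):
    await asyncio.sleep(2)   # 2 "minutes" per slice
    return f"toast #{n}"

async def breakfast():
    coffee = asyncio.create_task(brew_coffee())          # start brewing, don't wait
    toasts = [await make_toast(1), await make_toast(2)]  # toast in the meantime
    return toasts + [await coffee]

print(asyncio.run(breakfast()))  # done in ~5s total instead of 9s sequential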

Slide 35

When is it a good idea to go for parallelism?

Slide 36

When is it a good idea to go for parallelism?

(or, “Is it a good idea to simply buy a 256-core processor and parallelize all your code?”)

Slide 37

Practical Considerations

● Is your code already optimized?
  ○ Sometimes, you might need to rethink your approach.
  ○ Example: Use list comprehensions or map functions instead of for-loops for array iterations (see the sketch below).
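For example (an illustrative sketch, not from the slides), the same per-element transform written three ways:

nums = list(range(1_000_000))

# Explicit loop: every iteration runs in the interpreter
squares = []
for n in nums:
    squares.append(n * n)

# List comprehension: same result, less interpreter overhead
squares = [n * n for n in nums]

# map(): lazily applies the function to each element
squares = list(map(lambda n: n * n, nums))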

Slide 38

Practical Considerations

● Is your code already optimized?
  ○ Sometimes, you might need to rethink your approach.
● Problem architecture
  ○ The nature of the problem limits how successful parallelization can be.
  ○ If your problem consists of processes that depend on each other’s outputs, parallelization may not help.

Slide 39

Practical Considerations

● Is your code already optimized?
  ○ Sometimes, you might need to rethink your approach.
● Problem architecture
  ○ The nature of the problem limits how successful parallelization can be.
● Overhead in parallelism
  ○ There will always be parts of the work that cannot be parallelized. → Amdahl’s Law
  ○ Extra time is required for coding and debugging (parallel vs sequential code)

Slide 40

Amdahl’s Law and Parallelism

Amdahl’s Law states that the theoretical speedup is defined by the fraction of code p that can be parallelized:

S = 1 / ((1 − p) + p/N)

S: Theoretical speedup (in latency)
p: Fraction of the code that can be parallelized
N: Number of processors (cores)

Slide 41

Amdahl’s Law and Parallelism

If there are no parallel parts (p = 0): Speedup S = 1 (no speedup)

Slide 42

Amdahl’s Law and Parallelism

If there are no parallel parts (p = 0): Speedup S = 1 (no speedup)

If all parts are parallel (p = 1): Speedup S = N → ∞ as N → ∞

Slide 43

Amdahl’s Law and Parallelism

If there are no parallel parts (p = 0): Speedup S = 1 (no speedup)

If all parts are parallel (p = 1): Speedup S = N → ∞ as N → ∞

Speedup is limited by the fraction of the work that is not parallelizable; it will not improve even with an infinite number of processors (S → 1/(1 − p) as N → ∞).
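A quick numeric check of the formula (my sketch, not from the slides): even if 95% of the work parallelizes perfectly, the speedup can never exceed 20 times, no matter how many cores you add.

def amdahl_speedup(p, n):
    """Theoretical speedup for parallel fraction p on n processors."""
    return 1 / ((1 - p) + p / n)

print(amdahl_speedup(0.0, 256))    # 1.0  -> no parallel parts, no speedup
print(amdahl_speedup(0.95, 4))     # ~3.5 on 4 cores
print(amdahl_speedup(0.95, 256))   # ~18.6 on 256 cores
print(amdahl_speedup(0.95, 10**9)) # -> approaches 1 / (1 - 0.95) = 20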

Slide 44

Multiprocessing vs Multithreading

Multiprocessing: System allows executing multiple processes at the same time using multiple processors.

Slide 45

Multiprocessing vs Multithreading

Multiprocessing: System allows executing multiple processes at the same time using multiple processors.

Multithreading: System executes multiple threads of sub-processes at the same time within a single processor.

Slide 46

Multiprocessing vs Multithreading

Multiprocessing: System allows executing multiple processes at the same time using multiple processors. Better option for processing large volumes of data.

Multithreading: System executes multiple threads of sub-processes at the same time within a single processor. Best suited for I/O operations.

Slide 47

Parallel Processing in Data Science

Slide 48

Parallel Processing in Data Science

● Python is the most widely used programming language in data science
● Distributed processing is one of the core concepts of Apache Spark
● Apache Spark is available in Python as PySpark

Slide 49

Parallel Processing in Data Science

Data processing tends to be more compute-intensive:
→ Switching between threads becomes increasingly inefficient
→ The Global Interpreter Lock (GIL) in Python does not allow parallel thread execution

Slide 50

How to do Parallel Processing in Python?

Slide 51

Parallel Processing in Python

concurrent.futures module
● High-level API for launching asynchronous parallel tasks
● Introduced in Python 3.2 as an abstraction layer over the threading and multiprocessing modules
● Two modes of execution:
  ○ ThreadPoolExecutor() for multithreading
  ○ ProcessPoolExecutor() for multiprocessing
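A minimal sketch of both modes (my example; cpu_bound and io_bound are hypothetical stand-in workloads, not from the talk):

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_bound(n):
    # Compute-heavy work: benefits from multiple processes (bypasses the GIL)
    return sum(i * i for i in range(n))

def io_bound(delay):
    # I/O-heavy work: time.sleep stands in for waiting on disk/network
    time.sleep(delay)
    return delay

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:              # multiprocessing
        sums = list(executor.map(cpu_bound, [10**6] * 8))

    with ThreadPoolExecutor(max_workers=4) as executor:  # multithreading
        waits = list(executor.map(io_bound, [0.5] * 8))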

Slide 52

ProcessPoolExecutor vs ThreadPoolExecutor

From the Python Standard Library documentation:

“For ProcessPoolExecutor, this method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor, chunksize has no effect.”
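So for a very long iterable with ProcessPoolExecutor, it can be worth raising chunksize (a sketch; the function and the value 1000 are my own choices):

from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # chunksize=1000: items are pickled and sent to workers 1000 at a
        # time, instead of one round-trip per item (the default of 1)
        results = list(executor.map(square, range(1_000_000), chunksize=1000))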

Slide 53

Recap: map()

map() takes as input:
1. the function that you would like to run, and
2. a list (iterable) where each element of the list is a single input to that function;

and returns an iterator that yields the results of the function being applied to every element of the list.

Slide 54

map() in concurrent.futures

Similarly, executor.map() takes as input:
1. the function that you would like to run, and
2. a list (iterable) where each element of the list is a single input to that function;

and returns an iterator that yields the results of the function being applied to every element of the list.
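In other words, executor.map() can often be dropped in where map() was (an illustrative sketch with a toy function):

from concurrent.futures import ProcessPoolExecutor

def double(x):
    return 2 * x

if __name__ == '__main__':
    sequential = list(map(double, range(10)))
    with ProcessPoolExecutor() as executor:
        parallel = list(executor.map(double, range(10)))
    assert sequential == parallel   # same results, in the same order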

Slide 55

“Okay, I tried using parallel processing but my processing code in Python is still slow. What else can I do?”

Slide 56

Compiled vs Interpreted Languages

[Diagram] Written Code → Compiler → Compiled Code in Target Language → Linker → Machine Code (executable) → Loader → Execution

Slide 57

Compiled vs Interpreted Languages

[Diagram] Written Code → Compiler → Lower-level bytecode → Virtual Machine → Execution

Slide 58

JIT Compilation

Just-In-Time (JIT) compilation
● Converts source code into native machine code at runtime
● Is the reason why Java runs on a Virtual Machine (JVM) yet has comparable performance to compiled languages (C/C++, Go, etc.)

Slide 59

JIT Compilation in Data Science

Slide 60

JIT Compilation in Data Science

numba module
● Just-in-Time (JIT) compiler for Python that converts Python functions into machine code
● Can be used by simply applying a decorator (a wrapper) around functions to instruct numba to compile them
● Two modes of execution:
  ○ njit for no-Python mode (JIT only)
  ○ jit for object mode (JIT + Python interpreter)
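A minimal numba sketch (my example, not from the talk; assumes numba and NumPy are installed):

import numpy as np
from numba import njit

@njit   # no-Python mode: the whole function is compiled to machine code
def array_sum(arr):
    total = 0.0
    for x in arr:    # this loop runs as native code, not in the interpreter
        total += x
    return total

arr = np.arange(10_000_000, dtype=np.float64)
print(array_sum(arr))  # first call triggers compilation; later calls run at native speed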

Slide 61

Practical Implementation

Slide 62

Case: Image Processing

Dataset: Shopee National Data Science Challenge (https://www.kaggle.com/c/ndsc-advanced)
Size: 77.6 GB of image files
Data Quality: Images in the dataset are of different formats (some are RGB while others are RGBA) and different dimensions

Slide 63

Without Parallelism

import sys
import time
import numpy as np

N = 35000              # size of dataset to be processed
start = 0
batch_size = 1000
partition = int(np.ceil(N / batch_size))   # number of batches
partition_count = 0
imagearray_list = [None] * partition

start_cpu_time = time.clock()   # deprecated; use time.process_time() on Python 3.8+
start_wall_time = time.time()

Slide 64

Without Parallelism

while start < N:
    end = start + batch_size
    if end > N:
        end = N
    imagearray_list[partition_count] = [
        arraypartition_calc(image) for image in range(start, end)]
    start += batch_size
    partition_count += 1

Slide 65

Without Parallelism

while start < N:
    end = start + batch_size
    if end > N:
        end = N
    imagearray_list[partition_count] = [
        arraypartition_calc(image) for image in range(start, end)]
    start += batch_size
    partition_count += 1

Slide 66

Without Parallelism

Execution Speed: 3300 images after 7 hours ≈ 471.43 images/hr

while start < N:
    end = start + batch_size
    if end > N:
        end = N
    imagearray_list[partition_count] = [
        arraypartition_calc(image) for image in range(start, end)]
    start += batch_size
    partition_count += 1

Slide 67

With Parallelism and JIT compilation

from PIL import Image
import numpy as np
from numba import jit

@jit
def image_proc(index):
    '''Convert + resize image'''
    im = Image.open(define_imagepath(index))  # define_imagepath() resolves the file path for an index
    im = im.convert("RGB")
    im_resized = np.array(im.resize((64, 64)))
    return im_resized

Slide 68

With Parallelism and JIT compilation

Note: I can’t use no-Python mode (@njit) as the PIL code can’t seem to be compiled into machine code.

from PIL import Image
import numpy as np
from numba import jit

@jit
def image_proc(index):
    '''Convert + resize image'''
    im = Image.open(define_imagepath(index))
    im = im.convert("RGB")
    im_resized = np.array(im.resize((64, 64)))
    return im_resized

Slide 69

With Parallelism and JIT compilation

@jit
def arraypartition_calc(start, batch_size=1000):
    '''Process images in partition/batch'''
    # batch_size needs a default value here, since executor.map() later
    # passes only the start index of each partition
    end = start + batch_size
    if end > N:
        end = N
    partition_list = [image_proc(image) for image in range(start, end)]
    return partition_list

Slide 70

With Parallelism and JIT compilation

@jit
def arraypartition_calc(start, batch_size=1000):
    '''Process images in partition/batch'''
    # batch_size needs a default value here, since executor.map() later
    # passes only the start index of each partition
    end = start + batch_size
    if end > N:
        end = N
    partition_list = [image_proc(image) for image in range(start, end)]
    return partition_list

Slide 71

With Parallelism and JIT compilation

N = 35000
start = 0
batch_size = 1000
partition, mod = divmod(N, batch_size)

if mod:
    partition_index = [i * batch_size
                       for i in range(start // batch_size, partition + 1)]
else:
    partition_index = [i * batch_size
                       for i in range(start // batch_size, partition)]

Slide 72

With Parallelism and JIT compilation

import sys
import time
from concurrent.futures import ProcessPoolExecutor

start_cpu_time = time.clock()
start_wall_time = time.time()

with ProcessPoolExecutor() as executor:
    future = executor.map(arraypartition_calc, partition_index)

imgarray_np = np.array([x for x in future])

Slide 73

With Parallelism and JIT compilation

Execution Speed: 35000 images after 3.6 hours ≈ 9722.22 images/hr

import sys
import time
from concurrent.futures import ProcessPoolExecutor

start_cpu_time = time.clock()
start_wall_time = time.time()

with ProcessPoolExecutor() as executor:
    future = executor.map(arraypartition_calc, partition_index)

imgarray_np = np.array([x for x in future])

Slide 74

With Parallelism and JIT compilation

Execution Speed: 35000 images after 3.6 hours ≈ 9722.22 images/hr
No. of cores: 2
Speedup: 19.4 times

import sys
import time
from concurrent.futures import ProcessPoolExecutor

start_cpu_time = time.clock()
start_wall_time = time.time()

with ProcessPoolExecutor() as executor:
    future = executor.map(arraypartition_calc, partition_index)

imgarray_np = np.array([x for x in future])

Slide 75

With Parallelism and JIT compilation

import sys
import time
from concurrent.futures import ProcessPoolExecutor

start_cpu_time = time.clock()
start_wall_time = time.time()

with ProcessPoolExecutor() as executor:
    future = executor.map(arraypartition_calc, partition_index)

imgarray_np = np.array([x for x in future])  # extract results from the iterator (similar to a generator)

Slide 76

Key Takeaways

Slide 77

Parallel Processing in Data Science

● Not all processes should be parallelized
  ○ Amdahl’s Law on parallelism
  ○ Extra time is required for coding and debugging (parallel vs sequential code)
  ○ If the cost of rewriting your code for parallelization outweighs the time savings from parallelizing your code (especially if your process only takes a few hours), maybe you should consider other ways of optimizing your code instead.

Slide 78

JIT compilation in Data Science

● Just-in-Time (JIT) compilation
  ○ converts source code from non-compiled languages into native machine code at runtime
  ○ may not work for some functions/modules; these are still run on the interpreter
  ○ significantly enhances the speedup provided by parallelization

Slide 79

References

Official Python documentation on concurrent.futures
https://docs.python.org/3/library/concurrent.futures.html

Built-in Functions - Python 3.7.4 Documentation
https://docs.python.org/3/library/functions.html#map

5-minute Guide to Numba
http://numba.pydata.org/numba-doc/latest/user/5minguide.html

Slide 80

Contact

Ong Chin Hwee
LinkedIn: ongchinhwee
Twitter: @ongchinhwee
https://ongchinhwee.me