Thrill YouTube Tutorial: High-Performance Algorithmic Distributed Computing with C++
Posted on 2020-06-01 11:13 by Timo Bingmann at Permlink with 0 Comments. Tags: #talk #university #thrill
This post announces the completion of my new tutorial presentation and YouTube video: "Thrill Tutorial: High-Performance Algorithmic Distributed Computing with C++".
YouTube video link: https://youtu.be/UxW5YyETLXo (2h 41min)
Slide Deck: slides-20200601-thrill-tutorial.pdf (21.3 MiB) (114 slides)
In this tutorial we present our new distributed Big Data processing framework called Thrill (https://project-thrill.org). It is a C++ framework consisting of a set of basic scalable algorithmic primitives like mapping, reducing, sorting, merging, joining, and additional MPI-like collectives. This set of primitives can be combined into larger more complex algorithms, such as WordCount, PageRank, and suffix sorting. Such compounded algorithms can then be run on very large inputs using a distributed computing cluster with external memory.
After introducing the audience to Thrill we guide participants through the initial steps of downloading and compiling the software package. The tutorial then continues to give an overview of the challenges of programming real distributed machines and models and frameworks for achieving this goal. With these foundations, Thrill's DIA programming model is introduced with an extensive listing of DIA operations and how to actually use them. The participants are then given a set of small example tasks to gain hands-on experience with DIAs.
After the hands-on session, the tutorial continues with more details on how to run Thrill programs on clusters and how to generate execution profiles. Then, deeper details of Thrill's internal software layers are discussed to advance the participants' mental model of how Thrill executes DIA operations. The final hands-on tutorial is designed as a concerted group effort to implement K-means clustering for 2D points.
The video on YouTube (https://youtu.be/UxW5YyETLXo) contains both presentation and live-coding sessions. It has a high information density and covers many topics.
Table of Contents
- Thrill Motivation Pitch
- Introduction to Parallel Machines
- 0:20:20 The Real Deal: Examples of Machines
- 0:24:50 Networks: Types and Measurements
- 0:31:47 Models
- 0:35:58 Implementations and Frameworks
- The Thrill Framework
- 0:39:28 Thrill's DIA Abstraction and List of Operations
- 0:44:00 Illustrations of DIA Operations
- 0:55:32 Tutorial: Playing with DIA Operations
- 1:08:21 Execution of Collective Operations in Thrill
- 1:21:10 Tutorial: Running Thrill on a Cluster
- 1:27:02 Tutorial: Logging and Profiling
- 1:32:14 Going Deeper into Thrill
- 1:32:49 Layers of Thrill
- 1:37:59 File - Variable-Length C++ Item Store
- 1:43:54 Readers and Writers
- 1:46:43 Thrill's Communication Abstraction
- 1:48:07 Stream - Async Big Data All-to-All
- 1:49:46 Thrill's Data Processing Pipelines
- 1:51:29 Thrill's Current Sample Sort
- 1:54:14 Optimization: Consume and Keep
- 1:59:34 Memory Allocation Areas in Thrill
- 2:01:17 Memory Distribution in Stages
- 2:02:22 Pipelined Data Flow Processing
- 2:08:48 ReduceByKey Implementation
- 2:12:06 Tutorial: First Steps towards k-Means
- 2:35:56 Conclusion
- 0:39:28 Thrill's DIA Abstraction and List of Operations
Transcript
1 - Thrill Tutorial Title Slide
Hello and welcome to this recorded tutorial on Thrill, which is our high-performance algorithmic distributed computation framework in C++. It is both flexible and a general purpose framework to implement distributed algorithms. In a sense it is similar to Apache Spark or Apache Flink, but at the same time brings back designs from MPI and focuses on high performance.
My name is Timo Bingmann and I was one of the initial core developers of Thrill. Thrill's design was part of my PhD thesis but it originally started as a one-year lab course with six master students and three PhD candidates at the Karlsruhe Institute of Technology in Karlsruhe, Germany.
This tutorial covers Thrill in the version available at the start of June 2020.
2 - Abstract
This is just a slide with an abstract and a Creative Commons license for this slide deck.
3 - Table of Contents
This whole tutorial is rather long and covers many aspects of Thrill, from a high-level beginners tutorial down to technical details of some of the distributed algorithms.
I have put video timestamps on this table of contents slide such that it is easier to jump to the sections you are interested in, and such that you can also see how long each section is. In the YouTube description there are of course also links to the various subsections.
In Part 1 I will try to get you interested in Thrill by presenting you some benchmarks, discussing the framework's general design goals, showing you an actual WordCount implementation, and then demonstrating how easy it is to download, compile, and run a simple "Hello World" program.
Part 2 is then about knowledge on parallel machines and distributed computing. We all know parallel programming is hard, ... but it is even harder, or downright impossible, if you know too little about the machines you are programming. And since there is a wide variety of parallel machines and interconnection networks, we will first cover those and then look at models and frameworks which make programming them easier. This part lays the groundwork for better understanding the challenges we try to tackle with Thrill.
And Part 3 is then the main introduction into Thrill.
It starts with the DIA model: the distributed immutable array, which is the high-level concept used in Thrill to program arbitrarily complex algorithms by composing them out of primitive operations such as Map, Sort, Reduce, Zip, and many others.
This first section of Part 3 then continues by giving you a high-level picture of each of these operations. The illustrations on these slides in the chapter "Thrill's DIA Abstraction and List of Operations" are also a sort of reference of what DIA operations do and which are available.
Immediately after the DIA operations catalog, the tutorial part continues with tips on how to actually write code, when you know how you want to compose the DIA primitives. The tutorial section finishes with a hands-on live demonstration of how to write a few simple Thrill programs.
Once you are convinced that Thrill is actually a quite versatile framework, I will show you how to run Thrill programs on clusters of machines, and then how to use the built-in logging and profiling features of Thrill to get a better idea of what is actually happening.
The following section "Going Deeper into Thrill" then drops all high-level abstraction and dives right down into some of the technical details and highlights of the framework. This subsection is for people interested in extending Thrill or writing new DIA operations, but it is also good to consider if you run into unexpected problems, because these can be often resolved by knowing more about the underlying implementation details.
The last tutorial section is then a hands-on live-coding demonstration of how to implement K-means in Thrill.
And the conclusion then wraps up the entire presentation and discusses some future challenges and trajectories.
4 - Thrill Motivation Pitch
Okay, on to the first part, in which I will try to get you interested in Thrill.
5 - Weak-Scaling Benchmarks
Instead of immediately starting with details on Thrill, I will now show you some benchmarks comparing it against Apache Spark and Apache Flink.
The next slide will show weak-scaling results on the following four microbenchmarks: WordCount, PageRank, TeraSort, and K-Means.
WordCount is a simple distributed reduction on strings from text files to count all distinct words, in this case we used parts of the CommonCrawl web corpus as input.
PageRank is actually sparse matrix-vector multiplication in disguise. It requires a join-by-index of the current ranks with outgoing links and performs exactly ten iterations over the data.
TeraSort is, well, sorting of 100 byte random records.
And K-Means is a simple machine learning algorithm which iteratively improves cluster points by calculating new centers. Again, we run exactly ten iterations of the algorithm.
We implemented all these benchmarks in Thrill, Spark, and Flink. Most often these are included in the "examples" directory of each framework. We took great care that the implementations perform the same basic algorithmic steps in each framework, such that every framework is actually doing the same basic computation.
Here on the side of this slide are also the number of lines of code needed to implement these benchmarks in Thrill.
The benchmarks were done on a cluster of relatively beefy machines in the Amazon EC2 cloud. These machines had 32 cores, lots of RAM, local SSDs, and a virtualized network with up to 1 gigabyte throughput.
6 - Experimental Results: Slowdowns
And here are the results.
What is plotted here are showdown results, with the slowdown relative to the fastest framework on the y-axis. And you will immediately see that Thrill is always the fastest framework, so these slowdowns are actually relative to Thrill.
For WordCount, Spark is something like a factor 2.5 slower on 16 machines. Remember that each of these machines had 32 cores, so that is actually 512 cores here. Flink is about a factor 4 slower on WordCount, here.
For PageRank, Thrill starts out strong, but the speedup drops with more machines, probably because the network becomes the bottleneck. With 16 machines, Thrill is about twice as fast as Flink, but still a factor of 4.5 faster than Spark.
For sorting, Thrill is only a factor 1.4 faster than Flink, and a factor 1.8 than Spark.
But for K-Means the difference is large again: Thrill is a factor of 5 faster than Spark with Scala, a factor 13 faster than Spark with Java, and a factor of more than 60 faster than Flink.
And how should we interpret these results?
Well, for me it appears that Thrill really shines when performing calculations on relatively small items. That is why K-Means is so fast: these are just 3-dimensional vectors, which are scanned over and added together. In C++ this is a fast binary code hardware calculation, while with Java and Scala there is a considerable overhead due to the JVM. And in particular Java appears to be less efficient than Scala, probably because Scala has better support for primitive types.
PageRank is similar: vectors of edge ids are processed and ranks are added together. But there is a lot more data transmitted between machines than in the K-Means benchmark.
In the WordCount example we can see how much faster reduction of strings using hash tables is in C++ than whatever aggregation method Spark and Flink use. I believe the JVM garbage collection also plays a large role in these results.
And for Sorting, well, I believe all frameworks could do considerably better. Thrill's sorting can definitely be improved. On the other hand, these are relatively large 100 byte records, which means that the JVM overhead is less pronounced.
I hope I now got you interested in Thrill, and we'll go on to discuss some of the goals which guided much of its design.
7 - Experimental Results: Throughput
This next slide contains the absolute throughput of these experiments, mainly for reference, and we're going to skip these for now.
8 - Example T = [tobeornottobe$]
So for me, the story of Thrill starts with suffix sorting. In my PhD I was tasked to engineer a parallel distributed suffix array construction algorithm.
The suffix array of a text is simply the indexes of all suffixes in lexicographic order. And here you can see the suffix array of the text "tobeornottobe$".
Many will not know this data structure, but it is a fundamental ingredient to text search algorithms, data compression, bioinformatics, and stringology algorithms.
9 - bwUniCluster
And you really want to construct this data structure for very large inputs. And that is why we considered how to use these clusters of machines in the basement of our computing center at KIT for suffix sorting.
10 - Big Data Batch Processing
But to use them, you need to select a framework to build on. And this is how the world presented itself to us back them:
On the one hand there is MPI, which is incredibly fast and still used by most software on supercomputers, but at the same time MPI is incredibly difficult to use correctly, which anyone who has programmed with MPI can attest to.
On the other side, there is MapReduce and Hadoop, which maybe has the simplest possible interface consisting of just two functions: map and reduce, well yes, and possibly complex partitioners and aggregators, but still, it's quite simple. The problem is it is also notoriously slow.
Somewhat better are the two frameworks Apache Spark and Apache Flink. They have a reasonably more complex interface, for example they allow you sorting and other flexible operations, but still they are programmed in Java or Scala and are also known to be not as fast as MPI on supercomputers.
And when you see the world like this, what do you do? You try to fill the blank spot in the upper right corner. And this is our attempt with Thrill. And these are the list of requirements we had.
Many come from previous experience with implementing STXXL, a library of external memory algorithms. And other goals came from the suffix array construction use case, which I mentioned previously. While this use case is very specific, I believe our goals formulated from it are universally important.
The objective was to have a relatively small toolkit of well-implemented distributed algorithms, such as sorting, reduction, merging, zipping, windows, and many others. And to be able to combine or compound them efficiently into larger algorithms without losing too much performance.
The framework should work well with small items, for example single characters or integers as items, which is needed for the suffix array construction.
And use external disk memory automatically when needed. And to overlap computation and communication, something that MPI is pretty bad at. And also to overlap external disk I/O if that is needed. These computation overlapping ideas, using asynchronous communication and I/O stem from STXXL's design.
And last but not least Thrill should be in C++ to utilize compile-time optimization.
So we had really high goals. And it turned out that much work was concentrated on what I call the catacombs, or the lower layers of Thrill, which were needed to build up from MPI or another communication layer.
11 - Thrill's Design Goals
This next slide contains the same goals which I just explained on the previous slide.
Overarching main goal was to have an easy way to program relatively complex distributed algorithms using C++.
Thrill is obviously a moving target. It may change, and this tutorial was recorded in June 2020.
12 - Thrill's Goal and Current Status
So what is the current status of Thrill.
We have a well-working implementation, which I however still consider a prototype, because the interface can change between releases. It has about 60K lines of relatively advanced C++ code, written by more than two dozen developers over three years.
We published a paper describing the ideas and prototype of Thrill in 2016 at the IEEE Conference on Big Data in Washington D.C. That paper contains the microbenchmarks I showed in the first slides.
Since then, we published a paper of distributed suffix array construction in 2018, also at the IEEE Conference on Big Data, this time in Seattle, Washington. And another group at KIT implemented Louvain graph clustering and presented it at Euro-Par in 2018. There was a poster at the Supercomputing Conference in 2018. And we have many more examples in Thrill's repository, like gradient descent and triangle counting.
And for the future, the main challenges are to introduce fault tolerance, which we know how to do it in theory, but someone has to actually do it in practice.
Real algorithmic research needs to be done on scalability of the underlying "catacomb" layers to millions of machines, it is unclear how to do that well in practice.
And what I think is very interesting is to add "predictability", where one wants to run an algorithm with 1000 items, and then to extrapolate that and predict how long it will take on a million items.
And maybe another challenge not listed here is to productize Thrill, which would mean to add convenience and interfaces to other software.
But more about future work in the final section of this talk.
13 - Example: WordCount in Thrill
And this is what WordCount looks like in Thrill. It is just 20 lines of code, excluding the main caller program.
This is actual running code, and it contains lots of lambdas and dot-chaining to compound the primitives together. We will take a closer look at the code later in this tutorial, once I explained more about DIAs and the operations.
But for now WordCount is composed out of only five operations: ReadLines, FlatMap, ReduceByKey, Map, and WriteLines.
14 - DC3 Data-Flow Graph with Recursion
And this is a diagram of the suffix sorting algorithm, I alluded to earlier. This is just supposed to be a demonstration that even complex algorithms can be composed from these primitives. It contains 21 operations and is recursive.
15 - Tutorial: Clone, Compile, and Run Hello World
So let me show you a live demonstration of Thrill.
16 - Tutorial: Clone, Compile, and Run
This is an instructions slide on how to clone an simple "Hello World" repository, which contains Thrill has a git submodule, and how to compile it with cmake and then how to run the simple example program.
I will now do the steps live such that you can actually see it working, and we'll extend the example program later on in this tutorial.
Okay, let me switch to a console and let me clone the git repository from Github. And now it's downloading this tutorial project, and its downloading Thrill and all of the dependencies recursively, which are git submodules.
And the repository contains a small shell script to make compiling easier. Once its downloaded. You can just run it like that, but we're also going to build the examples; to build the example we have to add this flag, because I'm to run the example later on. And it detects the compiler and then starts building Thrill, including of all the examples.
Okay, the compilation of the examples does take some time. In the meanwhile we're going to look at the code of the simple program.
17 - Tutorial: Run Hello World
And this is the code of the "Hello World" program.
But before we get to the code, I somehow really like the photo of the hello world baby on this slide. I don't know why, it's just awesome on some many levels, I just love it.
Okay, sorry, ehm, so you first include the "thrill/thrill.hpp
", which is a straight-forward way to include everything from Thrill.
And to better understand this Hello World program, there are two concepts needed to get you started: there is the thrill::Context
class, and the thrill::Run()
method.
Each thread or worker in Thrill has its own Context object. And the Context object is needed to create DIAs and many other things: and in this example to get the worker rank by calling my_rank()
. The Context is, well, the context of the worker thread.
And to get a Context, you have to use thrill::Run()
, which is a magic function which detects the environment the program is running on, whether it is launched with MPI or standalone, it detects the number of threads and hosts, it builds up a communication infrastructure, etc etc.
In the end, it launches a number of threads and runs a function. And that function in this case is called "program
". And what "program
" does here is simply to print "Hello World" and the worker rank.
18 - Control Model: Spark vs MPI/Thrill
Okay. To better understand what happens here, we'll skip ahead to this comparison of the control model of Thrill or MPI and Spark.
Apache Spark has a hierarchical control model. There is the driver, which contains the main program. A cluster manager which runs continuously on the system, and an executor which is launched by the cluster manager and then deals out work to its actual worker threads.
Thrill and MPI work differently: All workers are launched in the beginning, and live until the program finishes. Data is passed between the threads by sending messages.
And this means that all threads can collectively coordinate because they can implicitly know what the other threads are currently doing and which messages they can expect to receive next.
19 - Tutorial: Hello World Output
Okay, so lets have a look if the compilation has finished... Yes it has. And let's run the simple program, which is in the build directory here and called simple.
So this is the output of the simple program.
I will switch back to the slides to explain the output, mainly because they are colored.
So there is a bunch of output here: the first few lines start with "Thrill:" messages and inform you about the parameters Thrill detected in the system. It says it runs "2 test hosts" with "4 workers per host", and each virtual test host has a bit less than 8 GB of RAM, because this machine has 16 GB in total. And it uses a "local tcp network" for communication.
Then there is some details about the external memory, which we're going to skip for now.
And then we can see the actual output of our "Hello World" program. And of course the worker ranks, which are going to be in a random order.
And then the program finishes, and Thrill prints a final message with lots of statistics. It ran, well, a few microseconds, had 0 bytes in DIAs, did 0 bytes of network traffic, and so on. I'll also skip the last "malloc_tracker" line.
Okay, so far so good we have a simple "Hello World" Thrill program.
20 - Introduction to Parallel Machines
In the next section, we are going to look at some real parallel machines, and then discuss the challenges that we face when programming these.
21 - The Real Deal: HPC Supercomputers
So the biggest supercomputer in the world is currently the Summit system at Oak Ridge National Laboratory in the United States. It has 4356 nodes with 44 cores each, in total a bit over 200k CPUs. These CPUs use the rather exotic Power9 instruction set from IBM. But the real deal is that each node also contain six NVIDIA GPUs, which sum up to 2.2 million streaming multiprocessors (SMs).
As you may expect, this system is used for massive simulations in physics or chemistry, and maybe also for training neural networks and such.
22 - The Real Deal: HPC Supercomputers
The biggest system in Germany is the SuperMUC-NG at the Leibniz Rechenzentrum in Munich. Its the successor of the SuperMUC, and currently number 9 in the TOP500 list.
It is a more traditional supercomputer without GPU cards. It has 6336 nodes and 24 physical Intel Xeon cores per node, which is about 150k physical cores. These are connected with an Intel Omni-Path network, which is actually kind of exotic.
Oh right, I forgot the network on the Summit is a Mellanox EDR InfiniBand network, which is a lot more common in supercomputers than the Intel Omni-Path.
But they both allow remote direct memory access (RDMA) which is the "standard" communication method in supercomputers. It is also noteworthy that both systems don't have local disks.
23 - The Real Deal: HPC Supercomputers
So, we're getting smaller now. This is the biggest machine available at KIT, the ForHLR II, which is short for, well, Forschungshochleistungsrechner II.
So the ForHLR II has 1152 nodes and 20 physical cores per node, in total 23k physical cores. These are connected with a EDR InfiniBand network, and each has a 480 GB local SSDs.
24 - The Real Deal: Cloud Computing
Right, these were all supercomputers for scientific computing. But the industry uses "the Cloud".
Not much is known about the actual hardware of the big cloud computing providers, because they employ virtualization systems.
The available virtual systems range from very small ones to large ones, and these are two examples of large instances on the AWS cloud.
The first has 48 cores and 192 GB of RAM and they are connected by 10 Gigabit virtual Ethernet network.
The second has 32 cores, about 250 GB of RAM, 10 Gigabit network, and additionally 4 local 1.9 TB NVMe disks.
My own experience with performance on AWS has been very positive. Despite the "virtualized" hardware, the benchmark results that I got were very stable. The virtual CPUs compute very similar to real ones, and the 10 Gigabit network actually delivers about 1 Gigabyte/sec throughput.
25 - The Real Deal: Custom Local Clusters
And then there are "real" server installations, which often contain many different systems, acquired over time, which makes them highly heterogeneous server installations.
And of course one can build "toy" clusters of Raspberry Pis, which are homogeneous, but have relatively small computer power per node.
26 - The Real Deal: Shared Memory
But there are also even smaller distributed machines: every shared memory parallel machine can be seen as a small distributed system, because they also have a communication protocol.
The protocol is the cache coherence protocol, which runs over QPI links between the sockets and processors.
Cache coherence is actually an implicit communication system, were writing to memory becomes "automatically" visible to the other cores.
But of course the details of cache coherence are much more complicated.
27 - The Real Deal: GPUs
And parallel computing is of course taken by GPUs to a whole new level:
Each of the 80 streaming multiprocessors on a NVIDIA Tesla V100 can perform operations in lock-step with 64 CUDA cores.
They share an L2 cache and communicate with the outside via NVLink, which is similar to the QPI links between shared-memory processors.
28 - Networks: Types and Measurements
Okay, that was a sort of the overview of parallel systems available these days.
Depending on the application, it is of course very important to pick the right system.
(skip back pages)
Thrill is currently design for small clusters of homogeneous systems, like smaller supercomputers, possibly with disks.
But it also works well with shared memory systems, because these actually have a very similar characteristic, as I was alluding to before.
And Thrill is designed for implementing complex algorithms, which are not embarrassingly parallel and thus easy to schedule via batch jobs.
But now let's look into more details of the communication networks, which are of prime importance for parallel computing.
29 - Types of Networks
I believe there are three major communication networks currently used in large computing systems.
The first is RDMA-based, which allows remote direct memory access. That means a node in the network can read or write the memory of a remote node directly without additional processing done by the remote CPU. This is possible via the memory access channels on the PCI bus. Of course there is also a privilege system in place, which means the remote system must have been granted access to the particular memory area before.
RDMA is implemented by various vendors for supercomputers, such an Mellanox's InfiniBand, or Intel's Omnipath, or Cray's Aries network.
There are also different network topologies, which limits the various aggregated bandwidths between hosts.
The second type is TCP/IP or UDP/IP based, which is what the Internet is built on. The IP frames are usual transported via Ethernet, or whatever virtualization of Ethernet is used in cloud computing environments. The various web protocols like HTTP, or SMTP are built on TCP.
And the third type, that is often overlooked, are the internal networks between sockets and CPUs in shared-memory systems. These provide the implicit communication among cores in a multi-core, many-core, or GPU system.
So these were the three types.
30 - Round Trip Time (RTT) and Bandwidth
To compare Ethernet and InfiniBand on some real systems, I performed the following three small experiments.
The first is to measure round-trip time by ping-pong message. One machine sends a ping, and the receiver answers with a pong. The round-trip time is measured by the sender.
Let's immediately look at the results from these four systems that I tested. The first is a standard Ethernet local area network (LAN) at our research group. The second and third are virtualized 10 Gigabit Ethernet on the AWS cloud. And the fourth an RDMA InfiniBand network at our local computing cluster.
As you can see, the round-trip time in our local Ethernet is slowest, about 140 microseconds. The first AWS instance is about 100 microseconds, and the second about 80. But InfiniBand is then about an order of magnitude faster than the TCP/IP-based solutions.
Now let's look at the second experiment: Sync Send.
In this experiment one machine sends a continuous stream of messages to one other machine. And we simply measure the achieved bandwidth.
Our local Ethernet nearly reaches the 1 Gigabyte/sec mark. The first AWS system is rather disappointing, with 390 Megabyte/sec, but the second is much better with 1140 Megabyte/sec. The second probably has newer virtualized hardware. But again, InfiniBand reaches nearly 6 Gigabyte/sec is this scenario.
The last experiment is ASync Send: where everyone sends to everyone else in the same system in a randomized fashion. I don't have results from the first two systems for this experiment, because I designed the third scenario later.
On the second AWS system, the total asynchronous bandwidth is around 4.3 Gigabyte/sec and on the InfiniBand cluster about 5.6 Gigabyte/sec.
So, what have we learned?
To summarize, AWS's newer virtual Ethernet performs just as good or better than real Ethernet.
But InfiniBand in supercomputer clusters is considerably better than Ethernet: the point-to-point throughput is more than a factor 5 higher, and the latency a factor 8 lower.
31 - MPI Random Async Block Benchmark
So since we now know that InfiniBand is really a lot better, I am going to present more results showing how to maximize the performance when using InfiniBand.
This following experiment is identical to the ASync Send from the previous slide: it uses MPI and transmits blocks of size B between a number of hosts in a randomized fashion.
Each host has a fixed number of send or receive operations active at any time. In the figure there are exactly four.
The transmission pattern is determined by a globally synchronized random number generator. This way, all hosts know the pattern without having to communicate, and they can easily calculate whether to send or receive a block and which partner to communicate with in each step.
At the same time the pattern is unpredictable and really tests the network's switching capacity.
There are two parameters which are interesting to vary: the block size and the number of simultaneously active requests.
32 - Random Blocks on ForHLR II, 8 Hosts
And these are the results on the ForHLR II system with eight InfiniBand hosts.
I plotted the achieved bandwidth versus the transmitted block size, and the various series in the plot are different numbers of simultaneous requests.
One can clearly see that with a single issued send or receive request, which is the red line, the network can in no way be saturated. It takes 64 simultaneous requests or more to get near the full transmission capacity of the InfiniBand network.
And then you also need larger blocks in the best case 128 KiB in size.
In summary, it is really, really hard to max out the InfiniBand network's capacity. Too small blocks are bad, but too big transmissions as well. And you really have to issue lots of asynchronous requests simultaneously.
33 - Variety of Parallel Computing Hosts
Okay that brings us to the wrap-up of our tour through parallel computing machines.
And it believe the best summary is a that it resembles a fish market:
The variety of parallel systems is large, some have special CPUs, some have commodity hardware, some have Ethernet, some InfiniBand, some have local disks, some don't, and so on.
And in the end the prices for the computing resources fluctuate daily and exotic systems can demand higher prices.
Well, just like on this fish market in Seattle.
34 - Introduction to Parallel Machines: Models
Before jumping into Thrill it is good to review some more parallel programming concepts and models.
35 - Control Model: Fork-Join
Maybe the most important concept or model is "fork-join".
In fork-join, a thread of execution splits up into multiple parallel threads, which is called a fork. This is a fork. When the multiple threads finish, a join brings the results together, processes them, and then the sequential sequence can continue. Forks and joins can also be done recursively by threads running in parallel.
While this may seem trivial, it is a very good mental model to work with when parallelizing applications. The fork starts multiple independent jobs, thereby splitting up work, and the join combines the results of these jobs.
The challenge is then of course to construct phases in which all parallel tasks run equally long, such that computation resources are efficiently used. This is due to phases taking as long as the longest thread inside them.
36 - Trivially Parallelizable Work
Let's look at the simplest case: when the workload can be split up perfectly into independent equal-sized subtasks, then is it also called trivially parallelizable, or embarrassingly parallel.
Of course, on the other hand, it is not always easy to see when a task is embarrassingly parallel.
In the fork-join model this is just one phase. Of course, things can get messy if the subtasks are not equally long. Then one needs a load balancing system, such as a batch scheduler, which assigns subtasks to processors.
37 - Control Model: Master-Worker
The simplest and most widely used task scheduler is the master-worker paradigm.
In that model, there is a master control program, which has a queue of jobs. It may know a lot about these jobs, such as how long they will take, or what resources they need.
The master then deals out jobs to a fixed number of workers, and while doing so it can schedule the jobs in some intelligent manner.
This model has many advantages, mainly that it is simple, workers can easily be added or removed, and it implicitly balances uneven workloads. It is used by most Big Data computation frameworks.
However, there are also disadvantages. Namely the master is a single point of failure. It is also not truly scalable, because the master has to know everything about all workers and all jobs.
And it may also increase latency, due to the round-trips involved sending jobs to the workers. This may also require transmitting data for the job, and so on.
38 - Control Model: Spark vs. MPI/Thrill
We already considered this slide in the introduction.
Apache Spark follows a master-worker model. There is a driver program which is written by the user and contains RDD operations. This driver program is transmitted to a long-running cluster manager, which acts as a high-level resource scheduler. The cluster manager spawns one or more executors, which manage data and code, and use the master-work paradigm to schedule smaller short-lived jobs.
MPI and Thrill follow a different approach. At launch time, a fixed number of threads is allocated, and they collectively run the program. This model has the advantage of the workers being able to synchronize implicitly.
It however also requires one to think a lot more about how to balance the work, as not to waste processing time.
39 - Bulk Synchronous Parallel (BSP)
A good way to think about that challenge is the Bulk Synchronous Parallel model, or BSP model.
In the BSP model, there are a fixed number of processors, which perform local work. During that work they can send messages to other processors, however these messages are sent asynchronously, which means that they are only available at the destination after a barrier.
The time from one barrier to the next is called a superstep. And one wants to minimize the number of barriers and the wasted time inside a superstep.
It is good to think about Thrill DIA operations as BSP programs. And for many operations, Thrill reaches the minimum number of supersteps possible.
40 - Introduction to Parallel Machines: Implementations and Frameworks
After considering models, we are now going to look at some of the most popular actual frameworks and implementations.
41 - MPI (Message Passing Interface)
MPI definitely belongs into this category, though I would say it is not actually popular; widely-used is a better adjective for MPI.
MPI itself is only a standard defining an interface. There are several different implementations, and also specialized ones for the large supercomputers, where it is still the most used communication interface in applications.
This slide shows some of the most important collective operations. For example Allgather, were each processor contributes one item, and after the operation all processors have received the item from all other processors.
42 - Map/Reduce Model
An entirely different model is the Map/Reduce model, which was popularized by Google in 2004 and initiated an entire new research area.
One programs in Map/Reduce by specifying two functions: map and reduce. Map takes an items and emits zero or more elements. These elements are then partitioned and shuffled by a hash method such that all items with the same key are then processed by one reduce function.
This here is the canonical WordCount example.
43 - Map/Reduce Framework
But what Map/Reduce does differently from MPI and other frameworks is to free the user from having to think about individual processors.
The Map/Reduce framework in some sense is auto-magical. It is supposed to provide automatic parallelization, automatic data distribution and load balancing, and automatic fault tolerance.
On the one hand this makes the model so easy and attractive, on the other hand it makes the work of the framework very, very, very hard. And that is why many Map/Reduce frameworks appear to be slow: they have to guess correctly how to automatically fulfill all these expectations.
44 - Apache Spark and Apache Flink
Then there are the what I call "post"-Map/Reduce frameworks: namely Apache Spark and Apache Flink.
Spark was started in 2009 in the US at Berkeley and is centered around so-called RDDs, and later DataFrames. As said before, it follows a driver-master-worker architecture and has become very popular for many tasks in the industry.
Flink on the other hand was started as Stratosphere at the TU Berlin in Germany by the group around Professor Volker Markl. This group has a strong background in database design and data management, and this shows clearly in Flink's design.
Flink has an optimizer and parallel work scheduler similar to a relational database. But Spark on the other hand has also been adding these components lately.
45 - Flavours of Big Data Frameworks
And then there are many, many other frameworks. This is a slide on which I tried to categorize many of the popular names and frameworks.
Depending on the application it is of course better to use specialized Big Data frameworks, for example for interactive queries, or NoSQL databases, or machine learning.
There is probably no one-size-fits all solution.
In Germany there is a funny expression for that: there is no such thing as an "eierlegende Wollmilchsau", which is this pictured here, a fantasy figur of an egg laying pig with woolen skin that also produces milk.
And Thrill is also not a one-size-fits all solution.
46 - The Thrill Framework: Thrill's DIA Abstraction and List of Operations
And now, after this long introduction into parallel programming and its many challenges, let's dive into the high layer programming interface of our Thrill framework.
47 - Distributed Immutable Array (DIA)
This high layer interface revolves around the concept of a DIA or DIA, a distributed immutable array, which is a conceptional array of C++ items that is distributed transparently onto the cluster in some way.
The framework intentionally hides how the array is distributed, but one can imagine that it is divided equally into the processors in order.
The array can contain C++ objects of basically any type, characters, integers, vectors of integers, structs, and classes, as long as they are serializable.
But you cannot access the items directly, instead you apply transformations to the entire array as a whole. For example, you can apply a Map transformation which can modify each item, or you can Sort the array with some comparison function. Each time the result is a new DIA.
47a - Distributed Immutable Array (DIA)
However, inside the framework these distributed arrays usually don't exist explicitly. Instead a DIA is a chain of operations, which is built lazily and the actual operations are only executed when the user triggers an Action, which must deliver some external result.
Only then does the framework execute the operations necessary to produce that result. The DIAs in the execution are actually only the conceptional "glue" between operations, the operations themselves determine how to store the data. For example Sort, may store the items in some presorted fashion.
47b - Distributed Immutable Array (DIA)
The chaining of operations one can visualize as a graph, which we call DIA data-flow graphs. In these graphs the operations are vertices, and the DIAs are actually the edges going out of operations and into the next.
Now, why have a distributed array and not a distributed set or similar. I believe that becomes more clear when we look at the following excerpt of some of the operations.
48 - List of Primitives (Excerpt)
This is the original short list of important operations, the current Thrill version has many more DIA operations and variations that have been added over time.
There are two basic types of operations: local operations, LOp for short, and distributed operations, called DOps.
LOps cannot perform communication, these are for example Map, Filter, FlatMap, and BernoulliSample. On the other hand DOps can communicate, and they contain at least one BSP model barrier, and there are many different types of DOps.
There is actually even a third kind of "auxiliary" DIA operations, which are used to change and augment the DIA graph, but more about those later.
Here is a short list of DOps: Sort, ReduceByKey, PrefixSum, Zip, Union, and so on. Each of these DOps is implemented as a C++ template class that runs a distributed algorithm on a stream of input items.
There are two special types of DOps: Sources and Actions.
Sources start a DIA chain by reading external data or generating it on-the-fly. Examples of these are Generate, ReadLines, and ReadBinary.
And then there are Actions, like Sum, Min, WriteBinary, or WriteLines, which are also distributed operations, but actually trigger execution of the chain of DIA operations, and write files or deliver a result value on every worker which determines the future program flow.
Now back to the question, why a distributed array and not a distributed set? In the very beginning we looked at a list of operations that we wanted to support. These operations we knew would be possible to implement in a scalable distributed manner, and yes, some of them had a set as output. For example ReduceByKey outputs a set, and Sort takes a set and produces an ordered array.
But in the end, there were so few unordered sets, that we decided it was best to have just one central data structure instead of a complex type algebra. So we simply consider unordered sets as arrays with an arbitrary order! Internally, the items are always stored in some order, so we just take this internal order as the arbitrary array order.
49 - Local Operations (LOps)
And now come a lot of slides with pictures of what the various DIA operations do, because pictures say more than a thousand words.
These are Map, Filter, and FlatMap, which all are local operations.
Map maps each item of type A to an item of type B. Hence, it maps a DIA of type A to a DIA of type B, both of exactly the same size.
Filter identifies items that should be kept or removed using a boolean test function.
And FlatMap can output zero, one, or more items of type B per input of type A. This allows one to expand or contract a DIA.
50 - DOps: ReduceByKey
ReduceByKey takes a DIA of type A and two arguments: a key extractor function and a reduction function.
The key extractor is applied to each item and all items with the same key are reduced using the reduction function.
The keys are delivered in the output DIA in a random order, which is actually determined by the hash function applied internally to the keys.
51 - DOps: GroupByKey
GroupByKey is similar: it takes a DIA of type A, a key extractor function, and a group function.
This time all items with the same key are delivered to the group function in form of an iterable stream or a vector. This however means that all these items have to be collected at the same processor.
Compare this to ReduceByKey, where the reduction can happen in a distributed fashion. So you should always prefer ReduceByKey whenever possible.
52 - DOps: ReduceToIndex
And then there is ReduceToIndex, which is like ReduceByKey but you are allowed to define the order of the items in the output array.
ReduceToIndex takes a DIA of type A and three arguments: an index extractor, a reduction function, and the result size.
The index extractor determines the target index for each item, and all items with the same target index are reduced together into that slot in the output array.
There may of course be empty cells in the result, which are filled with zeros or default constructed items.
And the output size is needed such that the resulting DIA can be distributed evenly among the processors.
Now ReduceToIndex really highlights the use of distributed arrays as a data structures. It is for example used in the PageRank benchmark to reduce incoming weights on a list of vertices.
53 - DOps: GroupToIndex
There is of course also a GroupToIndex variant, which like GroupByKey collects all items with the same key or index and delivers them to a group function as a stream. But like ReduceToIndex you specify the target index of each item instead of a random hash key.
54 - DOps: InnerJoin
There is also an experimental InnerJoin, which takes two DIAs of different types A and B, a key extractor function for each of them, and a join operation.
The key extractors are executed on all items, and all items with keys from A that match with keys from B are delivered to the join function. The output array is again in a random order.
If you are more interested in InnerJoin it is best to read the corresponding master thesis.
55 - DOps: Sort and Merge
As often mentioned before, there is a Sort method. It takes a less comparator, which is the standard way to write order relations in C++.
And there is also a Merge operation, which also takes a less comparator, and two or more presorted DIAs.
56 - DOps: PrefixSum and ExPrefixSum
Because we have an array, we can define a PrefixSum operation, which, well, calculates the prefix sum of the DIA elements. In the C++'s STL the same is called a partial_sum()
.
There are actually two variants of PrefixSum: the inclusive and the exclusive one, which means the item is either included in the sum at its index, which is inclusive variant, or not, which is the exclusive variant.
57 - Sample (DOp), BernoulliSample (LOp)
There are two Sample operations: A Sample method returning a fixed size result, and a BernoulliSample which takes only the probability of an item surviving, which means the result of BernoulliSample can obviously vary widely.
58 - DOps: Zip and Window
And there are zip functions to combine DIAs. Zip takes two or more DIAs of equal size, aligns them, and delivers all items with the same array index to a so-called zip function, which can process them and outputs a new combined item. There are also variants of Zip to deal with DIAs of unequal size by cutting or padding them.
And since Zip's most common use was to add the array index to each item, we implemented a specialized ZipWithIndex method, that is a bit more optimized that the usual Zip method.
And then there is the Window method. Which is actually quite strange: what Window does is delivers every window of k consecutive items to a function. There are also variants which deliver only all disjoint, meaning non-overlapping windows.
I find Window very strange because it reintroduces the notion of locality on the array, something that all of the previous operations did not actually consider.
59 - Auxiliary Ops: Cache and Collapse
There are two special auxiliary operations: Cache and Collapse. In the technical details section later on, I will explain more about the reasons behind them.
Cache is simple: it actually materializes the DIA data as an array. This is useful for caching, but also vital when generating random data.
Collapse on the other hand is a folding operation, which is needed to fold Map, FlatMap, or other local operations, which Thrill tries to chain or combine together. A DIA is actually a lazy chain of template-optimized operations, here written as f1 and f2. These functions are usually automatically folded into DIAs, but in loops and function return values, it is sometime necessary to fold or collapse them manually into a DIA object. But more about that later.
Cache and Collapse are sometimes hard to distinguish, it is easiest by remembering the cost of these two: The cost of Cache is the size of the entire DIA data that is cached, while the cost of Collapse is only one virtual function call per item, so it is very small.
60 - Source DOps: Generate, -ToDIA
And now for a list of Source operations.
The most basic Source operation to create a DIA is called Generate. Generate, well, it generates a DIA of size n by passing the numbers 0 to n-1 to a generator function which can actually then construct the items. It is often used to generate random input or simply index numbers.
There is also a plain Generate version, which simply creates a DIA containing the numbers 0 to n-1. The two parameter version is identical with the plain Generate followed by a Map.
Generate should however not be used to convert a vector or array into a DIA. For that there are two simpler methods: ConcatToDIA and EqualToDIA. These two operations differ in how the input vector at each worker is viewed.
ConcatToDIA concatenates the vector data at each worker into a long DIA, which is what you want when each worker has one slice of the data.
EqualToDIA on the other hand assumes that the vector data on each worker is identical, and it will cut out that part of the vector which the worker will be responsible for.
61 - Source DOps: ReadLines, ReadBinary
So beyond these three Source operations, there are currently two main input file formats used in Thrill: text files and binary files.
Text files can be read line-wise into a DIA. Each item in the result DIA is a std::string containing a line. The ReadLines operation can also take a list of files, which are then concatenated.
Again, there are two scenarios: one where all Thrill hosts see all the files, for example when they are stored in a distributed file system such as NFS or Lustre.
And another scenario, where each Thrill hosts sees only its local directory of files. This is similar to the difference between ConcatToDIA and EqualToDIA.
The default case is the distributed file system case, because that is what we seen on our supercomputers. If you need to concatenate local files, please lookup the LocalStorage variant of the operations.
So text files are easy and straight-forward, but Thrill will also read pure binary files.
When reading binary files, these are just parsed directly without any format definition or similar. They are simply deserialized by Thrill's internal serializer.
This is most useful to load raw data files containing trivial data types such as integers and other PODs, but it can also be used to reload data objects that has been previously stored by Thrill.
62 - Actions: WriteLines, WriteBinary
There are of course also matching write operations: WriteLines and WriteBinary.
WriteLines takes a DIA of std::strings and writes it to the disk.
WriteBinary takes any DIA and writes it to disk in Thrill's serialization format. This format is trivial for POD data types, and very simple for more complex ones and the written files can be loaded again using ReadBinary.
Now it is important to note that these functions create many files, not just one. Namely each worker writes its portion of the DIA into a separate file, so you get at least as many files as you have workers.
This is not a problem for the corresponding Read methods, because they can read and concatenate a list of files.
And creating many files is obviously the cheapest way to do it in a distributed system. However, in the beginning it may be somewhat surprising and thus I warned you now.
63 - Actions: Size, Print, and more
And then there are Actions. Well, to be precise the WriteLines and WriteBinary operations already were Actions. But this is the overview of many other Actions.
Actions take a DIA and calculate a non-DIA result from it. This triggers the chain of executions necessary to produce that result.
The maybe most important commonly used ones are Size and AllGather. Size just calculates the size of a DIA.
And AllGather collects the entire DIA content on all workers. This is of course a somewhat dangerous operation, and should only be used for small DIAs or for debugging. There is also a Gather operation which collects it only on one worker.
Speaking of debugging. The Print action is invaluable! It simply prints objects using the std::ostream
operator. Use it often! I can only repeat: Use it often!
Execute is a helper action, which triggers execution of a DIA chain, but doesn't actually deliver any result.
And then there are the AllReduce methods. As I mention before AllGather is often used, but cannot work on large DIAs. For that you probably want to use AllReduce, or Sum, or Min, or Max, which are actually only AllReduce in disguise.
The AllReduce methods are the best scalable, distributed way to calculate a result. The hard part is determining how to use them to calculate something useful in a larger algorithm.
64 - The Thrill Framework: Tutorial: Playing with DIA Operations
Wow. That was the end of quite a long list of DIA operations.
The previous slides also functions as a kind of visual reference of what operations are available and what they do.
Coding with Thrill often boils down to picking the right DIA operations, parameterizing them in the right way, and then combining them into a larger algorithm.
The DIA operations reference list that we just went through is therefore really valuable for coding algorithms in Thrill.
In this next section, I will show you how to actually code using these DIA operations. As the title says, we are going to play with DIA operations.
65 - Playing with DIA Operations
In the first section I showed you a Hello World program. We will soon start from that and add some DIA operations.
But how do you figure out the actual C++ syntax?
For that the easiest way is to look at example code from the Thrill repository, or to look at the function definition in the doxygen documentation, which you can find under this link here.
And all of the operations are listed here somewhere, and you can find the full C++ syntax. However, this list is pretty long, for example there are multiple different variants of all of the different functions.
But I believe the first thing to do is to try and guess, because the illustrated documentation is pretty close to what is needed in C++.
66 - Playing with DIA Operations
Okay, how to get your first DIA object?
We already know how to bootstrap everything with the Hello World program.
The only other thing you need to know is that initial DIAs are created from Source operations, and these source operations take the Context as first parameter. This first parameter is actually missing from the illustrations in the last section for simplicity.
But the rule is simple, if no parameter of an operation is a DIA, then the Context has to be passed as first parameter.
So here you see how to add a ReadLines and a Print operation to the Hello World program. It is good practice to add the name of the variable to the Print call.
And since we are going to use this a starting point later, I will now copy the code and compile and run it.
(live-coding demonstration)
67 - Playing with DIA Operations
So now we have a DIA. And with that object you can call other DIA operations. Most operations are methods of the DIA class, like .Sum() or .Print() and so on. But there are also some free functions, like Zip, which take more than one DIA as a parameter.
It is good practice to use auto keyword to save the result of a DIA operation. There is a lot of magic going on in the background which is captured by this auto.
But maybe the most elegant and preferred way is simply to chain the operations with the dot "operator", which immediately calls methods on the resulting temporary DIA handles. This is of course the same as using auto.
68 - Playing with DIA Operations
And some more explanation about these DIAs objects:
The DIAs in the code are actually only handles to the "real" DIAs inside the data flow graph.
This mean you can copy, assign, and return these DIAs without any processing cost. You can pass DIAs as template parameters, which is of course the same as using autos. And again, better use templates instead of explicitly writing DIA<T>.
69 - Playing with DIA Operations
And of course the easiest way to write small functions needed for the many parameters of DIA operations is to use lambdas like you already saw in the WordCount example.
Here it does not matter if you use explicit types or the auto keyword. I use explicit types most of the time, despite them often generating lots of template error messages until you get them right.
70 - Context Methods for Synchronization
And now for some more details on the interface. Besides the DIA operations interface, there are also many methods in the Context class which are useful.
The first you already saw: my_rank()
returns the rank or ordinal number of the worker. It is most often used to print some result only once.
But there are many others, which are similar to MPI's collective operations: you can broadcast, prefix sum, and allreduce items. Different from MPI, these methods use Thrill's serialization, which means they automatically work with many different types.
And there is also a Barrier synchronization method, in case you need that, for example for timing.
71 - Serializing Objects in DIAs
I often talk about Thrill's serialization without giving any details. This is because it mostly just works the way you think it should.
The serialization framework automatically works for PODs, plain old data types, which are integers, characters, doubles, floats, structs, and classes containing only PODs. Most importantly, fixed-size classes are PODs, I believe, if they only have the default constructor and not some specific constructor.
Many of the STL classes are also automatically serialized. strings of course, pairs, tuples, vectors, and so on.
The serialization format for all of these is totally trivial. Trivial types are serialized as verbatim bytes. For strings or vectors, the number of characters or items in the vector is prefixed to the actual data. In the end it's all quite simple.
If you run into problems with a class not being serializable, you can add a serialize()
method. For more complex classes such as this one which contains a variable length string, we use the cereal framework, that's the cereal as in your morning breakfast.
See the cereal web site for more info. But it is really mostly as simple as this piece of code here: you have a serialize()
method, which takes a template Archive, and puts or restores all items of the struct to or from the Archive. That's it, and then this item becomes serializable.
72 - Warning: Collective Execution!
And now one last word of warning, before we jump into some real examples.
Maybe the most important word of warning: in Thrill programs, all workers must perform all distributed operations in the same order!
This is of course because the distributed operations implicitly synchronize their collective execution. This allows the implementations to assume the states the others are in, without communicating at all. This saves communication and latency.
So it is important to remember, that something like the following code piece does not work: Why?
(pause)
Simple: the .Size() method here is run only by the worker with rank 0. That just does not work, because all workers must run all distributed operations.
This is of course a trivial example, but it is actually quite a common pitfall, because Thrill's syntax is so convenient.
73 - Tutorial: Playing with DIAs
Okay, let's get to the tutorial part.
Here are some examples of small tasks you can try to implement in Thrill to get a feeling of the framework, and how to use it.
All are very simple, and you just have to use the previous slides containing the DIA operations, figure out how the actual C++ syntax works, and apply and parameterize a few of them. Very simple stuff.
For the purpose of this YouTube video, I will actually do the third one.
If you try to do them yourself: one word of advice. Use lots of .Print() methods! Just print out the DIAs and figure out what the next step should be.
(live-coding demonstration)
74 - The Thrill Framework: Execution of Collective Operations in Thrill
So in the last section, I gave you an overview of how to actually program in Thrill by detailing how to chain DIA operations to form a DIA data-flow graph.
And I even wrote a simple Thrill program for you and ran it on some input data.
Now we are going to look at more details of the DIA operations and discuss how are they actually executed.
Much of what you saw up to now, could just as well have been written in a functional programming language as list operations, there was only very little details about how everything is run on a distributed system.
75 - Execution on Cluster
But Thrill is of course a distributed framework. Which means it is designed to run on a cluster of machines.
This is implemented in Thrill by compiling everything into one single binary, which is launched on all machines simultaneously.
This one binary uses all the cores of the machine, and attempts to connect to other instances of itself running on other hosts via the network.
This one binary is exactly what we have been compiling in the examples previously. It can be run on a single machine, or on many connected via a network.
This schema is similar to how MPI works: there is only one binary, which is run using mpirun on a cluster. And as in Thrill, flow control is decided via collective coordination using the C++ host language.
For Thrill there are launcher scripts, which can be used to copy and run the binary via ssh or via MPI. More about those in the following on-hand tutorial section.
One word about names: machines are called hosts in Thrill-speak, while individual threads on cores of a host are called workers. We use this terminology to avoid the word "node", especially for network hosts. Cores or threads are also ambiguous.
76 - Example: WordCount in Thrill
For an overview of what actually happens when Thrill is run on a distributed system, let us reconsider the WordCount example from the beginning.
WordCount consists of five DIA operations, and you have seen what each of them does previously in this tutorial.
The first is ReadLines, which creates a DIA of std::string from a set of text files containing each line as a DIA item.
This DIA is immediately pushed into a FlatMap operation, which splits the lines apart into words. The output of the FlatMap is a DIA of Pairs, containing one Pair for each word, namely ("word",1).
In the example, I labeled this DIA word_pairs
. That is of course completely unnecessary, and only to show that you can assign DIAs to variable names.
This DIA of Pairs is reduced using ReduceByKey. Remember, the ReduceByKey operation collects all items with the same key and applies the reduction function to combine them. For this we parameterize, the ReduceByKey method with a key extractor, which just takes out the word from the Pair, and a reduction function, which adds together the second component of the Pairs, which in this case is the word counter. The key is obviously the same in a and b inside the reduction function, so we can pick either of them.
The result of ReduceByKey is already the word counts which we want, but to write the result using WriteLines, we have add a Map from Pair to std::string. This is because WriteLines can only write a DIA of std::strings. So this Map operation is actually used to format the Pair as a string.
Yes, and that's it, that is WordCount in Thrill.
As said before, these are just 20 lines of code, excluding the main caller program, which is where the Context comes from, and which you have seen before.
But what I just explained is the functional view of the program. There was nothing parallel or distributed about what I just explained.
Now, let's look at what actually happens when this Thrill program is executed.
ReadLines takes as input a list of text files. These text files are sorted lexicographically and split up evenly among the workers: Thrill looks at their total size in bytes, partitions the bytes or characters evenly, and then searches forward at these places for the next newline.
So the DIA of lines is actually evenly split among the workers. These then execute the FlatMap on each line of the DIA. However, consider for a second how large the DIA of Pairs (words,1) actually is.
It is huge!
But luckily, Thrill never actually materializes DIAs unless absolutely necessary, remember everything is a lazily executed DIA data-flow graph. The DIAs actually never exist, except for some rare exceptions.
So what actually happens is that the text files are read block by block, the lines are split by scanning the data buffer, and each line is immediately processed by the FlatMap. There is even a variant using a string_view
, such that the characters aren't even copied from the read buffer into a temporary std::string
.
The Pairs (word,1) are equally, immediately processed by the following ReduceByKey operation.
And what is ReduceByKey? It is actually a set of hash tables on the workers: In the first phase, the items are placed in a local hash table and items with the same key, are immediately reduced in the cells of the hash table.
So, again, neither the DIA of lines, nor the DIA of Pairs actually ever exists as an array. The text files are read, split, and immediately reduced into the hash table in ReduceByKey. The hash table is the only thing that uses lots of memory.
And all of this is done in parallel and on distributed hosts.
So after the first phase, each worker has a locally reduced set of word counts. These then have to be exchanged to create global word counts.
This is done in ReduceByKey by transmitting parts of the hash table: the entire hash value space is partitioned equally onto the workers, and because we can assume an equal distribution, each worker gets approximately the same number of keys.
So each worker is assigned a part of the global hash value space, which can actually be done without any communication, and these then receive all keys for that partition from all workers. So each worker transmits almost the entire hash table that it has, excluding the part for himself, but in a pre-reduced form.
On the other side, the items are received from all workers and reduced in a second hash table phase, and again items with the same key are reduced in the cells of the hash table.
Back to the WordCount example. When the second reduce phase is finished, the hash tables in each worker contain an approximately equal number of items.
The following DIA operations consists of the Map from Pair to string and the WriteLines operation. Again, these steps are chained or pipelined, which means each worker scans over the items in its hash table, applies the Map operation item-wise, and pushes each item into a WriteLines buffer. The WriteLines buffer is flushed to disk when full.
And such that all workers can operate in parallel, each worker writes its own text file, which means there will be many, many output files, which concatenated in lexicographic order are the complete result of this program.
On a side note: we used to have a method to write just one big text file, but actually the distributed file systems used on clusters work better when writing many files in parallel than writing only a single file at multiple places at once.
So this is what actually happens when Thrill executes this WordCount program. Each operation is internally implemented using distributed algorithms, and they are chained together using the DIA concept. The DIAs, however, never actually exist, they are only the conceptional "glue" between the operations. The operations themselves determine how the items are stored in memory or on disk.
In the case of WordCount, the ReduceByKey stores the items in a hash table, and possibly spills slices out to disk if it grows too full, more about that later. ReadLines and WriteLines only have a small read or write buffer, and the FlatMap and Map are integrated into the processing chain.
Yes, that was WordCount.
77 - Mapping Data-Flow Nodes to Cluster
Now let's have a closer look at how this processing chain is implemented in Thrill.
On the left here there is an example DIA data-flow graph: a ReadLine, PrefixSum, Map, Zip, and WriteLines. This graph does not have to make sense, it's just an example.
Each distributed operation in Thrill has three phases: a Pre-Op, a Main-Op, and a Post-Op. Each phase is actually only a piece of code.
The Pre-Op receives a stream of items from previous DIA operations and does something with it. For example in PrefixSum each worker keeps a local sum of the items it receives and just stores them.
Once all local items have been read, a global prefix sum of the local sums has to be calculate. This is the Main-Op, and of course requires communication.
This global prefix sum is the offset value each worker needs to add to its own items for the complete prefix sum result to be correct. This of course it done only when the data is needed, which means in the Post-Op or PushData phase.
In this phase, the stored items are read again, the local partial prefix sum is added and further local items are aggregated into the sum. The items calculated in the Post-Op of PrefixSum are then pushed down to further operations.
In this case it is a Map operation, which is easy to apply, and the result of Map is pushed down even further.
Each distributed operation implements different Pre-Op, Main-Op and Post-Ops. In the case of Zip incoming items are only stored in the Pre-Op.
In the Main-Op of Zip the number of stored items is communicated and splitters or alignment indexes are determined. The stored item arrays are then aligned such that all items with the same index are stored on one worker. This can be as expensive a complete data exchange, or only a few items which are misaligned, depending on how balanced the input was. In some sense in Zip the DIA array is actually materialized and then aligned among the workers.
Once the arrays are aligned, the arrays are read again in the Post-Op, the zip function is applied, and the result pushed further down, in this case into WriteLines.
Okay, so in this example we saw how chains of DIA operations are built. Local operations are simply inserted into the chain, while distributed operations consist of Pre-Op, Main-Op, and Post-Ops.
77b - Mapping Data-Flow Nodes to Cluster
From a BSP model view, the Pre-Op, Main-Op, and Post-Ops are in different supersteps, and the lines in between are barriers. These barriers are impossible or very hard to avoid, because in these cases the next phase can only start when all items of the previous phase have been fully processed.
This also means that local operations, such as the Map here do not require barriers.
77c - Mapping Data-Flow Nodes to Cluster
Again, this means these can be integrated into the processing pipeline, and Thrill does this on a binary code level using template meta-programming and lambdas.
To be precise, all local operations before and including a Pre-Op are chained and compiled into one piece of binary code. The previous Post-Op cannot be chained, instead there is one virtual function call in between them, which is this line here. The reason is that new DIA operations have to be able to attach to the Post-Op at run-time.
78 - The Thrill Framework: Tutorial: Running Thrill on a Cluster
So, after these heavy implementation details, let's get back to some hands-on exercises. In this section I will explain how to run Thrill programs on a cluster.
79 - Tutorial: Running Thrill on a Cluster
As mentioned before, Thrill runs on a variety of systems: on standalone multi-core machines, on clusters with TCP/IP connections via Ethernet, or on MPI systems.
There are two basic steps to launching a Thrill program on a distributed cluster: step one is getting the binary to all hosts, and step two is running the program and passing information to all hosts on how to connect to the other instances.
Thrill does this, not read any configuration files, because this would require the files to also be distributed along with the binary, instead Thrill reads environment variables, which are passed by the scripts.
80 - Tutorial: On One Multi-Core Machine
Okay, but before we get to clusters, first more about running Thrill on a single multi-core machine. This is the default startup mode, and you have already used or seen me use it in this video.
This default startup mode is mainly for testing and debugging: the default is that Thrill actually creates two "virtual" test hosts on a single machine, and splits the number of cores equally onto the virtual hosts. These then communicate via local kernel-level TCP stream sockets. This means this setup is ideal for testing everything in Thrill before going onto real clusters.
When Thrill is run, it will always print out a line describing the network system it detected and is going to use. You already saw this line in the first example: Thrill is running locally with 2 hosts and 4 workers per host in a local tcp network.
There are two environment variable to change the parameters. THRILL_LOCAL changes the number of virtual test hosts, and THRILL_WORKERS_PER_HOST changes the number of threads per virtual host. I suggest you try changing these parameters on an example program.
If you want to run Thrill programs on a standalone multi-core machine for production, it is best to use THRILL_LOCAL equals 1, because then all communication between workers is performed internally without going to the kernel.
81 - Tutorial: Running via ssh
This next method is used for launching Thrill programs via ssh.
This method works for plain-old Linux machines which you can access via ssh, for example, I used this for AWS EC2 virtual machines.
For it to work, you must be able to ssh onto the machines without entering a password, so please install ssh keys.
In principle, a shell script is then used to ssh onto all the machines, run the Thrill program simultaneously on all of them and pass the other host names via an environment variable such that they can connect to each other.
For this the Thrill repository contains an "invoke.sh" script. This script takes a list of hosts, optionally a remote user name, and a Thrill binary path.
The script also has many more parameters which you can see by just running it. For example, for the case when the host names used for ssh are different than those needed for connecting directly among the running instances.
However, there is one important flag I would like to mention: "-c", which copies the Thrill binary via ssh to all remote machines. If your cluster is set up to have a common file system, like NFS, Ceph or Lustre, then the binary can be accessed on all machines directly, and the script can just call a full path.
If however, the machines do not share a file system, for example, AWS EC2 instances, then add the "-c" flag and the script will copy the binary onto all machines into the /tmp folder and run it from there.
82 - Tutorial: Running via MPI
And the last method is using MPI.
And it is maybe also the easiest, because Thrill can simply auto-detect that it was launched using MPI. No configuration at all is necessary, because it can just read all the information from MPI.
For this to work, you only have to compile the Thrill programs with an MPI runtime, which is automatically done with cmake if it can detect an MPI library.
When compiled with MPI, you can run the binary with mpirun, and it will use MPI as transport layer. You can verify that you are really using MPI by checking this via the Thrill startup message.
Thus Thrill programs can also easily be scheduled on supercomputers using Slurm or some other MPI cluster manager.
83 - Tutorial: Environment Variables
And these are some of the other environment variables you can use to configure Thrill.
THRILL_RAM sets the amount of RAM Thrill uses, which is usually auto-detected.
You already know THRILL_WORKERS_PER_HOST, which is also usually auto-detected as the number of cores on each machine.
And there is THRILL_LOG which can be used to write a JSON log, and I will show you more details on that in the next section.
Here below are shown how to set the environment variables:
When running Thrill programs directly, just prepend the program invocation with the variables, like this.
The same is true when using the invoke shell script. The script actually copies any environment variables starting with THRILL_ but also sets a lot of them itself.
And when running with MPI, the environment variables can be passed using mpirun. Depending on the MPI implementation there are different flags for this, for OpenMPI I know that it is "-x".
84 - The Thrill Framework: Tutorial: Logging and Profiling
So we now know how to launch Thrill programs on a cluster.
This next section will show how to use Thrill's integrated logging and profiling facilities such that we can actually see what happens on the cluster.
85 - Tutorial: Logging and Profiling
To activate the built-in logging system, you have to set the environment variable THRILL_LOG. The last section showed you how to do that.
This instructs Thrill to write a JSON log file, actually one for each host, and these files can grow rather large. The log files contain various events like when DIA operations start and stop, how they depend on each other, and also a profile of the CPU, network, and disk usage. This system profiling is only available in Linux.
The JSON log itself is probably not much use directly, but there are two utilities in the Thrill repository to parse and process this log. The first tool generates a time-based profile in HTML format viewable with a web browser and the second tool extracts the DIA data-flow graph of the execution.
The following slides will show examples generated by running the PageRank example on a multi-core system using two virtual hosts. The steps to generate the example log are shown below in case you want to try them out yourself.
The tool json2profile generates graphs in HTML plotted using javascript.
86 - Tutorial: Example Profile
And this is a screenshot of the profile generated by the example.
The profile contains many series and only some of them are visible by default. The plotting tool is quite nice, because you can show or hide series by clicking on them in the legend.
Most series are averages across all hosts in the distributed system. For example the CPU utilization, memory, network transmission and I/O series are averages.
The series CPU is CPU utilization in percent. CPU user and sys are CPU utilization split into user-land and kernel-land time. Mem RSS is the allocated resident RAM size.
TX and RX net are network send and receive counters. The series TX plus RX, TX and RX net are counters measured my Thrill internally by counting bytes in the network layer, while the three "sys net" series are measurements made by reading system counters from /proc.
I/O sys are read and write counters for disk utilization. These are just measured by the system, and may include any other I/O performed by other programs. The same is true, of course, for the "sys net" network series.
So those were the CPU, network and disk counters.
The next few are internal counters: the series "Data bytes" is simply the amount to data in all existing DIAs. This is not an average, but the total over all hosts. This series is quite useful to see if the actual data size in DIAs is what you expect it to be.
Now as I have iterated many times before, the DIAs rarely ever exist, so you may be thinking, how can we measure their size? Well, the simple answer is we don't, but it's pretty close.
Items are stored in Thrill inside Files, and Streams, and other lower-layer constructs that I will present in the next section. What is measured in this series is the sum of all items stored in these lower-layer storage data structures. The DIA operations use them to perform their tasks and thus "Data bytes" is just the number of items stored somewhere somehow in DIA operations.
The items in these data structures have various states. They can be located in RAM or be swapped out to disk, if the data no longer fits into RAM.
Furthermore, those located in RAM can either be pinned or unpinned. Without going into too much technical detail: the pinned bytes cannot be swapped to disk, and are those on which the DIA operations are currently working.
"Mem Total", "Mem Float", and "Mem Base" are three memory counters. Again in the next section there will be more technical details on these counters, but for now: floating heap memory is use by normal C++ objects like std::string, and "Mem Base" is used by DIA operations for hash tables and such.
Below the plot there are also further summary statistics and more details about the DIA operation steps.
87 - Tutorial: Output DIA Data-Flow Graph
The other thing you can extract from the JSON log is the DIA data-flow graph.
This can be done with a python script that generates a graph in dot or graphviz format, and by layed out as a waterfall level-wise using the "dot" tool.
The graph of the PageRank example shows all ten iterations rolled out, which doesn't fit onto the slides; here below is just the top of this graph. The various DIA operation classes are represented using colors and shapes in the PDF or SVG output.
88 - The Thrill Framework: Going Deeper into Thrill
Okay, in the previous two sections we saw how to run Thrill programs on a cluster and how to generate a profile and log to inspect what it actually does.
This section now focuses on some of the deeper secrets or designs used in Thrill. It contain information about why and how things are coded and help explain phenomena in the profiles.
It is also an overview of the source code, which is difficult to navigate or modify without having a big picture view.
89 - Layers of Thrill
Thrill is built as a layered framework. These here are the layers, which are separated into different directories in the repository and also into separate C++ namespaces.
Higher layers depend on the lower layers. This layering is strict, lower layers must not depend on higher layers otherwise all hell breaks loose in the code. Layers beside each other in the diagram are usually somewhat independent, except for the data and net layers which go hand-in-hand.
On the bottom there are common tools and utilities classes, which are stand-alone data structures and otherwise completely independent pieces of code.
Since this common code was so useful we extracted much of it into an extra library named tlx. It is now maintained separated from Thrill and is used in many other projects. tlx contains rock-solid reusable C++ data structures and code, and is worth checking out independently from Thrill.
The "common" namespace in Thrill still contains many other utility classes more specific to Thrill.
Also on the very bottom layer is the mem namespace, which overrides malloc, contains allocators, and allows memory usage tracking. This module is super low-level and cannot depend on any higher level, because all higher layers use dynamic memory allocation.
Next up come the data, net, and foxxll layers.
The data and net layers are intertwined.
The net layer builds up an asynchronous messaging system on top of three different transports: a mock system simulating a network, a real kernel-level TCP stream sockets based system and on MPI calls.
It abstracts these transports and establishes an asynchronous callback-based dispatching system, which allows higher layers to send a packet to a different host in a group and when the send operation is done to receive a completion callback. Likewise, asynchronous receiving is possible to accept the messages and then receive a completion callback.
On top of this very basic asynchronous send and receive functionality we built collective operations similar to MPI: a broadcast method, reduce, allreduce, and a few more depending on what was needed. These however only work well for small items.
Larger amounts of data are handled by the data layer. The next few slides will explain more about the concepts of this layer, but I'd like to mention the main ones here: at the lowest layer items are serialized into flat memory areas called Blocks. Multiple blocks can be collected as a File.
Files can grow very large and store lots of items, and Thrill will transparently write out Blocks to disk if there is too little RAM available. This is done using a separate asynchronous I/O library called "foxxll" which was borrowed from the STXXL library.
Beyond Files, the data layers also constructs so called Streams which allow massive asynchronous all-to-all data exchanges across the network, using the network layer.
The internal implementation of these all-to-all Streams requires item writers and readers, multiplexing of packets, thread-safe BlockQueues, and much more.
Above the data and net layers there is the core and vfs layer.
The core layer contains higher level algorithms and data structures built on the data layer. Here for example, the hash tables are implemented for ReduceByKey, but also multiway merge for Sort and bit stream encoding methods which can be used compress streams sent via the data layer.
The vfs layer or virtual file system layer is independent of the core layer and is used to read or write local, remote, or compressed files in the ReadLines, WriteLines and similar DIA operations. It is a simple file system abstraction, and mostly implemented because I wanted to transparently decompress gzipped files on-the-fly. I remember that I also added HDFS supported some time ago, but that the underlying library sadly is no longer maintained.
And then on top of everything there is the api layer implementing the DIA operations API. The central class here is the DIA class, which however is only a handle into the DIA data-flow graph.
The individual DIA operations are implemented as C++ template classes which expose the Pre-Op, Main-Op, and Post-Op code pieces mentioned previously. If you are interested in how these work, I would recommend starting by reading the PrefixSum code. Much of the glue between DIA nodes is automatic, and PrefixSum is simple enough to easily separate the glue code from the operation's actual implementation code.
90 - File - Variable-Length C++ Item Store
So after that high-level overview, let's look at some of the central design concepts. These have evolved out of various iterations of designing, implementing, and then again redesigning the code.
In the data layer, the central concept is the item storage data structure.
There is obviously a need in Thrill to store items as bytes in a linear serialized fashion. Any item that is stored in a DIA must be serializable and deserializable. More about the serialization methods later.
For storing a sequence of items in the data layer, one of the top design goals was to avoid as much overhead per item as possible, because items could be as small as single bytes or characters. This is of course a completely different approach than JVM-based frameworks, which often have large overheads per item.
So in Thrill we devised and implemented the following design: the item serialization methods are given a pointer, they are allowed to write raw binary data at that pointer and at the same time advance this pointer to the end of the item and the possible start of the next one. The deserialization function does the opposite, it gets a pointer, reads raw data, and again advances the pointer to the beginning of the next item.
By repeated calling the serialization method, a continuous stream of items can be stored without any padding or other overhead, the only necessary criterion is that the deserialization method can decode it in the same way and internally knows how far to advance the pointer to the next item.
Now there are some details on how to work with large items and such, but I will glance over these for the purpose of this tutorial.
This serialization design however means that we cannot jump to an arbitrary item number, because there are no entry markers in the byte stream.
Instead, we store blocks of serialized items. Each block then has one entry pointer to the first item inside the byte stream at which deserialization can start. In theory we could add more entry pointers if needed, but currently we only have one for the first item, this first pointer here.
However, besides this super-compact serialization method, Thrill's data layer had a second design criterion: to be able to cut-and-paste or slice-and-dice item sequences in arbitrary ways. And we obviously wanted to implement this slicing and dicing without copying the actual item data.
So the design we finally came up with contains so called ByteBlocks and Blocks.
ByteBlocks are large memory buffers of about 1 megabyte in size and used to store the serialized item stream. ByteBlocks are dumb byte buffers and know nothing about the items inside, they only have a memory area and know their size.
ByteBlocks are referenced by Block objects, which contain a reference counting pointer to the ByteBlocks, and four important integers: the beginning and end of the valid range of bytes inside the ByteBlock's buffer, a pointer to the first item, and the number of items starting between first and end.
The first pointer is used to start the deserialization. However, items can be larger than a single ByteBlock, which means they may span multiple buffers, like the item 4 on this slide. But that is not a problem, because Blocks are usually stored in sequence inside a File object, and that spanned block just has an item count of zero.
The combination of ByteBlock, Block, and four integers allows us to cut, slice and dice, and otherwise reorganize sequences of items completely arbitrarily without copying data. And this is necessary for many data operation in Thrill. The most common usage is to split a File into p parts for sending one slice of the File to each worker.
Furthermore, we can also seek to an item index inside the File with reasonable efficiency: first seek to the right block containing the item using a prefix sum of the num_items of the Blocks, and then deserialize the remaining item stream.
This whole schema is necessary to support variable-length items.
But Thrill also contains a very important acceleration for fixed-length items, such as integers, or POD structs. For these fixed-length items one can obviously directly calculate the offset, and Thrill does this.
Okay, this is the highly flexible variable-length item storage system in Thrill.
For supporting extremely large data, Thrill actually now has a somewhat more complex implementation of this system, which allow the contents of ByteBlocks to be swapped out to disk transparently! Thrill maintains a least-recently-used LRU list of ByteBlocks, and if too much data is allocated, it will write out only the contents of unpinned ByteBlocks to disk.
The metadata structure which you can see on the slide remains in RAM, only the data itself is swapped out. But for this to work correctly, algorithms then have to "pin" those ByteBlocks and Blocks which are needed in RAM. And that is why in the implementation there are actually PinnedBlock and PinnedByteBlock classes, which keep reference counted pins onto the data.
However, these are quite low-level design issues which even hardcore-level users of Thrill usually will never reach.
91 - Readers and Writers
Instead Files and other data Streams in the data layer are read and written using so called Writers and Readers, and these abstract away all the pinning issues that I just mentioned.
On the following slides Writers are symbolized with red triangles and Readers with green triangles. Those are supposed to symbolize serialization: Writers take items and channel them down into a byte stream, and Readers expand byte streams into items.
Writers are used to fill Blocks and ByteBlocks with items, and when a Block is full, this Block is passed on to a so-called BlockSink, which is basically anything that accepts Blocks.
A File for example is a BlockSink, and it appends a new block to its end. But there are also network BlockSinks that send a Block via the network to another worker.
Likewise a Reader can load blocks from a BlockSource and deserialize items.
The code example below shows typical Reader and Writer loops. First the Context is used to create a File object.
A File object has a method GetWriter()
to construct a Writer that stores new blocks inside the File. And the main method of a Writer is Put<T>()
which appends a new item of type T. When a Writer is closed or destroyed then the last block is pushed into the File.
To read the data from the File, the File has a GetReader()
method, which constructs a matching Reader. And the Reader has an interface similar to Java iterators with two methods: a HasNext()
method to check if there is another item available, and a Next<T>()
method to deserialize it.
Notice that File is not a template class, so a File does not know what type of items it contains. This may seem unintuitive at first, but it turned out that all the code was simpler when only Put<>
and Next<>
are passed the types. Put<>
takes an item and writes bytes, while Next<>
reads bytes and extracts an item. One can even interleave different data types inside a File.
All of this is quite dangerous and can lead to errors if the types are mismatched, and that is why Thrill has some optional self verification debugging methods to check that serialized and deserialized data types are identical. This has some overhead when running in debug mode, but is off and has no cost when running in optimized Release mode.
Notice that Readers and Writers both operate on streams of items. This is crucial in all of Thrill: large amounts of data must be streamed in sequence, because random access to Files is very expensive.
92 - Thrill's Communication Abstraction
So, Readers and Writers are used in much of the data layer to serialize and deserialize items into blocks.
These block can be store in Files and transmitted via network Streams.
This slides shows the three main communication abstractions provided by the data layer and used by DIA operations in Thrill:
There are the Files we just saw, which are a way to store large amounts of items on the local worker, basically to send itself data into the future.
The second abstraction are so called Streams, which are asynchronous big data all-to-all exchanges. A Stream is created across all workers, and provides p Writers on each worker which transmits items to the corresponding worker id.
At the other end, a Stream can open p Readers to read the inbound data from the p workers. More about Streams on the following slides.
And the third communication abstraction are MPI-like collective operations such as Broadcast, Reduce, and PrefixSum for small items.
And that's it. These three communication abstractions are everything needed to build all the complex DIA operations. There are, however, some helpers, for example multiway merge, to make life easier in the data and core subsystems.
93 - Stream - Async Big Data All-to-All
But first, let's have a closer look at the Streams.
Streams are complex objects and are synchronized between hosts implicitly only by order of their allocation, which is similar to how DIA operations are synchronized implicitly: all hosts must create all Streams in the same order.
Each Stream exposes p writers and p readers, where p is the total number of workers in the system. Writing and reading can happen at the same time, nothing in the code base actually enforces a barrier or anything like that, but most often, first all data is pushed into the Stream, and then all data is read out of the Stream.
So one can see it as a bulk-synchronous parallel (BSP) style exchange with a barrier.
Internally, the Stream is a complex object switching some blocks over loopback BlockQueues to other workers on the same host, these are "transmitted" internally, and sometimes actually sending blocks to remote hosts via asynchronous multiplexed data streams.
There are many more details to Streams, for example, the closing of the Stream's Writers is done is a cyclic fashion to improve load balancing and avoid network contention. Otherwise the first worker would get all messages first and last one would become a straggler. Furthermore, the close messages which end streams are also aggregated on the host level. Again there are many more details to Streams and these are obviously the most important classes in the data layer.
94 - Thrill's Data Processing Pipelines
There are actually two subtypes of Streams: MixStreams and CatStreams. These are more commonly used than plain Streams. While both have p Writers, these variants have only one Reader.
A MixStream's Reader delivers the items from the other workers in an arbitrary order, namely the order in which the data blocks happen to arrive via the network from other hosts, basically as fast as possible, thus mixing the order of workers.
And a CatStream's Reader delivers items in order of the worker indexes, namely concatenating all worker inputs in order. This may require buffering some of the data of the last workers in RAM, and is thus potentially more expensive than the MixStream.
These are the variants used in DIA operation to construct what I call data processing loops: these are loops which get items from Readers, process them somehow, and then put them into various Writers which send the items somewhere else.
A simple data processing loop is shown here on the left: we use a Reader to read items from a File in order, do something with each item, for example Map it, and then place the resulting item into a Writer for transmission to another worker.
On the right there is another example, reading items from a MixStream, so from other workers in some arbitrary order, doing something with them, for example keeping a fixed number of samples, and then serializing the item into a File using a Writer.
And these are basically the loops out of which most DIA operations are constructed.
95 - Thrill's Current Sample Sort
And now let's look at the data processing loops of one of the more complex DIA operations: the current Sample Sort implementation in Thrill.
The Pre-Op of Sort is rather simple: items are simply written into a File and sampled uniformly at random using a growing reservoir sampling implementation.
95b - Thrill's Current Sample Sort
When the Main-Op is called the complete input stream has be processed. The samples are then sorted globally, which of course requires communication, and from the sorted samples p-1 splitters are chosen and a common classification tree is constructed on all workers.
Each worker then reads the items stored in its local File and uses the classification tree to decide which worker to send the item to. Once classified, the item is pushed into one of the Writers of a Stream.
95c - Thrill's Current Sample Sort
On the other side of the Stream, which is a MixStream, the incoming items are just read in an arbitrary order and stored in an array of size Theta(M), which just means that the array is as large as possible, such that it still fits into RAM.
Once this array is full or all items have been received, it is sorted locally, and again written into a File for "long-term storage", because in the array items are not serialized.
This can happen multiple times, depending on how much RAM is available, how big this array is, and how much data is processed, which may result in multiple sorted Files.
These are two parts of the Main-Op phase of Sort, in theory there is a BSP barrier between the sending and receiving of the items in the Stream. However, Thrill does not enforce this barrier, which means the sending phase and receiving phase can process items simultaneously.
95d - Thrill's Current Sample Sort
The result of the Main-Op are one or more sorted Files, and in the Post-Op, these are merged using a multiway merge algorithm. Inside the triangle here is an efficient loser tree implementation, which reads sorted input streams from the Readers deserializing the sorted Files.
And thus a sorted sequence of items is pushed out of the Sort operation into whatever is attached to it.
So I hope this complex example helped you understand how DIA operations are implemented. Reading the Sort operation's code should now be much easier with this high-level view.
However, I must warn you that the Sort implementation may change, because we have some results that a Merge Sort-based implementation is actually better than this Sample Sort one. So just keep that in mind, the sort implementation may change.
96 - Optimization: Consume and Keep
And now we come to some of the pieces of darker art in Thrill.
The first are Consume and Keep.
A question that previously was never touched is "When are DIAs actually freed?"
Or more precisely the data stored in DIA operations, because the metadata of the DIA data-flow graph may actually live longer.
This is a complex question but worthwhile to think about, because we want to process massive amounts of data.
There are some straight-forward answers: obviously, Thrill can deallocate a DIA's data if all handles go out of scope. However, this is often only the case at the very end of a program, and DIAs can be huge.
Hence, there is also a manual disposal method on a DIA, called .Dispose(), which free's its internal data.
But Thrill also has a third, more advanced DIA data management method called Consume and Keep.
The idea is that the input DIA is consumed while processing DIA operations. This is obviously a great feature, because it potentially doubles the maximum input size that can be processed.
For example, if the data in DIA A here is 1 TB in size, and we want to process the data path A -> M -> B here, and M doesn't actually do anything with the data, then usually one would need 2 TB of memory, because A is stored and B is created by this processing step.
However, if we enable consumption of A while creating B, then this processing step needs only 1 TB plus a small overlapping block size. That is a pretty cool feature, because we thus increased the maximum input size by a factor two.
However, this consume operation obviously comes at the cost of additional complexity, that the data of DIA A is gone afterwards, and the problem is that by default Thrill cannot know that the data of DIA A will never be used again. Quite often in source code there is still a variable referencing A such that it is completely possible to attach another DIA operation to A at run-time and thus require A to deliver its data again.
This is of course unfortunate and may lead to strange "I have no data left" errors from DIAs.
97 - Optimization: Consume and Keep
And that is why DIA consumption is disabled by default. DIA data is only freed once all references go out of scope or manually. This is easier to understand and makes writing Thrill programs simpler.
However, once the basic implemented algorithm is fixed and works, it is possible to improve RAM usage by enabling consumption. This is done by calling enable_consume()
on the Context.
The trick is that Thrill then assumes that all DIAs are read exactly once in the first execution and no future DIA operations are attached.
This will obviously be wrong in some cases, mostly iterative loops, which is why one can increment the consumption counter of each DIA using a virtual .Keep() operation. This increases usage by one, basically it tells Thrill to keep the data of a DIA for another following operation. There is a simple example on the next slide which makes everything more clear.
For us humans it is often difficult to say when a .Keep() is needed. That's however not a big problem, because you can just run your Thrill program, and it will print an error message when a DIA's data is requested again after it is consumed. In general, adding .Keep()s is the easy part, it is however at lot less easy to find superfluous .Keep()s, so add them sparsely and don't go and just add .Keep() everywhere.
98 - Optimization: Consume and Keep
Here is an example for Consume and Keep:
Assume you want to read a text file, and count the lines, and then sort them.
Since Size is an action, it will read and consume the ReadLines DIA's data. Thrill cannot know that you will add a Sort method afterwards.
98b - Optimization: Consume and Keep
That is why you have to add a .Keep() in front of the Size to increment the counter, such that Size does not consume the contents of ReadLines when it is run.
This is easy to spot in this particular case, but for larger algorithms it can be very hard, but as stated before, Thrill will just print an error message if a DIA has no data left and thus a .Keep() is needed somewhere.
98c - Optimization: Consume and Keep
And now for a bonus: there is actually a better solution for the example above. The trick is to use ActionFutures. These are Actions, which do not triggered execution immediately. Instead they return a future object, which can be waited on and queried later.
In the example, this allows the ReadLines data to be read only once, and processed by both operations simultaneously. That's obviously better than reading it twice. The value of the size_future
variable can be queried with .get()
, which executes the DIA operation and returns the result.
99 - Memory Allocation Areas in Thrill
And here I want to excuse myself, but this section is slowly turning into a somewhat unorganized mess of random Thrill internals.
Next we will look at the memory allocation areas in Thrill.
The total amount of RAM on a host is split by Thrill into three parts, which I labeled M1, M2, and M3 here. By default all are the same size, namely one third of the available RAM.
M1 is memory used by DIA ByteBlocks to store serialized items in. This is the main data structure in the data layer storing items as mentioned previously in this section.
If there is too much DIA data, then the contents of ByteBlocks may be swapped out to an external memory, which is why there is a pinning mechanism necessary for ByteBlocks. But all this is automatically hidden behind the File, Reader, Writer, and Stream objects.
M2 is memory reserved for allocations done by DIA operations. These are buffers for sorting, like in the Sample Sort we saw earlier, or hash tables used by ReduceByKey and many more examples.
And last is M3 which is "free" C++ heap memory. This is where the usual C++ classes allocate memory, like std::string, but also various Thrill objects like the DIA nodes. It is just the remaining heap space, besides M1 and M2.
M1 and M2 are managed and counted explicitly by Thrill, and it self-imposes a limit on the objects in these areas. M3 is also counted by overriding malloc and other black magic, but it is not limited explicitly.
You can see the usage of these memory area in the profile plot.
100 - Memory Distribution in Stages
The memory of M2 is distributed automatically when running an execution stage, which is colored in red here and contains the Post-Op and all attached LOps and Pre-Ops.
When a stage is executed, all participating DIA operations are asked for the RAM requirements of their Post-Op and Pre-Ops.
They can answer with a fixed amount, like I need 2 megabytes, or that they want as much as possible. The executor collects all the requests, and fairly distributes the memory of M2 to the operations.
This is crucial to make the hash tables or sorting buffers in the DIA operations as large as possible, within the restrictions of M2.
What we currently haven't implemented is to request ratios of the available RAM or similar fine tuning.
But this completely transparent automatic RAM distribution feature is pretty useful and relieves the user from calculating the size of any internal data structures.
101 - Pipelined Data Flow Processing
In the next part of this tour through internals of Thrill we will consider how DIA operations are chained or pipelined together and then actually executed.
As previously mentioned a Post-Op generates a stream of items in some way. For example Generate just creates them on-the-fly, ReadLines reads files, ReduceByKey empties hash tables, and so on.
In any case, distributed operations, DOps, _emit_ a stream of items in their Post-Op. This steam is processed by attaching function chains to the DOp at run-time. Inside the DIA operation, there is a list of function pointers that are called for each item emitted by the Post-Op.
And this is exactly how DIA operations are chained: new DIA operations simply insert their Pre-Ops into the output list of function pointers of other DIA nodes.
The stage executor then calls PushData on the DOp which instructs the Post-Op to send its data to all attached children. DOps have three states: New, Executed, and Disposed. In the state New they are ready to receive items in their Pre-Op.
When all items have been pushed, the stage executor runs the Main-Op and switches the state to Executed. In the Main-Op the DOps usual communicate and process their data.
When the Executed state is reached, the DOp must be able to send out its data to children when invoked with PushData. This may occur multiple times, until the DIA data is disposed, because future DIA operations may demand the DIA's data by dynamically attaching new function chains.
And how do local operations, LOps, fit into this picture?
102 - Pipelined Data Flow Construction
Well, LOps don't allow communication, so they can be executed immediately without synchronization between workers.
And in Thrill we implemented something fancy: we found a way to use template meta-programming in C++ to chaining of a sequence of lambda functions and to "fold" them into a single piece of code, which is optimized by the compiler into one piece of continuous binary instructions. We use this method to generate and combine the functions which receive the items from a DOp's Post-Op.
In principle a Post-Op calls a function multiple times, such that this function receives a stream of items. The function will probably save the items or call another function and passes them on.
A DIA in that sense is actually a handle to an object to which we can attach functions to receive items. A LOp is simply a lambda function that is attached at the end of a chain of item processing functions currently being collected by the DIA.
Because these functions are stored in template arguments, I label these with DIA and T1, f in angle brackets. This is the previous DIA<T1> with f attached to it at the end.
This chain of lambda functions is closed by adding a DOp, or more precisely by adding the Pre-Op lambda function of a DOp to the end of the processing chain. This terminates the chain, because the Pre-Op cannot call another LOp.
The chain is then "folded" into an actual function and inserted into the previous DOp's child function pointer list, and thus it will receive the stream of items from the previous node.
This actually means that if we add another DIA operation to the handle DIA<T1, f> that the function f will be copied into both item processing paths and thus be run twice on each item. There, however, is a solution for this problem called Collapse on the next slide.
A pretty hard C++ technical problem we had to tackle was how to implement FlatMap, because FlatMap requires the ability to emit zero, one, or more items for one input item. This is difficult to implement because one can't use the return values of lambda functions, as it is always only one item, unless one uses a vector and those require dynamic memory allocation, which is slow.
Our solution was to use "emitter" lambda functions, which you can see in action in the FlatMap signature in the WordCount example. A FlatMap lambda gets an item and a generic emit method as auto parameter, which it may called zero or more times. This emit functor is actually the remaining function chain up to the next DOp's Pre-Op.
When you see it like that, the design actually makes a lot of sense: we basically construct an item processing function out of lambda pieces which receive a) an item, and b) another lambda function or generic functor to which it may emit items.
These are folded together into a single function and attached to the output list inside a DOp and called by the Post-Op with an item stream when running PushData.
That is how the DIA data-flow graph is actually constructed and executed.
103 - Data Flow Construction: Collapse
There is a special DIA operation which folds the function chain explicitly. It is called Collapse, because it, well collapses the template meta-programming lambda chain.
What it does is transform a DIA<T1, f> into a DIA<T2>, where T2 is the type returned by f.
This also creates a real processing node in the DIA data-flow graph. Plain LOp's on the other hand are usually inlined with the following Pre-Op and don't exist explicitly in the DIA data-flow graph.
Collapse is an example of an auxiliary DIA operation. There are others: Cache for example can be used to materialize the DIA in an array or actually in a File as you may have guessed by now, and Union is a pretty wacky piece of code to combine two DIAs of the same type, at the lambda function processing level.
Okay, as you see lots of thought and evolution when into the way DIAs are glued together. It is pretty amazing that everything works so well and is so convenient, and much of it is due to the new features in C++11, 14, and 17 compilers and the high-level optimizations they can do these days.
104 - ReduceByKey Implementation
This is the last slide on deeper implementation details of Thrill, we almost made it.
In it we will take another closer look at how ReduceByKey works. I already discussed many details in the WordCount example, but this is an illustration of the phases which greatly helps.
There are two phases in ReduceByKey: a pre-phase and a post-phase. In the pre-phase, which is the Pre-Op, items are read from the input stream and inserted into a local hash table on each worker.
We tried multiple hash table implementations, and currently it is a growing linear probing hash table. Growing means that in the beginning it is small, and when it fills up too much then it is flushed, and linear probing means that in case of collisions we advance to the next free cell by scanning forward. Linear probing is very cache efficient.
If an item with the same key is found when inserting into the hash table, then these items are reduced locally directly in the hash table using the reduction function.
The pre-hash table is split into p sub-tables, essentially these are p independent hash tables, which overall cover the entire hash value universe from 0 to $u$.
When a sub-table fills up too much, we flush the items out to the network Stream instead of rehashing them and reinserting them into a larger hash table, like one would normally do. But the sub-table's size is doubled up to the maximum size available.
The flushed items are sent via a Stream to the worker handling the particular hash space range of the sub-table.
On the other side of the Stream, the incoming items are read in an arbitrary order, which means the Stream is a MixStream. The items are inserted into the post-phase reduction hash tables, which have a different hash space layout: the entire hash space universe spans across all hash tables on the workers, because each worker handles only one part of it.
Items with the same key are again reduced directly in the hash tables. But even the post-phase hash tables can grow full if a massive amount of data is reduced. Our solution to this is again to create sub-tables in the post-phase and this time to flush them out to disk if they grow too large. These Files are associated with a particular subrange of the already reduced hash universe space range of the worker in the post-phase.
Once all items have been read from the Stream, any items spilled out to disk are reduced further by recursively partitioning the sub-table's hash space range and then reading the items back from disk. This can theoretically occur recursively multiple times, but in practice it happens only for one level.
In the Post-Op the items from the post-phase reduction tables are read sequentially and pushed out into the next DIA operation.
That was the ReduceByKey implementation. The ReduceToIndex operation has a similar construction, except that instead of the hash space universe, the index space universe is partitioned recursively.
105 - The Thrill Framework: Tutorial: First Steps towards k-Means
And that was the end of the section going deeper into Thrill.
In the following section, we will turn back to more high-level topics: namely, we are going to walk through a tutorial and live-coding demonstration on how to implement a simple k-means clustering algorithm in Thrill.
106 - Tutorial: First Steps towards k-Means
So, first up: what is k-means?
For k-means, we are given $n$ $d$-dimensional points and a fixed target number of clusters $k$. Goal is to determine cluster points and to assign points to centers, which serve as a so-called centroid or "prototype" point for the cluster.
The algorithm is iterative and easy to implement: just start with $k$ initial center points, selected from the point set uniformly at random, and iteratively improve them.
The improvement iterations are done by assigning all points to one of the currently selected clusters and then calculating new centers by taking the mean of all points in a cluster, hence, calculating a "better" center for the current cluster.
This is repeated a fixed number of times, or one can use more advancing stopping strategies, like breaking when the centers don't change much any more.
107 - Tutorial: k-Means Iterations
This is an illustration of how the algorithm works: we have a set of uniformly distributed 2-dimensional points, in this case, and ten randomly selected initial centers. The other points are colored to match the closest of the initial centers.
107b - Tutorial: k-Means Iterations
And then we calculate new centers by averaging all points with the same color. This is the state after the first step: the new centers are simply the mean of all points with the same color.
107c - Tutorial: k-Means Iterations
Using these new centers, we again classify all points by their closest center and color them accordingly.
107d - Tutorial: k-Means Iterations
Again new centers are calculated by averaging all points with the same color.
107e - Tutorial: k-Means Iterations
And so on and so further.
As you can see the centers move further and further apart and grow more equal in size in each step, until the algorithm stops, in this example after eight steps.
Okay, that was it, how do we implement this algorithm in Thrill?
108 - K-Means: Printable 2D-Points
I will now do some live coding, but the slides also contain instructions such that you can try to do this exercise yourself. So you have pick now: either stop this video and do it yourself and learn more about Thrill by _doing_, or to watch me implement and learn by _watching_.
If you want to do it yourself, there is a better tutorial in Thrill's documentation which has more detailed tips for the individual steps.
This is the web site of Thrill (https://project-thrill.org), you go into the doxygen documentation, and then there is "Write Your First Thrill Program", this is also the k-means tutorial, except that the steps are written in much more detail than on these slides here.
But now let's start: we first need a simple runner program and a struct for a 2-dimensional Point. We are only going to use 2-d points for this example.
108b - Generate Points, first with all-zero coordinates.
So we are going to start with the program I already wrote previously. I am going to replace this, and copy this 2-dimensional Point structure here from the slides into the program as a starting point.
And then we are going to generate random points. As the tutorial says, we are going to use Generate to make random points, Print, and then Cache them.
Okay, let's see that this compiles first. And then we're going to make a DIA called points using Generate. We are going to generate, I don't know, lets say 100 points. And use a lambda to actually generate the points for an index inside the DIA, and since we are going to start small, we'll just generate points that have zero coordinates everywhere, and Print these.
Okay, sorry an error. Right, Generate is a Source node, which needs the Context.
auto points = Generate( ctx, 100, [](size_t index) { return Point { 0, 0 }; }); points.Print();
108c - Generate random points
There we go, now we have 100 points with coordinates zero. Now we need a random generator, and for this we are going to use std::random, I have a short-cut here to create a random generator,
What coordinates do we want for our points? We'll just use uniformly random points in the interval from zero to one for the beginning.
And we're going to use the zero to one random distribution generator. And before I do another compiler error, we also have to capture these here, and now we should have random points.
Right, so this ampersand captures these two variables, which are of course the random generators, and we can see we now have random points, which is pretty cool.
And now, we actually have to .Cache() these random points, which materializes this DIA, because otherwise the Generate would regenerate random points every time, and this actually means that the points DIA would change every time it is executed. With .Cache() of course this does not happen, because it is materialized.
Okay, so pretty good so far, we have random points, and we printed and cached them.
std::default_random_engine rng(std::random_device { } ()); std::uniform_real_distribution<double> random_dist(0.0, 1.0); auto points = Generate( ctx, 100, [&](size_t) { return Point { random_dist(rng), random_dist(rng) }; }) .Cache(); points.Print();
108d - Display Points with SVG
Now inside the repository there is a small Python script called points2svg
which will actually display these points inside a SVG. And we can actually use this, I made this specifically for this tutorial, we have to save the output of the Thrill program, and the Python script will actually parse this output and find anything that looks like a coordinate, like this here, and transforms this into an SVG point.
So we run Python with this points2svg
script on the points, and the output is a nicely formatted SVG, which I can display using a web browser. Like this. And these are our points, and as you can see zero to one is probably very small. So we will increase the range of coordinates here, and do this whole thing again, such that we can actually see our points.
Ah, we are getting these, so these are 100 uniformly random distributed points. I guess I'll increase the coordinate range to 1000, this seems to be pixels.
Okay, yay, 100 random points, nice. I'll actually make it a few more points, because this is actually kind of sparse. Okay, random points in an SVG.
python ../points2svg.py points.txt > ~/Downloads/points.svg
109 - K-Means: Map to Random Centers
Okay, what are the next steps in k-means?
109b - Select ten of them as centers uniformly at random
So, right, we have to select a number of centers uniformly at random. In Thrill that is easy, we just use the .Sample() method. So let's see, we are going to select ten centers. The number of centers equal ten, and there is going to be a centers DIA, we'll just call it "centers", which are points and we are going to sample num_centers
from these. And as I told you before, Print, Print often, so we are going to print our centers to see that everything works right.
Right, we can remove this one warning, because we don't actually use the index inside the Generate. Right and now we have centers, exactly ten of them, sampled uniformly at random.
static const size_t num_centers = 10; auto centers = points.Sample(num_centers); centers.Print("centers");
109c - Map each point to it's closest center
What is the next step?
Now we have to map each of the points to it's closest center. So we are going to have a .Map(), we're going to call them closest
. We are going to take the Points apply a Map function, and figure out what to do with these points. So for this Map, this Map actually takes a Point and will probably return an integer, or something like that.
But we have to iterate over all of the centers that we have, but the centers
is actually a DIA, so we are going to turn this DIA into an std::vector
of Points, because we have to have the points on every worker. To do that we use an .AllGather()
, which gathers the DIA centers at every worker. We are to renamed this to a DIA otherwise we have two variables with the same name, like this.
And now we can iterate over the centers here inside the Map and return the closest one. For that we actually have to calculate the distance, and that's of course something that we want to add, right; this is this hint here, maybe add a distance()
method to the Point.
auto centers_dia = points.Sample(num_centers); centers_dia.Print("centers_dia"); std::vector<Point> centers = centers_dia.AllGather(); auto points_closest = points.Map( [&](const Point& p) { });
109d - Add distance2() method to Point
So we are going to add a distance()
method. And this is of course a double distance to another Point. And we are going to use the Euclidean distance, which is something like the distance squared of the two coordinates. It's the delta x plus the delta y, both of them squared. And, usually we would want to have the square root of these here, but since we are only going to compare them, we can just skip the square root, and calculate the squared distance instead.
We are going to name this squared, or distance2()
, where the two is for squared. This is of course the Euclidean distance, squared.
//! Euclidean distance squared double distance2(const Point& b) const { return (x - b.x) * (x - b.x) + (y - b.y) * (y - b.y); }
109e - Calculate closest center
And now we can pick the closest point. The easiest way to do this is the best point in the beginning with the best distance is the first one. So the distance squared to the first center.
And then we iterate over all the centers, starting with the first one. We have num_centers
of these, let's use the centers.size()
. We calculate the new distance, which is the point distance squared to the iterated centers. And if d is smaller than the current best distance, we update our counters to best equals i and the best distance that is currently known is d. There we go.
And then we return only best
. There we go. This should work.
Oh, I made some small error. Oh right, centers is not captured. We have to capture this vector, which is of course the same vector on all of the workers, that is of course vital. But we have this due to the AllGather. Right.
There we go. Oh, we have to .Print()
it. Otherwise we don't see anything.
There we go, this is for the 100 or actually 1000 points that we have, the closest randomly selected center. Cool.
auto centers_dia = points.Sample(num_centers); centers_dia.Print("centers_dia"); std::vector<Point> centers = centers_dia.AllGather(); auto closest = points.Map( [&](const Point& p) { size_t best = 0; double best_dist = p.distance2(centers[0]); for (size_t i = 1; i < centers.size(); ++i) { double d = p.distance2(centers[i]); if (d < best_dist) best = i, best_dist = d; } return PointTarget { p, best, 1 }; }); closest.Print("closest");
110 - k-Means: Calculate Better Centers
Let's go back to the slides and figure out what to do next.
So what should the Map output be for the next step, and what is actually the next step?
Okay, let's think for a moment, let's go back to these slides. We now have basically this classification into colors.
What do we have to do next? We have to calculate the new centers as the average of all of these points with the same color. And how do we do this with Thrill?
We are going to use a ReduceByKey or a ReduceToIndex method, which will reduce all of the points which are associated to the same center. And then we want to calculate the new average of all of these points.
How do we do that?
Well, the reduction key is obviously going to be the center ids, that we just calculated here. Right, these center id. So these are the keys, but what are the values that are going to be reduced?
All of the points with the same key should be averaged. So we are going to calculate their sum and their number, to later divide the sum by their number, to get the average points.
Okay, there are some hints listed here. We are going to follow these instructions.
110b - Create new class PointTarget and add ostream operator.
The key idea is to make a second struct called PointTarget, containing point and a new target id. We are going to do that.
So we have a new struct called PointTarget, which contains a Point called point and a target integer called target. And we are immediately going to make this ostream-able, otherwise we can't Print it anymore. So we print both of these components.
struct PointTarget { Point point; size_t target; }; std::ostream& operator << (std::ostream& os, const PointTarget& pt) { return os << '(' << pt.point << ':' << pt.target << ')'; } return PointTarget { p, best };
110c - Reduce everything by target
Right, and now we can return not only the best center, but the PointTarget, namely the point which we are going to average over and the current best target. And due to all of the magic of Thrill, we can actually use this immediately.
Right, now we not only have the target id but also the point itself. And now we are going to reduce everything by this target id. So we are going to use, in this case a ReduceToIndex, which is going to calculate the next centers from the closest points.
We are going to use a ReduceToIndex, where the key extractor function from the PointTarget is of course the target id. Like this. And the reduction function of two PointTargets, a and b, is going to be a new PointTarget, which adds the two points together and the target is of course the same, so we just keep that.
And since we are going to want to look at these, we are going to Print them.
auto next_centers = points_closest.ReduceToIndex( [](const PointTarget& pt) { return pt.target; }, [](const PointTarget& a, const PointTarget& b) { return PointTarget { a.point + b.point, a.target }; }, num_centers); next_centers.Print("next_centers");
110d - Add Vector Sum Operator
Okay, something is wrong here. Ehm, yeah, there is no plus operator for Points, because we didn't define that. We want to be able to add two of these vectors; we actually have to code that.
So the new Point from an operator plus with a second Point just a new Point which adds the two coordinates, like this. This is of course a vector sum operator. And now this should compile.
Point operator + (const Point& b) const { return Point { x + b.x, y + b.y }; }
110e - Missing DIA Output Size of ReduceToIndex()
And it doesn't, I'm sorry, because I missed the ReduceToIndex method, because the ReduceToIndex requires the target size, and that is of course num_centers
.
Right, and now everything works. Right, so we have next_centers
which is a new DIA, created by ReduceToIndex of size exactly ten, because that is the number of clusters that we have. We reduce all of the clusters with the same target, and add together their vectors or points.
110f - Count points associated with the same target center.
Right, but something is still missing here now, because we now want to divide by the number of points that we add into each of these new center slots.
So how do we fix this? Well actually, we'll use the same trick as used in WordCount: we add a count field to the struct, where we are going to count the number of points we add to this structure. And in the beginning this is just one. So when we first emit this, it is just one. And when we aggregate, we aggregate the total counts, like this.
struct PointTarget { Point point; size_t target; size_t count; }; std::ostream& operator << (std::ostream& os, const PointTarget& pt) { return os << '(' << pt.point << ':' << pt.target << ':' << pt.count << ')'; } auto next_centers = points_closest.ReduceToIndex( [](const PointTarget& pt) { return pt.target; }, [](const PointTarget& a, const PointTarget& b) { return PointTarget { a.point + b.point, a.target, a.count + b.count }; }, num_centers);
110g - Divide by the count to get new average Point
Now we not only have the sum for every of these slots, but also the number of points reduced into it. And the only thing we now have to do is to calculate the average, by mapping every one of our PointTargets, which are now aggregated, to the actual average point. So we are going to return the new Point, which are the new centers, by taking the points and dividing by the number of aggregated points into it. So the points coordinates and dividing by the number of points aggregated.
There we are, so I silently the type of next_centers
from PointTarget to Point, but all of that works automatically due to the auto. And these are actually our new centers. And we can now replace the current centers
vector by just using an AllGather, which takes the next_centers
DIA gathers it on all workers and replaces the vector centers
with it.
.Map([](const PointTarget& pt) { return Point { pt.point.x / pt.count, pt.point.y / pt.count }; }); next_centers.Print("next_centers"); centers = next_centers.AllGather();
111 - k-Means: Iterate!
And this is the step that we need to actually do the iteration. Let's skip back to the slides. So we did all of these parts, we added an add method to Points.
And the next step is to iterate ten times. We collect the new centers on all hosts using AllGather and add a for loop for iteration. This is easy, we have to add the for loop once the centers are changed, here. So we are just going to iterate this ten times, and that should be it. And each time improving the centers.
Let's see if this runs correctly.
There we have it. Each time it calculates the next closest points and the new centers. Let's remove the closest points output and only look at the centers.
There we have it. Ten iterations of hopefully always improving centers. They seem to be moving, yes.
for (size_t r = 0; r < 10; ++r) { }
111b - Print Out the Points and Final Colors
Right and now the last thing I want to print out is the final center association. So we are going to take this closest
DIA, which is calculated here, and calculate it one last time.
There we have it that, this is the last output, this is all of the points and their associated centers. The aggregated number is of course only one.
And now the last thing I want is to actually show it to you using SVGs. And I think this Python script will actually do this automatically, right. It will now color these points. Of course there is not enough points, so I will increase the number of points, to let's say 10000, and run everything again.
auto closest = points.Map( [&](const Point& p) { size_t best = 0; double best_dist = p.distance2(centers[0]); for (size_t i = 1; i < centers.size(); ++i) { double d = p.distance2(centers[i]); if (d < best_dist) best = i, best_dist = d; } return PointTarget { p, best, 1 }; }); closest.Print("closest");
And now we have lots of colored points, you probably can see the colors here. So this is one area and inside there still some black dots, because the SVG just contains all of the points inside the log. And this is actually how the centers have moved. I am going to increase the number of points even further and see what happens.
Right, lots and lots of colored points. Oh sorry, I made a mistake, there's lots of black points, because I'm printing the points all the time, so I'll just remove the first Print here. And we're going to leave these center prints, right, such that now the black points in the beginning should not be visible any more.
Let's see.
Right, so these are ten clusters, actually the points in the clusters are colored differently, and the black point in the middle is the center itself.
Pretty cool, huh.
Right, so this was the hands-on live coding tutorial. And there are some more bonus steps here that you can do, for example, you can add input and output to text files, of course. Instead of iterating exactly ten times, you can calculate the total distance that the centers moved every time. And of course, are some more bonus steps, that you can now try to do on you own.
112 - The Thrill Framework: Conclusion
And that brings us to the last section of this rather long tutorial. Let's wrap everything up with a conclusion and future work.
113 - Thoughts on the Architecture
Where do I see Thrill's sweet spot? I believe it is actually two-fold.
On the one hand, Thrill is a very convenient framework to write distributed algorithms in C++. You get a reproducibility of experimental results that you just don't get with JVM-based frameworks.
On the other hand it is a platform to engineer _new_ distributed algorithms with; algorithms which you can later then plug-and-play together with others based on the DIA concept.
Thrill's data layers allow efficient processing of small items and automatic pipelining of DIA operations.
However, there are also some inherent problems with the C++ interface, namely complex template programming, and the overall verbosity of C++, and many manual quirks that are needed for the DIA concept. This brings up the idea, that Thrill may become the platform for building a more convenient domain specific language on, something like a Big Data SQL.
A higher domain specific language could also address one of Thrill's open questions: how to automatically select algorithms based on statistical properties of the data. Thrill currently does not select algorithms at run-time, it is compile-time only, which of course very good for predictability, but often users just don't care about how the data is actually processed.
Another open question for Thrill is that it currently assumes h identical hosts constantly running. This restriction _has_ to be made more flexible when adding fault tolerance or more generally: when going to malleable algorithms, which adapt automatically to the number of cores or hosts available while running.
And in the bigger picture, I find predictability of an algorithm's running time very interesting. Namely, I want to be able to run an algorithm on let's say a thousand items and then predict how long it will take on a million items, and then possibly also suggest to add more compute resources if necessary.
And then there is another research area on how to provide even larger scalability to a million cores or beyond.
And the elephant in the room of all Big Data processing frameworks is memory management. I believe Thrill is going in the right direction here with the three memory areas I mention, however, also that can be improved. Compared with JVM-based frameworks, in C++ at least we have a chance to actually limit the memory of algorithms correctly!
114 - Current and Future Work
Okay, so what is there for future work?
I already mentioned many ideas and open questions.
On the API level there is also the question what lies beyond a DIA? A DIA is a one-dimensional data structure, can one extend Thrill with multi-dimensional data structures like matrices or even tensors and add distributed algorithms for these?
And then there is the possibility of adding a Graph<V,E>, which in a sense is the opposite of the matrices, because locality of data in a graph is defined by the data itself, while in arrays or matrices it is fixed by the data type.
These are the high-layer objects that one could add.
In the lower layers I believe it would be worth-while to rethink the communication abstraction that worked so well in Thrill, and to come up with a better solution that integrates malleability and fault tolerance. This could be an abstraction that integrates flexible changes to the number of cores, workers, hosts, and data storage.
Related to fault tolerance and malleability, but also very interesting on its own is the question of predictability of computing and algorithm running time. In a sense this is actually the original domain of theoretical informatics, but here one could actually try to make statistical models of algorithms and machines and predict how long something would take on real inputs. In the domain of Big Data processing, this is actually worth it due to the long running times.
And Thrill is also a playground for communication efficient algorithms. If you can implement a better Sort algorithm, then immediately a whole range of applications can benefit from it.
And maybe a step beyond everything is to raise the level of convenience of Thrill by creating a more abstract data processing or programming language that complies down to Thrill's primitives. In a sense Thrill would be the distributed algorithm toolkit that is necessary as a foundation of such a high-level language.
Yes, there is lots of do.
Thank you for listening to this quite long tutorial on Thrill, which covered so many things, from simple examples to the complex internals of some of the distributed algorithms.
I hope you had a great time, and that I gave you a good overview of Thrill.