In this session, we're going to try to wrap our minds around something inevitable that we might not have considered when writing Spark programs thus far: it's called shuffling, and it can happen quite a bit. You never call a method called shuffle, but as we're going to see, there are many methods which transparently cause a shuffle to happen. We're going to dive into some of those scenarios in this session. So let's first start by looking at a groupBy or groupByKey operation. For a moment, just try to think about what these methods do, and what might happen when one of them is called. Importantly, you can't forget that our data is distributed. Now think again about what a groupBy or groupByKey does, keeping in mind the fact that our data is distributed. Well, what about when you've been using Spark on your own? Did you notice anything about the types when you used groupBy or groupByKey in your programming assignments? Here's a very quick example. Here we just make a pair RDD, and then we call groupByKey on it. And this is the return type that Spark gives us: something called a ShuffledRDD. Hmm, that's weird. To do a distributed groupByKey, we typically have to move data between nodes so the data can be collected together with its key in a regular, single-machine Scala collection. Remember that groupByKey collects all of the values associated with a given key and stores them in that single collection. That means that data has moved around the network, and moving data over the network like this is called shuffling. What's important to know is that shuffles happen. They happen transparently as part of operations like groupByKey. And what every Spark programmer learns pretty quickly is that shuffles can be an enormous hit to performance, because they mean that Spark has to move a lot of its data around the network, and remember how important latency is. 
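A minimal sketch of that quick example (the data here is made up for illustration; since the Spark calls need a SparkContext and a cluster, they're shown as comments, and the same grouping semantics are sketched with a plain Scala collection):

```scala
// In the Spark shell, the quick example might look like:
//   val pairs = sc.parallelize(List((1, "a"), (2, "b"), (1, "c")))
//   pairs.groupByKey()   // note the return type: a ShuffledRDD!
// Locally, we can sketch the same grouping semantics with a plain Scala Map:
val pairs = List((1, "a"), (2, "b"), (1, "c"))
val grouped: Map[Int, List[String]] =
  pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
// grouped(1) == List("a", "c")
```

The takeaway is only the shape of the result: every value for a given key ends up together in one plain collection, which in the distributed case forces data to move between nodes.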
Let's try to better understand what's really happening when we do operations like groupByKey. As usual, we'll look at an example to help clarify. Let's assume we have a case class called CFFPurchase which contains three fields: an Int for the customerId, a String for the destination city, and a Double representing the price of the ticket that was purchased as part of this CFFPurchase. And remember, the CFF is the Swiss train company. Assuming that we have an RDD full of purchases made in the past month, let's try to calculate how many trips were made and how much money was spent by each individual customer over that one-month period. So, with all the knowledge that you have so far, what would you do? Which methods would you use? We've been working with pair RDDs quite a bit now, so perhaps the first step is to make a pair RDD. Okay, but what other methods do you use? I'll give you a moment to try and figure it out yourself. Since the end goal of this example is to figure out how many trips each customer has made this month and how much money they've spent over the month, the destinations that they've gone to in each purchase aren't important, so we'll drop them when we make our new pair RDD here. We'll have the customerId as the key, and as the value the price of the purchase that this case class instance represents. Next, since the goal is to focus on each individual customer, and to then calculate the number of trips they've made and how much money they've spent in the past month, it would make sense to group together all of the purchases per customer. So in that case, we reach for good old groupByKey, which returns a pair RDD with a collection of the values that go along with each key. So now we have a pair RDD where customerId is the key, and a collection of all of the purchase prices of that customer's purchases over the past month is the value. So we have almost everything we need now. 
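The pipeline so far might be sketched like this (the purchase data is invented for illustration; a plain List stands in for the RDD so the sketch runs without a cluster, with the real Spark calls shown in comments):

```scala
case class CFFPurchase(customerId: Int, destination: String, price: Double)

// Invented sample data; in Spark this would be purchasesRdd: RDD[CFFPurchase].
val purchases = List(
  CFFPurchase(100, "Geneva",     22.25),
  CFFPurchase(300, "Zurich",     42.50),
  CFFPurchase(100, "Fribourg",   12.25),
  CFFPurchase(200, "St. Gallen",  8.25))

// Step 1: drop the destination, keep (customerId, price) pairs.
// In Spark: purchasesRdd.map(p => (p.customerId, p.price))
val pairs = purchases.map(p => (p.customerId, p.price))

// Step 2: group all purchase prices per customer.
// In Spark: .groupByKey() — this is the step that causes a shuffle.
val grouped = pairs.groupBy(_._1).map { case (id, kvs) => (id, kvs.map(_._2)) }
// grouped(100) == List(22.25, 12.25)
```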
The last step is just to figure out how many purchases were made per customer and how much money was spent in total by each customer. What method should we use next? We can use a simple map, or we can even use mapValues. Here, I've used map, but mapValues also works just fine. All we have to do is make a new pair containing the number of purchases as the first element of the pair, which is easy to do: we can just call size on the collection of values. And for the second element of the pair, we just need to sum up all of the prices in the collection, so we can simply call the method sum on the collection of values. And finally, since none of the methods we've called so far are actions, we have to call an action to kick off the computation. So in this case, we call collect, to collect all of the results onto the master node. Now that we understand an example solution to this problem, let's step through what's really happening with the data, given some sort of example data set. So here, let's imagine our purchases are this short list of instances of CFFPurchase. How might the cluster look with this data distributed over it? So let's start with the data in the purchasesRdd. Let's imagine that we have three nodes: this one, this one, and this one. And we split up our data over these three nodes evenly. Since we have six data elements in the purchasesRdd, that means, for the sake of this example, we can put two elements on each node. So two here, two here, and two here. Let's for a second imagine that the map function is executed on this data, producing a set of pairs on each node. Because remember, after we had these CFFPurchases, what we did was create a pair RDD. So assuming that was evenly executed, it would look like this. Remember, in our code example we got rid of the destination cities and kept only the customerId and the price paid per purchase as the key and the value in our key-value pairs. 
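The whole groupByKey solution described above, sketched end to end (invented sample data; a List stands in for the RDD so it runs locally, with the real Spark pipeline in comments):

```scala
case class CFFPurchase(customerId: Int, destination: String, price: Double)

val purchases = List(
  CFFPurchase(100, "Geneva",   22.25),
  CFFPurchase(300, "Zurich",   42.50),
  CFFPurchase(100, "Fribourg", 12.25))

// In Spark:
//   purchasesRdd
//     .map(p => (p.customerId, p.price))   // make a pair RDD
//     .groupByKey()                        // RDD[(Int, Iterable[Double])]
//     .map { case (id, ps) => (id, (ps.size, ps.sum)) }
//     .collect()                           // action: kicks off the computation
val result = purchases
  .map(p => (p.customerId, p.price))
  .groupBy(_._1).map { case (id, kvs) => (id, kvs.map(_._2)) }
  .map { case (id, ps) => (id, (ps.size, ps.sum)) }
// result(100) == (2, 34.5): two trips, 34.5 francs spent
```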
So now we went from six instances of CFFPurchase to six key-value pairs, two on each of the three nodes. Now if we look back at the code in the example, the next step was to do a groupByKey. And remember, groupByKey results in a single key-value pair per key. So think about it: that single key-value pair cannot span multiple worker nodes. So let's go back to our visualization of the cluster. What does groupByKey look like in this case? Remember, we have concrete instances of data, so how would this data have to move around? Well, as you might have guessed, to create the collections of values that go with each unique key, we have to move key-value pairs across the network. We have to collect all of the values for each key on the node that the key is hosted on. In this example, we've assumed that since there are three unique keys and three nodes, each node will be home to one single key. So we put 100 on this node, 200 on this node, and 300 on that node. Then we move around all of the key-value pairs so that all purchases by customer number 100 are on this node, all purchases by customer number 200 are on this node, and all purchases by customer number 300 are on this node, and they're all in this value, which is a collection. This highlighted part here is where all of the data moves around on the network. This part of the operation is the shuffle. Now I'm just going to step back to one of the slides from the beginning of the course about latency. Remember the humanized differences between operations done in memory and operations that require sending data over the network? The humanized difference between these two kinds of latency was, on the one hand, seconds to minutes, and on the other hand, up to years. With these numbers in mind, of course we don't want to be sending all of our data over the network if it's not absolutely, positively required. 
Having to do a lot of network communication kills performance, especially if it's being done unnecessarily. So this is why shuffling is bad. We want to do as little shuffling as possible because of these orders of magnitude. So how can we do a better job? What can we do differently to make this example more performant? After all, if we want to do a groupByKey kind of operation, we definitely have to move data over the network. But can we somehow do it in a more efficient way? Perhaps we can reduce before doing the shuffle. This could potentially reduce the amount of data that we actually have to send over the network. To do that, we can use the reduceByKey operator. Do you remember this one? We learned about it in one of the previous sessions. And remember, you can think of it as a combination of first doing groupByKey and then reducing over all of the values that were grouped by that key in those collections. I also told you, back when we covered this method, that it was much more efficient, but I didn't show you why yet. We'll look at exactly how this is more efficient in the next example. But first, let's recall reduceByKey's type signature. Remember, since it operates on the values that we assume are already grouped by some key, we focus only on the type of the value. So the function passed to the reduceByKey operator operates only on the values of the key-value pairs. So let's go back to our earlier example. Remember, our goal was to figure out how many trips were made and how much money was spent by each individual customer over the course of the month. If we were to replace our call to groupByKey with a call to reduceByKey instead, what would we have to pass to reduceByKey? What would the function be here that would calculate the total number of trips and the total amount of money spent by each customer? Note, to make things a little easier for you, I changed the function that I passed to map here. 
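That type signature, plus a local sketch of the per-key reduction it performs (reduceByKeyLocal is a made-up helper name; the signature comment is simplified from the Spark API):

```scala
// Spark's signature (simplified): on an RDD[(K, V)],
//   def reduceByKey(func: (V, V) => V): RDD[(K, V)]
// Note that func sees only values of type V, never the keys.

// A local stand-in with the same semantics, over a plain List:
def reduceByKeyLocal[K, V](pairs: List[(K, V)])(func: (V, V) => V): Map[K, V] =
  pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(func)) }

val totals = reduceByKeyLocal(List((100, 22.25), (300, 42.50), (100, 13.75)))(_ + _)
// totals(100) == 36.0
```

The key point visible in the signature: because func has type (V, V) => V, partial results can be combined in any grouping, which is what lets Spark apply it on each node before shuffling.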
So it's a little different than it was in the previous example. Now, instead of a pair RDD with only the customerId and price, I've also added an integer 1 here to make it easier to do the count of the purchases later. So given that, what should the function literal be that we pass to reduceByKey in order to get a result that looks like this: a pair of customerId with a value that is itself a pair of (trips, total money spent)? Remember the types I just showed you for the reduceByKey operator. We're reducing over the values per key. And since our values are of type (Int, Double), where here we have the count and here we have the price, the function that we pass to reduceByKey must reduce over two such pairs. I'll give you a moment to try and come up with the solution with this information now available. The solution is pretty simple. Since the values that we're reducing over are pairs, where each element of the pair must be summed with the same element of the adjacent pairs, all we have to do is sum up the first elements of two adjacent pairs, then sum up the second elements of those adjacent pairs, and finally kick off the computation with .collect, here. So all we're doing is summing up the first elements and summing up the second elements of the pairs. So we're adding together 1 + 1, in this case, and some price + some price, in this case. And after that, you just have to invoke some kind of action to kick off the computation, and that's it. So what might that look like on the cluster that we just visualized? Going back to our visualization, note that I updated the result of the map. So we now have what we've been working with here, with this extra integer there. What happens with this data on the cluster for this code example now? Well, since reduceByKey reduces the data on the mapper side first, the pairs on our three nodes are reduced, and they look like this. 
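The reduceByKey solution just described can be sketched as follows (invented sample data again; a List and a local group-and-reduce stand in for the RDD, with the real Spark pipeline in comments):

```scala
case class CFFPurchase(customerId: Int, destination: String, price: Double)

val purchases = List(
  CFFPurchase(100, "Geneva",   22.25),
  CFFPurchase(300, "Zurich",   42.50),
  CFFPurchase(100, "Fribourg", 13.75))

// In Spark:
//   purchasesRdd
//     .map(p => (p.customerId, (1, p.price)))                  // note the extra 1
//     .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) // sum counts, sum prices
//     .collect()                                               // action
val result = purchases
  .map(p => (p.customerId, (1, p.price)))
  .groupBy(_._1)
  .map { case (id, kvs) =>
    (id, kvs.map(_._2).reduce((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)))
  }
// result(100) == (2, 36.0): two trips, 36.0 francs spent
```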
So here's before, and here's after. What we had was two elements with key 100 on this node and two elements with key 300 on this node, and we could reduce them together into a single element that looks like this. Notice that the logic in our reduceByKey was essentially applied on the mapper side first. Imagine if this was a real data set with millions or billions of elements on each node; now we have at most one key-value pair per key per node. So that's potentially a very large reduction in the amount of data that we have to shuffle. The idea is that hopefully we're shuffling less data now, and then we do another reduce again after the shuffle. In the end, we should have the same answer, but we should have arrived at that answer in considerably less time, because the goal here is to reduce the amount of network traffic caused when we do one of these operations. So, just to summarize, what are the benefits of this approach? Well, by reducing the data set first, we can reduce the amount of data that's sent over the network during the shuffle, which could mean pretty significant gains in performance. And to make this a little more concrete for you, I performed this computation, the exact same code, on a real cluster, and we can easily see the gains in performance. This was done with billions of generated key-value pairs representing the same data set and the same code we just saw on the previous slides, on a cluster of, I believe, six nodes. This was run in a Databricks notebook, and what we can see here is the amount of time it took to do the same computation with groupByKey versus with reduceByKey. As we can see, the groupByKey variant, exactly what we saw on one of the previous slides, takes 15.48 seconds to complete, while the variant using reduceByKey, again the same as we had on one of the previous slides, takes only 4.65 seconds to complete. 
In this example, the reduceByKey version of this program is more than three times faster than the groupByKey version. Imagine if this was an hours-long computation; this would mean a lot of saved time on our big data set. So I think we have a little more intuition now about how all of this works. The bottom line is that if we use operations like groupByKey a lot, the act of grouping all of the keys with their values requires us to move the values to be on the same machine as their corresponding keys. The one big question that we haven't yet asked is: how on Earth does Spark know which key to put on which machine? We've just assumed that Spark is doing something sane and is spreading the data out evenly, but how does that really work? Well, the answer is partitioning, and how it works we'll cover in depth in the next few sessions. And partitioning can make a big difference to how fast your job completes.