On April 24, 2019, Unravel VP of Data Insights Eric Chu presented a session on “Use Machine Learning to Get the Most Out of Your Big Data Clusters” at Spark AI Summit in San Francisco, CA. You can view the video of the session or read the transcript.
Welcome, everybody. Thank you for coming to the talk. I’m Eric from Unravel Data. And, today, I want to talk about “Use Machine Learning to Get the Most Out of Your Big Data Clusters.”
To set the context, this is about optimizing the performance of your big data applications so that they run reliably, efficiently, and economically, and in turn, you get the most bang for the buck out of the big data clusters that you pay for.
In particular, I want to discuss how machine learning, and more generally artificial intelligence, can be of use for big data application performance management, what the requirements and challenges are of using ML and AI effectively to tackle this problem, and what benefits we can really expect.
Since we are at a Spark Conference, I’m pretty sure that Spark plays a central role in the big data stack of your company. But in many companies, perhaps including yours, Spark is only one of the many systems of the whole big data stack, which is highly distributed in nature.
Your data can come from many places such as logs and different types of media and apps, and can vary from unstructured free text to semi-structured, to traditional structured data.
Even the data storage has many options: S3, data lake, HDFS to name a few. In learning from this data, Spark is probably one of the most popular choices to do so. But even if we just consider Spark, there are different flavors such as Python, Scala, and SQL, etc., and then the results of these apps can, in turn, be used in other places in your business pipeline.
So in this setup, companies have the freedom to play mix and match with the technologies that are best suited for their use cases, and that is one of the strengths of the modern big data stack. However, running an application in this distributed environment in a reliable fashion and efficiently is very difficult.
Take Spark, for example. When you write a Spark program, no doubt you have run into some kind of failure that has taken you a long time to figure out, or maybe your app does run successfully, but it’s slow, much slower than what you expect. And if your app is part of a pipeline that has some time-critical SLAs, then you’re in real trouble.
Also, many companies are moving their data to the cloud, and cloud is touted as this pay-as-you-go paradigm where you can just pay for what you use. But it’s so easy to spin up a cluster or add a node when you have a spiky workload that people learn the hard way that the costs quickly get out of control.
So all these symptoms, failed apps, inefficient apps, and high cost, they actually can have one or more root causes. If you’re talking about a Scala program, maybe you just have a bug, or if you’re talking about a SQL query, maybe the query is written in a bad way that has an inefficient join, for example, or maybe some of the configurations that have to do with the container sizes or number of executors are suboptimal so you don’t have that optimal degree of parallelism.
Maybe it has nothing to do with your apps at all, but with the data: the data your query reads has a bad layout like too many small files, or it uses a text format that is not columnar, or it has skew.
In those cases, there’s not much you could do unless you are also responsible for the ETL pipeline that produces that data in the first place. Some of these issues are even more remote, like network issues, or machine degradation.
You have a bad node that, as an application developer, you really have no control over. Or some of these have to do with the scheduler settings like the queue: the configuration of the resources for each pool is not optimal for the workload that you are having.
So to find out which of these is the real root cause, you kind of have to play Sherlock. You have to look at the evidence and have some kind of domain expertise, piece together this information and try to find out and make the best judgment.
But all these systems, because of the distributed nature, also mean that the monitoring data is siloed. So it’s not so much about not having data, but then the data, it’s everywhere, and it’s really hard to put them together and understand.
So from the DevOps point of view, they are quickly overwhelmed and often can only react to these problems. Most of the problems, in fact, surface only when the affected users reach out to them or when some boss high up in the organization talks to them separately.
Of course, what that means is a lot of money, as well as time, wasted on not just running your apps, but also trying to figure out what the problem is.
Well, fortunately, if you have gone through these experiences, you probably have made some observations that serve as the inspiration to solve this problem as a data plus AI problem. First of all, there are lots of data about how the applications run.
Second, a lot of these root causes that I mentioned, they are common and recurring. So, let’s say you have a small files problem, it’s not just going to affect one app, but any app that uses that table with the small files problem will have that problem.
So what that means is that a lot of this root cause analysis and recommendations can actually be automated and improved via learning. Earlier today, I went to a talk by Facebook about scaling on Spark, and, in fact, they talked about a lot of these things and even their approaches are very similar.
So the term that I want to bring to your attention is this, probably you have heard of it recently, it’s called AIOps. And that’s different from DevOps in that people try to emphasize the use of artificial intelligence and learning to address distributed application performance and operations management.
So how do we do this? First of all, to do machine learning or any kind of AI, you need data. And there are lots of data, as I mentioned, about how the applications run.
You can use some of the APIs, such as the resource manager API or the history server API; you can go look at the logs to get some information about the containers, or the Hive metastore for the schema, and then the query plan from each query, so on and so forth.
When you get this data, you need to store it in some kind of a central place where you can build in intelligence and automation so that you could detect when a problem arises, understand what the root cause is, and then take the necessary action.
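As a rough illustration of the collection step, here is a minimal Python sketch that flattens an application list shaped like the response of the YARN ResourceManager REST endpoint `/ws/v1/cluster/apps`. The sample payload and its values are made up for illustration, and the `summarize_apps` helper is hypothetical; it is not how Unravel collects data.

```python
import json

# Made-up payload, shaped like a YARN ResourceManager
# `/ws/v1/cluster/apps` response. All values are invented.
SAMPLE_RESPONSE = json.dumps({
    "apps": {"app": [
        {"id": "application_1_0001", "name": "etl_daily", "queue": "prod",
         "finalStatus": "SUCCEEDED", "elapsedTime": 270000,
         "memorySeconds": 1105920, "vcoreSeconds": 540},
        {"id": "application_1_0002", "name": "adhoc_report", "queue": "dev",
         "finalStatus": "FAILED", "elapsedTime": 930000,
         "memorySeconds": 3809280, "vcoreSeconds": 1860},
    ]}
})


def summarize_apps(payload):
    """Flatten the response into one small record per application."""
    body = json.loads(payload)
    # The "apps" value may be null when no application matches.
    apps = (body.get("apps") or {}).get("app", [])
    return [
        {
            "app_id": a["id"],
            "queue": a["queue"],
            "status": a["finalStatus"],
            "minutes": a["elapsedTime"] / 60000,           # milliseconds to minutes
            "memory_mb_hours": a["memorySeconds"] / 3600,  # MB-seconds to MB-hours
            "vcore_hours": a["vcoreSeconds"] / 3600,       # vcore-seconds to vcore-hours
        }
        for a in apps
    ]


records = summarize_apps(SAMPLE_RESPONSE)
```

In a real deployment the payload would come from an HTTP call to the resource manager, and similar records would be joined with history server, log, and metastore data.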
This sounds pretty straightforward. But to do so, it actually is quite challenging in many ways. First of all, in terms of data collection, you have to collect data in a very non-intrusive way with low overhead so that you actually don’t affect the ongoing operations on the clusters.
Some of the particular challenges in the cloud include that the cloud can have auto-scaling or the clusters are transient, so you want to get all this information before the clusters disappear, and be ready to get it when they show up.
Of course, each company’s choice of technologies is different, but, in general, you are dealing with a variety of technologies in your pipelines. And so, you have to also support getting all these different types of information and your system has to scale.
And then the other part about data storage is that some of the information that we’re talking about, even with respect to just one app, arrives at different times, so you don’t really know when you’re going to get that data. So you have to be ready for asynchronous arrival.
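One simple way to cope with pieces of information about the same app arriving at different times is to key everything by application ID and merge parts as they show up. The sketch below assumes three hypothetical sources (`metrics`, `config`, `query_plan`); the `AppStore` class is an illustration only.

```python
from collections import defaultdict


class AppStore:
    """Merges partial records about an app as they arrive, in any order."""

    # Hypothetical set of sources needed before an app can be analyzed.
    REQUIRED = {"metrics", "config", "query_plan"}

    def __init__(self):
        self._parts = defaultdict(dict)

    def ingest(self, app_id, source, data):
        # A later arrival from the same source overwrites the earlier one.
        self._parts[app_id][source] = data

    def is_complete(self, app_id):
        # True once every required source has reported for this app.
        return self.REQUIRED.issubset(self._parts.get(app_id, {}))

    def get(self, app_id):
        return dict(self._parts.get(app_id, {}))
```

Analysis can then be triggered whenever `is_complete` flips to true, or run on partial data when only some of the sources are needed.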
And then when it comes to the algorithms to analyze these applications, it has to scale and it has to combine expert knowledge with the data that you collect. It has to be predictable and reliable. That goes without saying, of course, but it’s non-trivial in combination with the other challenges.
So, next, I’m going to talk about some of these approaches that we use to particularly leverage machine learning. The first one is application auto-tuning, and then the second one is going to be cluster-level optimization.
So, going back to that single Spark app that you may have performance issues, when you have a problem, what do you do? You look at the logs, you look at the Spark UI, and you probably spend a lot of time looking at the configurations because there are a bunch of them that affect how fast it runs.
So these are some of them. And there are even different talks at this conference and in the past that talk about some of the best practices to pick a value. But often, you really don’t know what value.
So you have to try just based on your understanding of how these configurations work and what your application is like, try out different values, and then submit the program, see if it improves, and then that process can continue on and on.
To abstract this out, we can think of this set of parameters as a vector. And we can imagine that this is a multidimensional space where you have this surface on which you’re looking for the point where the dimension that represents your performance (that could be the duration of your program or the resources that the program is using) is at its lowest.
So, now, when you have this abstraction, what you quickly realize is that that surface is actually unknown. And so how can we automate this process? So what we are doing at Unravel is we implement this feature that can automate this process. Users simply specify what goal they’re trying to achieve. So, for example, I want to speed up an app or I want to make sure the app runs because it’s failed, and then submit that along with the app.
And then what we will do is there’s some kind of algorithm that will actually look at this single app and then give you some kind of recommendation. Meanwhile, it will keep learning and then try to improve over time. So, the longer it takes, maybe it has more time to look at that search space, and maybe we’ll find something better.
So what does that really mean in terms of the workflow? So what I just talked about, the one part of it is the probe algorithm where it will take the app that you provide, and the performance goal, and then it will provide some kind of recommendation.
What needs to be done next is that it automatically submits a new app with this new recommendation to the cluster to run it. This can be on-premise or on the cloud. And then when the new run happens, we get new data about this run, and we will combine that with the historical data and the probe data.
So, now, with more data, then we will repeat this process and see if we could come up with a better recommendation algorithm. And this cycle can repeat up to certain times specified by some kind of resource budget that the user is allowed to spend on finding a good solution for this app.
And at any stage of the cycle, there would be chances where this algorithm would take the user feedback and change its course, if necessary.
The orchestrator is simply the part of the system that takes in the recommendation that we have generated and then submits it automatically to the cluster so that it would start another run.
So to be clear about the probe algorithm, it doesn’t necessarily just take one run. It can actually take in all the past runs and then combine the data that we have gained in the new run. So every time when we keep doing this process, we basically are looking at all the history that we have regarding that application. And then we repeat this process until some stopping condition is reached.
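The cycle described above (probe, submit, observe, combine with history, repeat until the budget is spent) can be sketched in a few lines of Python. Everything here is a toy stand-in: `simulated_run` replaces a real cluster run, `probe` uses an invented heuristic on a single hypothetical parameter `executor_memory_gb`, and none of it is Unravel's actual algorithm.

```python
import random


def simulated_run(config):
    """Stand-in for a real cluster run; returns (succeeded, minutes).
    The cost model is invented purely for illustration."""
    mem = config["executor_memory_gb"]
    if mem < 4:
        return False, 15.5  # pretend the app runs out of memory
    return True, 1.0 + 0.25 * (mem - 4)


def probe(history, rng):
    """Recommend the next config from ALL past runs (toy heuristic)."""
    ok = [h for h in history if h["succeeded"]]
    if not ok:
        # No successful run yet: double the largest memory tried so far.
        biggest = max(h["config"]["executor_memory_gb"] for h in history)
        return {"executor_memory_gb": biggest * 2}
    best = min(ok, key=lambda h: h["minutes"])
    step = rng.choice([-2, -1, 1])  # small random move around the best run
    new_mem = max(1, best["config"]["executor_memory_gb"] + step)
    return {"executor_memory_gb": new_mem}


def auto_tune(initial_config, budget, run=simulated_run, seed=0):
    rng = random.Random(seed)
    history = []
    config = initial_config
    for _ in range(budget):  # stopping condition: the run budget
        succeeded, minutes = run(config)  # orchestrator: submit and observe
        history.append(
            {"config": config, "succeeded": succeeded, "minutes": minutes}
        )
        config = probe(history, rng)  # learn from the whole history
    ok = [h for h in history if h["succeeded"]]
    best = min(ok, key=lambda h: h["minutes"]) if ok else None
    return best, history
```

A call such as `auto_tune({"executor_memory_gb": 2}, budget=5)` runs the loop five times and returns the best successful run together with the full history. User feedback could be folded in by letting the caller edit `history` or override `config` between iterations.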
One key in this approach is that there are different ways that we could look for this space, and we want a balance between exploration and exploitation. So, first, when you just have one data point, you have some domain expertise that you apply to the algorithm, you may find one point and then you apply to it, and then it gives you some encouraging results, and you might keep going in that direction.
But in a multi-dimensional space, it’s also possible that you actually get stuck in some local minimum or local maximum. So, at some point, you have to balance that out by exploring some parts where you have not been before and see if that actually gets you out of the local minimum to find an even better answer.
So, in this graph, what I’m trying to illustrate is that in these two points which are much closer in this dimension of the attribute, you know that this is a good performance, and you want to exploit the knowledge that you have by going in between.
But then the other argument is that if you look at these two points that are quite far apart and that means there’s a lot of space that you haven’t explored, and you might want to try that out to see if actually you have a better performance.
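The trade-off in the graph can be made concrete with a one-dimensional sketch. With some probability the next trial goes into the widest unexplored gap (exploration); otherwise it goes between the two neighbouring points with the best known performance (exploitation). The function name `next_point`, the `explore_prob` parameter, and the midpoint rule are assumptions made for illustration.

```python
import random


def next_point(observations, explore_prob=0.3, rng=None):
    """observations: list of (x, cost) pairs; lower cost is better."""
    rng = rng or random.Random()
    pts = sorted(observations)
    if len(pts) < 2:
        raise ValueError("need at least two observations")
    pairs = list(zip(pts, pts[1:]))  # neighbouring points along x
    if rng.random() < explore_prob:
        # Explore: try the middle of the widest untested gap.
        (x1, _), (x2, _) = max(pairs, key=lambda p: p[1][0] - p[0][0])
    else:
        # Exploit: try between the two neighbours with the lowest combined cost.
        (x1, _), (x2, _) = min(pairs, key=lambda p: p[0][1] + p[1][1])
    return (x1 + x2) / 2
```

Setting `explore_prob` closer to 1 spends more of the budget on unknown regions, while a value closer to 0 refines around what already works.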
So this is one example of applying auto-tuning to a Spark application. So it started out…the first row is the first one where it started out, the app has failed after running for about 15.5 minutes, and we can analyze that, just to see that it failed because of out of memory.
Then what it does is we can tune, and recommend using more memory and automatically run it again. And, after the first time, we’re able to make it run successfully within 4.5 minutes.
But that’s just one run. So, after the second run, we combine the first and the second run of the data and we observe that actually, the usage of the memory is much lower than what we allocate to make it run successfully in the first place. So then the algorithm detects that, and it tries again with the lower memory configuration.
After a few rounds, it is able to bring it down to a little bit over a minute. And here it ends in five because the resource budget that we’ve given it is in terms of how many times we want to run this application, and it ends after five. But you can see that after basically the third round, the performance has converged.
So, obviously, there’s some challenges and limitations to this approach. When we are talking about tuning an app, that can have problems in any parts of the big data stack. First of all, when we’re talking about tuning one single app, this app is something that people might run periodically. And, every time they run it, it’s possible that the input data is different: maybe the same table but at a different time.
So what that means is that the data volume could change over time, and what that means is that the recommendations that you have for some point X may not be the best at some time after X. And the worst thing is the recommendations could even make it worse.
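The memory adjustments in this example follow a simple pattern: grow the allocation after an out-of-memory failure, then shrink it toward observed usage once the app succeeds. A minimal sketch of such a rule follows; the function name, the `growth` factor, and the `headroom` factor are hypothetical values chosen for illustration.

```python
def next_memory_gb(allocated_gb, peak_used_gb, failed_oom,
                   headroom=1.25, growth=2.0):
    """Suggest the next memory allocation for one container.

    allocated_gb: what the last run was given
    peak_used_gb: the most the last run actually used (ignored on failure)
    failed_oom:   whether the last run died from running out of memory
    """
    if failed_oom:
        # Failure: usage is unknown, so grow the allocation.
        return allocated_gb * growth
    # Success: move down toward observed usage plus a safety margin,
    # but never above what already worked.
    return min(allocated_gb, peak_used_gb * headroom)
```

For instance, `next_memory_gb(4, None, True)` asks for more memory after a failure, and a later call with a low observed peak pulls the allocation back down.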
So what we did there is we…this is something that the learning can, in a sense, take into account, the more runs you do or you provide as part of the history, the more accurately it can mitigate that problem.
The other part that we’re working on now is actually automatically matching incoming query into the corpus of queries that we have tracked so that it’s almost like some kind of query clustering so that when we have an incoming query, we automatically know that there are some queries that are similar to that, and we could actually use the whole history that we’ve seen with those queries to come up with some kind of t-shirt sizing of the configurations for that app but in response to different data size.
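The t-shirt sizing idea can be sketched as bucketing historical runs of similar queries by input size and remembering the best-performing configuration per bucket. The size cut-offs, the record fields, and the `executors` parameter below are all hypothetical.

```python
# Hypothetical cut-offs in GB of input data for each t-shirt size.
SIZE_BOUNDS_GB = [(10, "S"), (100, "M"), (float("inf"), "L")]


def size_of(input_gb):
    """Map an input volume to a t-shirt size label."""
    for upper, label in SIZE_BOUNDS_GB:
        if input_gb <= upper:
            return label


def best_config_per_size(runs):
    """runs: dicts with input_gb, config, minutes, succeeded."""
    best = {}
    for r in runs:
        if not r["succeeded"]:
            continue  # failed runs never become a recommendation
        label = size_of(r["input_gb"])
        if label not in best or r["minutes"] < best[label]["minutes"]:
            best[label] = r
    return {label: r["config"] for label, r in best.items()}
```

At submission time, the incoming query's expected input size picks the bucket, and the stored configuration for that bucket becomes the starting recommendation.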
Auto-tuning, the one that I talked about, mostly has to do with configurations that Unravel could automatically replace with the recommended values and try again without user intervention. However, there are many problems that do require user feedback, some things like small files or things that we cannot just fix by changing some configurations, or some SQL queries that need to be rewritten.
That actually is something that we’re working on actively, but it’s still not ready to be brought out in a general feature. And also, from a semantic point of view, even if we can automatically rewrite SQL queries, users would likely want to review them to make sure that the intent hasn’t changed.
Other issues that I mentioned like hardware issues or poor resource allocations, if those turn out to be the root cause of your performance problem, then those are things that you likely have to make changes in that area as opposed to changing the configuration of your application.
So this is the part where user feedback is needed and that’s something that I also want to take the opportunity to introduce my colleague’s talk tomorrow, “An AI-Powered Chatbot to Simplify Spark Performance Management.” So if you’re interested, please go to that talk.
So, next, I’m gonna talk about cluster level optimization. So, we’ve talked about how we need to collect data from various places to tune a single application. Of course, when you have built a system to collect all this information, you’re not just going to collect this for a single query, but you will collect all the queries that run on your cluster.
And in doing so, you basically have woven a really rich corpus of data about how your cluster performs with all the applications. And combining that, you can start addressing some of these performance issues not at the application level, but at the cluster level, some things like the scheduler configurations for the queues.
So that really requires you to know the workload for each queue, and if you also know the time, then you can basically replay and see, compare the pending and the available resources. And then you could have sophisticated algorithms to actually try out different resource configuration for the scheduler and see if it comes up with a better expected performance.
So, we actually have some past work that we published in a research conference, and it’s very complicated. So, today, I’m just going to mention that briefly. But other examples include schema and data insights.
If you have the metastore and you have access to the HDFS, then you could collect data and recognize things like small files in the table or data skew, things that would affect multiple apps, and combining with the app information, you can also get a sense of how severe your problem is because you can tell users that, “Hey, your table has small files and this table actually is accessed by a certain percentage of applications per day and it causes that amount of negative performance impact.”
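A minimal sketch of this kind of table-level insight: given the file sizes of a table and how many apps read it, report how much of the table consists of small files and what share of the workload is exposed. The 32 MB threshold and all names are assumptions chosen for illustration.

```python
def small_file_report(file_sizes_mb, apps_reading_table, total_apps,
                      threshold_mb=32):
    """Summarize how bad a table's small-files problem is.

    file_sizes_mb:      size of each file backing the table
    apps_reading_table: apps per day that access this table
    total_apps:         all apps per day on the cluster
    """
    small = [s for s in file_sizes_mb if s < threshold_mb]
    ratio = len(small) / len(file_sizes_mb) if file_sizes_mb else 0.0
    affected = 100 * apps_reading_table / total_apps if total_apps else 0.0
    return {"small_file_ratio": ratio, "pct_apps_affected": affected}
```

Ranking tables by both numbers together points at the fixes that would help the most apps.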
What I’m going to talk about next is more on building on top of the application tuning work that we are doing, first of all is the cluster default for some of these parameters, and then next, I’m going to talk about the cluster cost optimization.
So for cluster level default, what I was trying to say is that single app tuning, being the ability to drill down deeply and try out different values of a set of configurations, is very powerful because it saves users the time of doing that manually without knowing what values they should try.
However, this is just about one single app. Your cluster probably is running thousands of apps every day, and some of these are not so bad that you would want to invest in extra resources to try to find out what is the better configuration.
There are probably many okay apps that could be improved, but people just…this approach just does not scale to be used for that high number of apps.
What we observed is that the defaults that come with the cluster installations are often suboptimal for the workload that users are having. So what we are trying to do is let’s look at the workload that we have seen and try to figure out some cluster defaults that would be better for the overall workload.
So if you know some better values for the cluster default, you can simply set it so that every single app would use the new values automatically. So this is really powerful because you don’t have to individually explicitly override a single application. But the challenge, of course, is how do you know the new default and what do we mean by better?
Because now that we’re talking about the default, unless you override it, any app would use that configuration value. So while that default could be better for some apps, it might be worse for others. Take memory, for example.
If you say memory, some apps could have used less memory and you try to change that, but then some other apps actually still require that much memory such that your new configuration could lead to an app failure from out of memory.
How do you balance that? Well, so remember this is a cluster default that we’re talking about. So what we want to do is to look at the workload and then pick the best new default for the overall workload, so that it may not be the best for a single app, but it’s the best overall.
And the way we do that is we look at…we observed our past workload, so we need to specify a time window and other optional parameters, and then we go through all the apps in the window. Depending on the parameter, we have different algorithms to identify one or more candidates that improve the performance of the workload.
And, in doing so, we also have to evaluate both the benefit and the risk, if it exists, for each of these candidates. And, of course, for some of the configurations, there’s no one final right value, but we do suggest one candidate so that users have some sense of what we are suggesting, but they could always pick the other candidates.
Now, this is a screenshot of the result for two parameters that affect the memory allocation of containers for MapReduce. And, here, if you see, the current default is four gigabytes, and over on the right we have a bunch of candidates. And then for each of the candidates, we would show the reward and the risk. The reward, in this case, is how much memory you would save, assuming you have a similar workload, by changing the default to this candidate. The risk is shown as the reverse of what you might expect: the risk measurement gives you a sense of how many jobs from your workload would still run with this new default.
So what you’re seeing here, for example, is if you lower your default from four gigabytes to two gigabytes, then you can save about 50% of memory from 97% of the workload. Ninety seven percent represents the percent of the jobs that are actually using the default, which is a very important measure you want to look at because if you actually see that most of your jobs are not using the default, then there’s not much use to actually change the cluster default.
The other side of the story is the risk. So, obviously, if you lower it to the extreme, you will save a lot of memory, but then that’s also probably because a lot of the apps fail because they are running out of memory.
So this is one example where it doesn’t have one right answer. In the past, some companies have taken different approaches. Some of them really want to cut down on the memory that they’re using, and so they would actually choose a default that saves a higher amount of memory with a lower percentage of jobs that would still be guaranteed to run with the new default.
So, in other words, they want to take the chance of having jobs fail because they can afford to have those jobs fail and they care more about cutting the amount of memory. And what they will do is that if those people have failed jobs, then they would come talk to DevOps and request an override. Other companies are less tolerant of failed jobs.
And so in this range of candidates, they would pick something that is safer. What that means is they would save less memory, but they would know that 90% of the jobs will still be able to run.
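The reward and risk columns can be approximated from historical job data. The sketch below assumes each job record carries the memory it requested and the peak it actually used, and it assumes a job would still run under a candidate default if its peak usage fits within it. Those assumptions and all field names are illustrative, not a description of the product's calculation.

```python
def evaluate_candidates(jobs, current_default_mb, candidates_mb):
    """jobs: dicts with requested_mb and peak_used_mb for each job."""
    # Only jobs that rely on the default are affected by changing it.
    on_default = [j for j in jobs if j["requested_mb"] == current_default_mb]
    if not on_default:
        return []
    results = []
    for cand in candidates_mb:
        still_run = [j for j in on_default if j["peak_used_mb"] <= cand]
        results.append({
            "candidate_mb": cand,
            "pct_jobs_on_default": 100 * len(on_default) / len(jobs),
            # Reward: memory saved per container relative to the current default.
            "pct_memory_saved": 100 * (current_default_mb - cand) / current_default_mb,
            # Risk, expressed as the share of jobs expected to keep running.
            "pct_jobs_still_running": 100 * len(still_run) / len(on_default),
        })
    return results
```

A cost-focused team would pick a row with high savings and accept a lower still-running percentage, while a more conservative team would pick the opposite.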
So how do we evaluate whether the new cluster default is actually useful? So we have to come up with some kind of cluster level KPIs. So the window that we used before is kind of the input workload window where it tells you the performance before you change the default.
After you have come up with some new default, you would apply that, and then you would let it run. And, after some time, you would also do a measurement over the new window. And, here the assumption is that the cluster workload characteristics don’t really change.
You may have different queries, of course, but overall, in most places, the cluster workload is about the same in that they have periodic workloads running every week and then with the newest data. So, even though they’re not exactly the same workload, they can be compared.
And the KPIs that we use to compare are the total number of apps per day, and the number of Vcore hours, and the number of memory hours per day.
The per day part is really there because the window can be different, so if you take a seven-day window before you change the cluster default and then afterward, you take a 10-day, then obviously your total number of apps or your total number of Vcore hours would be different because one window is much longer than the other.
But if you take a long enough window and you do a daily average, you’ll effectively get a sample that pretty accurately reflects any changes that are due to the cluster default changes. And lastly, of course, as I mentioned before, you do want to be mindful about what percentage of apps were actually using the default in the first place because if you saw that the first time that only 20% of the apps are using the default, then you wouldn’t expect this to affect your cluster performance that much.
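The per-day normalization is what makes windows of different lengths comparable. A small sketch, with hypothetical field names `vcore_hours` and `memory_gb_hours` on each app record:

```python
def daily_kpis(apps, window_days):
    """Average the cluster-level KPIs over a window of window_days days."""
    return {
        "apps_per_day": len(apps) / window_days,
        "vcore_hours_per_day": sum(a["vcore_hours"] for a in apps) / window_days,
        "memory_gb_hours_per_day": sum(a["memory_gb_hours"] for a in apps) / window_days,
    }


def pct_change(before, after):
    """Percentage change of each KPI from the before window to the after window."""
    return {k: 100 * (after[k] - before[k]) / before[k] for k in before}
```

With this, a seven-day window before the change and a ten-day window after it can be compared directly.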
So this is one customer that uses this feature, and then it does a comparison. So this is Hive on MapReduce. And what you see is on the per day measure, the number of jobs before and after goes from 902 jobs to almost 2000 jobs.
And with that increase in the number of jobs that they can do per day, you would expect that the number of Vcore hours would also increase dramatically. But here, surprisingly, actually it’s almost half of that resource. So that kind of underscores how wasteful the cluster default was before for the given workload.
And, for Spark, we are planning to roll out this feature very soon. The reason why it took longer is because we developed it later and also there are a lot more knobs to tune. And a lot of these configurations that you probably have personally played with, you will know that they are not independent of each other.
Your algorithm needs to take into account the effect of changing one versus the other, and that’s where the learning and modeling are really helpful.
Now, extending beyond that, having this data about each app that you’re running, where they are running, you could construct other pieces of information that are helpful in understanding your cluster cost. You can do chargeback, which is very important to business because they want to know how much money they’re spending and how to improve from that.
That is closely tied to resource utilization and how much…especially the amount of CPU and memory and also the disk which you can forecast and see how much you need to allocate or expect in the future.
So, a lot of this, maybe not the forecasting, but the cluster workload and chargebacks, etc., they don’t really need a lot of learning. They’re just about capturing the data and showing it. But there are always opportunities to leverage learning to improve your cluster performance.
One common question on the cloud is that when people are trying to allocate nodes for their clusters, they often wonder how many nodes they need to allocate and also what type of nodes they need to use. If they’re not sure, then they usually allocate more just to make sure that their workload still runs.
That’s something where we could easily match the resource utilization data, combined with the cluster default, to construct a much more economical proposal for them, even as simple as observing that the actual utilization is much lower than what they allocate, and we could map it to the different options that they could choose.
That could be very helpful. And then if you add on to what they could set for the cluster defaults to actually make the resource even more efficiently used, then they could further reduce the number of nodes or the instance that they need while guaranteeing the same performance.
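A back-of-the-envelope version of this mapping: take the observed peak utilization, add some headroom, and work out how many nodes of each available type would cover it. The headroom factor, the node shapes, and the prices used with this sketch are hypothetical, and real sizing would also account for disk, network, and workload shape.

```python
import math


def nodes_needed(peak_memory_gb, peak_vcores, node_memory_gb, node_vcores,
                 headroom=1.25):
    """Smallest node count that covers observed peak usage plus headroom."""
    by_memory = math.ceil(peak_memory_gb * headroom / node_memory_gb)
    by_cpu = math.ceil(peak_vcores * headroom / node_vcores)
    return max(by_memory, by_cpu, 1)


def cheapest_option(peak_memory_gb, peak_vcores, options):
    """options: name -> (node_memory_gb, node_vcores, hourly_price)."""
    costs = {}
    for name, (mem, cores, price) in options.items():
        count = nodes_needed(peak_memory_gb, peak_vcores, mem, cores)
        costs[name] = count * price  # hourly cost of the whole cluster
    best = min(costs, key=costs.get)
    return best, costs[best]
```

Lowering the cluster defaults reduces the observed peaks, which in turn lowers the node count this calculation produces.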
And, obviously, this has very significant implications on the cloud story. One part is when companies move the data from on-prem to the cloud, then they need to figure out, for the workload that they have on-prem, what kind of clusters they need to allocate, and how big, and what types, etc.
Even when they are already on the cloud, they can use this information to improve how much they spend or what they allocate for the ephemeral clusters and also to improve on their auto-scaling rules for their permanent clusters so they fit better to their actual usage.
So I’m going to wrap up now. And I hope in this talk that I have conveyed to you that there are indeed a lot of opportunities to address this distributed application performance management with a combined approach of data and artificial intelligence.
To really use learning effectively, the prerequisite is that we have all the relevant data that we need, but in terms of constructing such a system, it’s also very important to make sure that we collect this data in a non-intrusive and scalable way.
And lastly, well, to improve your application performance, you don’t have to start with fancy models. You can start with some expert knowledge, and then improve on that knowledge based on the data that you collect. And what we’ve learned is that this is the best use of the data that we’ve collected, especially if we can make sure that the learning that we use is very specific to the use cases we want to support.
So, thank you for coming to the talk. I’m open to questions. If you want to try it out, here’s the URL. And if you are interested in joining my team, then come talk to me or email me. Thank you very much.
So, thanks for the great talk. So I’m a little curious that you talk about the importance of being able to kind of explore the space, but exploration has a cost. And so can you talk a little more about how you balance the cost with the value of more knowledge?
So the parameter that we exposed to users is simply the budget, which is expressed in terms of the number of runs that they are willing to run. So imagine this is like a cluster where they have some problematic application, and they wanted to improve that.
And they care enough to be willing to say that, “Hey, I’m allowing Unravel to run this a few times. Learn from that, and then tell me.” So usually it’s about five runs or so, and we would have a pretty good idea.
So it’s a specific mode that you have that would let the algorithm gather knowledge and you take it out of that?
Yeah. And then internally, when we have this number of runs, then we track like when are we doing exploring, when are we doing exploiting, and that is something in-house.
But the one thing that I mentioned in passing is that what we are looking into is to automatically cluster similar queries so that people may not even need to specify a budget, but we have enough history because assuming they have run these before and we track all the queries, that we could actually see different runs that represent the same application and then see the data volume changes or the different configuration values to more accurately predict what would be a good value. So we are expecting that we are not going to completely remove the budget approach, but we will combine that with the history that we have collected already.
Hey, Eric, I have two questions. So regarding the application level tuning, so there are two very common challenges, the first one is that the cost of running the application can be really large, it might not be 15 minutes, it might be, say, 15 hours in some cases, and the second is that the runtime, there can be a large variance of the runtime.
So, even if we don’t change anything, just have two completely same runs with exactly same configs, there can be a large variance of the runtime. So I’m wondering when you’re dealing with the application level tuning, how do you deal with these two challenges?
So the way that we deal with…what you said is definitely possible. So the key to really come up with some accurate recommendation is based on the fact that you are looking at all the runs that you have on your hands. So if you just have one of those two apps, then you would have a very biased visibility. So it goes back to how can you discover all these queries that are actually about the same, that are referring to the same application, and then consider all that data when you provide the recommendation.
Yeah, but even if we have the whole history, say we have 90 days of runs of this single job, we still have this issue of we have to try…each iteration is actually very costly. It could take many, many hours for each run, so how do you…?
Well, so I wanna answer that in a couple of ways. So first of all, if you have different runs and they represent very different configurations, what we can do is not necessarily come up with the best configuration because you also don’t know that the inputs of these different applications can be different. Data volume is a good example.
So what you are saying is if you have different runs that have very different performance, you really don’t know in the future when you run it again what kind of input data or the volume that you’re going to see.
So, right now, we don’t really address that problem, but what we’re planning to do is if we can look at all the historical runs that have already happened, so we’re not asking you to run more, then we could leverage that data and then provide a landscape that could tell you, “Hey, if your data volume is this much, then this is the best configuration.”
If it’s another amount, then you would have some other configuration. That’s something you can definitely do when you have the data. That’s what I was referring to when I mentioned the t-shirt sizing. As to the second part of your question when you are saying, “Oh, it’s very expensive to run,” I think this is something that would require some external decision to be made that is outside of this auto-tuning scope.
It’s a reasonable question, and then people would have to decide does it make sense to actually do sampling just to move the needle a little bit. There’s no shortcut around that.
This video first appeared here on the Spark AI Summit website.