By Matthew Rocklin, Continuum Analytics.
This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.
Editor's note: For an introduction to Dask, consider reading Introducing Dask for Parallel Programming: An Interview with Project Lead Developer. To read more about the most recent release, see Dask Release 0.14.1.
This post talks about distributing Pandas Dataframes with Dask and then handing them over to distributed XGBoost for training.
More generally it discusses the value of launching multiple distributed systems in the same shared-memory processes and smoothly handing data back and forth between them.
- Github issue
XGBoost is a well-loved library for a popular class of machine learning algorithms, gradient boosted trees. It is used widely in business and is one of the most popular solutions in Kaggle competitions. For larger datasets or faster training, XGBoost also comes with its own distributed computing system that lets it scale to multiple machines on a cluster. Fantastic. Distributed gradient boosted trees are in high demand.
However, before we can use distributed XGBoost we need to do three things:
- Prepare and clean our possibly large data, probably with a lot of Pandas wrangling
- Set up XGBoost master and workers
- Hand our cleaned data from a bunch of distributed Pandas dataframes to XGBoost workers across our cluster
This ends up being surprisingly easy. This blogpost gives a quick example using Dask.dataframe to do distributed Pandas data wrangling, then using a new dask-xgboost package to set up an XGBoost cluster inside the Dask cluster and perform the handoff.
After this example we’ll talk about the general design and what this means for other distributed systems.
We have a ten-node cluster with eight cores each (m4.2xlarges on EC2).
We load the Airlines dataset using dask.dataframe (just a bunch of Pandas dataframes spread across a cluster) and do a bit of preprocessing:
This loaded a few hundred pandas dataframes from CSV data on S3. We then had to downsample because the way we use XGBoost later seems to require a lot of RAM. I am not an XGBoost expert. Please forgive my ignorance here. At the end we have two dataframes:
df: Data from which we will learn if flights are delayed
is_delayed: Whether or not those flights were delayed.
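The preprocessing described here can be sketched on a single in-memory Pandas dataframe. This is a minimal illustration, not the post's original listing: the sample rows, the 15-minute delay threshold, the 50% sampling fraction, and the choice to treat a missing delay as "not delayed" are all assumptions. Dask.dataframe exposes the same `sample`, `fillna`, and `drop` methods, so the same steps should carry over to the cluster.

```python
import pandas as pd

# Hypothetical sample rows; column names follow the public Airlines dataset.
flights = pd.DataFrame({
    "Month": [1, 1, 2, 2],
    "Distance": [300, 2475, 740, 1020],
    "DepDelay": [5.0, 42.0, None, 16.0],
    "Origin": ["LGA", "JFK", "ORD", "SFO"],
    "Dest": ["ORD", "LAX", "LGA", "JFK"],
})

# Downsample to reduce memory pressure (the fraction is an assumption).
sampled = flights.sample(frac=0.5, random_state=0)

# Labels: a flight counts as delayed if it left more than 15 minutes late
# (assumed threshold); a missing delay is treated as "not delayed".
is_delayed = sampled["DepDelay"].fillna(0) > 15

# Features: everything except the column the labels were derived from.
df = sampled.drop(columns=["DepDelay"])

print(df.shape, is_delayed.tolist())
```

Dropping `DepDelay` from the features matters: leaving it in would let the model read the answer directly from its inputs.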
Data scientists familiar with Pandas will probably find these steps familiar. Dask.dataframe is very similar to Pandas, but operates on a cluster.
Categorize and One Hot Encode
XGBoost doesn’t want to work with text data like destination="LAX". Instead we create new indicator columns for each of the known airports and carriers. This expands our data into many boolean columns. Fortunately Dask.dataframe has convenience functions for all of this baked in (thank you Pandas!).
This expands our data out considerably, but makes it easier to train on.
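As a small illustration of the encoding step, here is a Pandas-only sketch on made-up rows. On a Dask dataframe the equivalent is to call `categorize()` and then `dask.dataframe.get_dummies`; the column names and values below are assumptions chosen for the example.

```python
import pandas as pd

# Hypothetical rows with two text columns.
df = pd.DataFrame({
    "Distance": [300, 2475, 740],
    "Origin": ["LGA", "JFK", "ORD"],
    "UniqueCarrier": ["AA", "UA", "AA"],
})

# Convert text columns to categoricals so the set of known values is explicit.
for col in ["Origin", "UniqueCarrier"]:
    df[col] = df[col].astype("category")

# One indicator column per category; numeric columns pass through unchanged.
df2 = pd.get_dummies(df)

print(list(df2.columns))
```

Each text column becomes one indicator column per distinct value, which is why the width of the data grows with the number of airports and carriers.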
Split and Train
Great, now we’re ready to split our distributed dataframes into training and holdout sets.
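A train/holdout split can be sketched in plain Pandas as follows. This is a stand-in under assumptions: the 90/10 ratio, the seed, and the toy data are invented for illustration. Dask dataframes offer a `random_split` method for the same purpose, with the difference that its fractions are approximate rather than exact.

```python
import pandas as pd

# Hypothetical features and labels sharing the same index.
data = pd.DataFrame({"Distance": range(100, 1100, 100)})
labels = pd.Series([i % 2 == 0 for i in range(10)], name="is_delayed")

# Draw 90% of the rows for training; the rest become the holdout set.
data_train = data.sample(frac=0.9, random_state=1234)
data_test = data.drop(data_train.index)

# Select labels by index so rows and labels stay aligned.
labels_train = labels.loc[data_train.index]
labels_test = labels.loc[data_test.index]

print(len(data_train), len(data_test))
```

Whatever mechanism is used, features and labels must be split identically, otherwise the model trains on mismatched pairs.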
Then we start up a distributed XGBoost instance and train on this data.
Great, so we were able to train an XGBoost model on this data in about a minute using our ten machines. What we get back is just a plain XGBoost Booster object.
We could use this on normal Pandas data locally.
Or we can use dask-xgboost again to predict on our distributed holdout data, getting back another Dask series.
We can bring these predictions to the local process and use normal Scikit-learn operations to evaluate the results.
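One common summary of such predictions is the area under the ROC curve. The sketch below is a dependency-free stand-in for what scikit-learn's `roc_auc_score` computes, written with a quadratic loop that is only suitable for small samples; the function name, labels, and scores are hypothetical.

```python
def roc_auc(labels, scores):
    """Probability that a random positive outscores a random negative.

    Ties between a positive and a negative score count as half a win.
    """
    pos = [s for y, s in zip(labels, scores) if y]
    neg = [s for y, s in zip(labels, scores) if not y]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Hypothetical holdout labels and predicted probabilities of delay.
labels = [True, False, True, False, False]
scores = [0.9, 0.4, 0.35, 0.2, 0.1]

auc = roc_auc(labels, scores)
print(round(auc, 3))
```

A value of 0.5 corresponds to random guessing and 1.0 to a perfect ranking of delayed flights above on-time ones.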
We might want to play with our parameters above or try different data to improve our solution. The point here isn’t that we predicted airline delays well; it is that if you are a data scientist who knows Pandas and XGBoost, everything we did above seemed pretty familiar. There wasn’t a whole lot of new material in the example above. We’re using the same tools as before, just at a larger scale.
OK, now that we’ve demonstrated that this works, let’s talk a bit about what just happened and what that means generally for cooperation between distributed services.
What dask-xgboost does
The dask-xgboost project is pretty small and pretty simple (about 200 lines of code). Given a Dask cluster of one central scheduler and several distributed workers, it starts up an XGBoost scheduler in the same process running the Dask scheduler and starts up an XGBoost worker within each of the Dask workers. They share the same physical processes and memory spaces. Dask was built to support this kind of situation, so this is relatively easy.
Then we ask the Dask.dataframe to fully materialize in RAM and we ask where all of the constituent Pandas dataframes live. We tell each Dask worker to give all of the Pandas dataframes that it has to its local XGBoost worker and then just let XGBoost do its thing. Dask doesn’t power XGBoost; it just sets it up, gives it data, and lets it do its work in the background.
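The handoff can be pictured with a toy model in plain Python. This is a conceptual sketch only, not the dask-xgboost source: `LocalTrainer`, `group_partitions_by_worker`, the worker names, and the partition keys are all hypothetical, and lists of numbers stand in for Pandas dataframes.

```python
from collections import defaultdict


def group_partitions_by_worker(locations):
    """Turn {partition key: worker} into {worker: [partition keys]}."""
    by_worker = defaultdict(list)
    for key, worker in sorted(locations.items()):
        by_worker[worker].append(key)
    return dict(by_worker)


class LocalTrainer:
    """Hypothetical stand-in for an XGBoost worker in the same process."""

    def __init__(self):
        self.partitions = []

    def receive(self, partition):
        # Same process, so this stores a reference; no bytes are copied.
        self.partitions.append(partition)


# Hypothetical answer to "where does each piece of the dataframe live?"
locations = {"part-0": "worker-a", "part-1": "worker-b", "part-2": "worker-a"}
# Hypothetical in-memory data held by the workers.
store = {"part-0": [1, 2], "part-1": [3], "part-2": [4, 5, 6]}

# Each worker hands its own partitions to its co-located trainer.
trainers = {}
for worker, keys in group_partitions_by_worker(locations).items():
    trainer = trainers.setdefault(worker, LocalTrainer())
    for key in keys:
        trainer.receive(store[key])

print({worker: t.partitions for worker, t in trainers.items()})
```

The key property is that no partition crosses a worker boundary: each trainer only ever sees data that already lives in its own process.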
People often ask what machine learning capabilities Dask provides and how they compare with other distributed machine learning libraries like H2O or Spark’s MLlib. For gradient boosted trees the 200-line dask-xgboost package is the answer. Dask has no need to make such an algorithm because XGBoost already exists, works well, and provides Dask users with a fully featured and efficient solution.
Because both Dask and XGBoost can live in the same Python process they can share bytes between each other without cost, can monitor each other, etc. These two distributed systems coexist in multiple processes in the same way that NumPy and Pandas operate together within a single process. Sharing distributed processes with multiple systems can be really beneficial if you want to use multiple specialized services easily and avoid large monolithic frameworks.
Connecting to Other Distributed Systems
A while ago I wrote a similar blogpost about hosting TensorFlow from Dask in exactly the same way that we’ve done here. It was similarly easy to set up TensorFlow alongside Dask, feed it data, and let TensorFlow do its thing.
Generally speaking this “serve other libraries” approach is how Dask operates when possible. We’re only able to cover the breadth of functionality that we do today because we lean heavily on the existing open source ecosystem. Dask.arrays use NumPy arrays, Dask.dataframes use Pandas, and now the answer to gradient boosted trees with Dask is just to make it really really easy to use distributed XGBoost. Ta da! We get a fully featured solution that is maintained by other devoted developers, and the entire connection process was done over a weekend (see dmlc/xgboost #2032 for details).
Since this has come out we’ve had requests to support other distributed systems like Elemental and to do general hand-offs to MPI computations. If we’re able to start both systems with the same set of processes then all of this is pretty doable. Many of the challenges of inter-system collaboration go away when you can hand NumPy arrays from the workers of one system to the workers of the other system within the same processes.
Thanks to Tianqi Chen and Olivier Grisel for their help when building and testing dask-xgboost. Thanks to Will Warner for his help in editing this post.
Bio: Matthew Rocklin is an open source software developer focusing on efficient computation and parallel computing, primarily within the Python ecosystem. He has contributed to many of the PyData libraries and today works on Dask, a framework for parallel computing. Matthew holds a PhD in computer science from the University of Chicago, where he focused on numerical linear algebra, task scheduling, and computer algebra. Matthew lives in Brooklyn, NY and is employed by Continuum Analytics.
Original. Reposted with permission.