Dask Release 0.17.0
This work is supported by Anaconda Inc. and the Data Driven Discovery Initiative from the Moore Foundation.
I’m pleased to announce the release of Dask version 0.17.0. This is a significant major release with new features, breaking changes, and stability improvements. This blogpost outlines notable changes since the 0.16.0 release on November 21st.
You can conda install Dask:
conda install dask -c conda-forge
or pip install from PyPI:
pip install dask[complete] --upgrade
Full changelogs are available in the dask/dask and dask/distributed repositories.
Some notable changes follow.
Deprecations
- Removed the dask.dataframe.rolling_* methods, which were previously deprecated both in dask.dataframe and in pandas. These are replaced with the rolling.* namespace (see the short example after this list).
- We have generally stopped maintenance of the dask-ec2 project, which launched Dask clusters on Amazon’s EC2 using Salt. We now recommend Kubernetes instead, both for Amazon’s EC2 and for Google and Azure.
- The internal state of the distributed scheduler has changed significantly. This may affect advanced users who were inspecting this state for debugging or diagnostics.
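For reference, a minimal sketch of the rolling migration (the small dataframe here is only for illustration):

import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(pd.DataFrame({'x': range(10)}), npartitions=2)

# Previously (deprecated and now removed):
#     result = dd.rolling_mean(df.x, 3)
# Use the rolling.* namespace instead, matching pandas:
result = df.x.rolling(window=3).mean()
print(result.compute())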
Task Ordering
As Dask is applied to more complex problems in more domains, we continually run into cases where its current heuristics do not perform optimally. This release includes a rewrite of our static task prioritization heuristics. This will improve Dask’s ability to traverse complex computations in a way that keeps memory use low.
To aid debugging we also integrated these heuristics into the GraphViz-style plots that come from the visualize method.
import dask.array as da

x = da.random.random(...)
...
x.visualize(color='order', cmap='RdBu')
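The ordering itself comes from the internal dask.order module, which you can also call directly on a raw task graph if you want to inspect priorities programmatically. A minimal sketch (the exact numbers assigned are an implementation detail and may change):

from dask.order import order

def inc(x):
    return x + 1

def add(x, y):
    return x + y

# a tiny hand-written task graph: d depends on b and c, which both depend on a
dsk = {'a': 1,
       'b': (inc, 'a'),
       'c': (inc, 'a'),
       'd': (add, 'b', 'c')}

print(order(dsk))  # dict mapping each key to its static ordering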
Nested Joblib
Dask supports parallelizing Scikit-Learn by extending Scikit-Learn’s underlying library for parallelism, Joblib. This allows Dask to distribute some SKLearn algorithms across a cluster just by wrapping them with a context manager.
This relationship has been strengthened, and particular attention has been paid to nesting one parallel computation within another, such as occurs when you train a parallel estimator, like RandomForest, within another parallel computation, like GridSearchCV. Previously this would result in spawning too many threads/processes and generally oversubscribing hardware.
Due to recent combined development within both Joblib and Dask, these sorts of situations can now be resolved efficiently by handing them off to Dask, providing speedups even in single-machine cases:
from sklearn.externals import joblib
import distributed.joblib  # register the dask joblib backend
from dask.distributed import Client

client = Client()  # start a local cluster, or connect to an existing one

est = ParallelEstimator()  # placeholder for any estimator that parallelizes with joblib
gs = GridSearchCV(est)

with joblib.parallel_backend('dask'):
    gs.fit()
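As a more concrete sketch of the same pattern (the estimator, parameter grid, and synthetic dataset here are arbitrary choices for illustration):

from sklearn.externals import joblib       # joblib as vendored by scikit-learn
import distributed.joblib                   # register the dask joblib backend
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

client = Client()  # or point this at an existing scheduler

X, y = make_classification(n_samples=1000, n_features=20)

# RandomForest parallelizes internally with joblib, and GridSearchCV parallelizes over it
est = RandomForestClassifier(n_estimators=100, n_jobs=-1)
gs = GridSearchCV(est, param_grid={'max_depth': [2, 4, 8]}, n_jobs=-1)

with joblib.parallel_backend('dask'):
    gs.fit(X, y)

print(gs.best_params_)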
See Tom Augspurger’s recent post with more details about this work:
- http://tomaugspurger.github.io/distributed-joblib.html
- joblib/joblib #595
- dask/distributed #1705
- joblib/joblib #613
Thanks to Tom Augspurger, Jim Crist, and Olivier Grisel who did most of this work.
Scheduler Internal Refactor
The distributed scheduler has been significantly refactored to change it from a forest of dictionaries:
priority = {'a': 1, 'b': 2, 'c': 3}
dependencies = {'a': {'b'}, 'b': {'c'}, 'c': []}
nbytes = {'a': 1000, 'b': 1000, 'c': 28}
To a bunch of objects:
tasks = {'a': Task('a', priority=1, nbytes=1000, dependencies=...),
         'b': Task('b', priority=2, nbytes=1000, dependencies=...),
         'c': Task('c', priority=3, nbytes=28, dependencies=[])}
(there is much more state than what is listed above, but hopefully the examples above are clear.)
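For illustration only, a stripped-down sketch of what one of these task objects might look like (the scheduler’s real task objects track far more state than this):

class Task:
    """Minimal sketch: per-task scheduler state gathered into one object."""
    def __init__(self, key, priority=None, nbytes=None, dependencies=()):
        self.key = key                          # unique task key
        self.priority = priority                # static scheduling priority
        self.nbytes = nbytes                    # size of the task's result, in bytes
        self.dependencies = set(dependencies)   # Task objects this task depends on

c = Task('c', priority=3, nbytes=28)
b = Task('b', priority=2, nbytes=1000, dependencies=[c])
a = Task('a', priority=1, nbytes=1000, dependencies=[b])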
There were a few motivations for this:
- We wanted to try out Cython and PyPy, for which objects like this might be more effective than dictionaries.
- We believe that this is probably a bit easier for developers new to the schedulers to understand. The proliferation of state dictionaries was not highly discoverable.
Goal one ended up not working out. We have not yet been able to make the scheduler significantly faster under Cython or PyPy with this new layout. There is even a slight memory increase with these changes. However, we have been happy with the results in code readability, and we hope that others find this useful as well.
Thanks to Antoine Pitrou, who did most of the work here.
User Priorities
You can now submit tasks with different priorities.
x = client.submit(f, 1, priority=10) # Higher priority preferred
y = client.submit(f, 1, priority=-10) # Lower priority happens later
To be clear, Dask has always had priorities; they just weren’t easily user-settable. Higher priorities are given precedence. The default priority for all tasks is zero. You can also submit priorities for collections (like arrays and dataframes):
df = df.persist(priority=5) # give this computation higher priority.
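The same keyword works when persisting several collections at once, letting you nudge the scheduler toward the work you care about first. A small sketch (the arrays here are arbitrary):

import dask.array as da
from dask.distributed import Client

client = Client()

urgent = da.random.random((10000, 10000), chunks=(1000, 1000)).sum()
background = da.random.random((10000, 10000), chunks=(1000, 1000)).sum()

# both run, but workers prefer the higher-priority tasks when choosing what to do next
urgent = urgent.persist(priority=10)
background = background.persist(priority=-10)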
Related projects
Several related projects are also undergoing releases:
- Tornado is updating to version 5.0 (there is a beta out now). This is a major change that will put Tornado on the Asyncio event loop in Python 3. It also includes many performance enhancements for high-bandwidth networks.
- Bokeh 0.12.14 was just released. Note that you will need to update Dask to work with this version of Bokeh.
- Daskernetes, a new project for launching Dask on Kubernetes clusters
Acknowledgements
The following people contributed to the dask/dask repository since the 0.16.0 release on November 14th:
- Albert DeFusco
- Apostolos Vlachopoulos
- castalheiro
- James Bourbeau
- Jon Mease
- Ian Hopkinson
- Jakub Nowacki
- Jim Crist
- John A Kirkham
- Joseph Lin
- Keisuke Fujii
- Martijn Arts
- Martin Durant
- Matthew Rocklin
- Markus Gonser
- Nir
- Rich Signell
- Roman Yurchak
- S. Andrew Sheppard
- sephib
- Stephan Hoyer
- Tom Augspurger
- Uwe L. Korn
- Wei Ji
- Xander Johnson
The following people contributed to the dask/distributed repository since the 1.20.0 release on November 14th:
- Alexander Ford
- Antoine Pitrou
- Brett Naul
- Brian Broll
- Bruce Merry
- Cornelius Riemenschneider
- Daniel Li
- Jim Crist
- Kelvin Yang
- Matthew Rocklin
- Min RK
- rqx
- Russ Bubley
- Scott Sievert
- Tom Augspurger
- Xander Johnson