I’m pleased to announce the release of Dask version 2.2. This is a significant release with bug fixes and new features. The last blogged release was 2.0 on 2019-06-22. This blogpost outlines notable changes since the last post.

You can conda install Dask:

conda install dask

or pip install from PyPI:

pip install dask[complete] --upgrade

Full changelogs are available here:

  • https://docs.dask.org/en/latest/changelog.html
  • https://distributed.dask.org/en/latest/changelog.html

Notable Changes

As always, there are too many changes to list here. Instead, we'll highlight a few that readers may find interesting or that break old behavior. In particular, we discuss the following:

  1. Parquet rewrite
  2. Nicer HTML output for Clients and Logs
  3. Hyper-parameter selection with Hyperband in Dask-ML
  4. Move bytes I/O handling out of Dask to FSSpec
  5. async/await everywhere, and cleaner setup for developers
  6. A new SSH deployment solution

1 - Parquet Rewrite

Today Dask DataFrame can read and write Parquet data using either fastparquet or Apache Arrow.

import dask.dataframe as dd

df = dd.read_parquet("/path/to/mydata.parquet", engine="pyarrow")      # use Apache Arrow
# or
df = dd.read_parquet("/path/to/mydata.parquet", engine="fastparquet")  # use fastparquet

Supporting both libraries within Dask has been helpful for users, but it has also introduced a maintenance burden, especially given that each library has co-evolved with Dask DataFrame over the years. The contract between Dask DataFrame and these libraries was convoluted, making it difficult for any of them to evolve swiftly.

To address this, we've formalized what Dask expects of Parquet readers and writers into an explicit Parquet Engine contract. This keeps maintenance costs low, enables independent development of each project, and allows new engines to emerge.
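To give a rough sense of the shape of that contract, here is a minimal sketch of what a Parquet engine looks like from Dask's point of view. The method names and signatures below are simplified and illustrative, not the exact interface in the Dask codebase:

class ParquetEngine:
    """Illustrative sketch only; not the exact Dask interface."""

    @staticmethod
    def read_metadata(fs, paths, **kwargs):
        # Inspect the dataset and return schema, statistics, and a plan of
        # the pieces (files / row groups) that will become partitions
        ...

    @staticmethod
    def read_partition(fs, piece, columns, **kwargs):
        # Read one piece of the dataset into a single pandas DataFrame
        ...

    @staticmethod
    def write_partition(df, path, fs, filename, **kwargs):
        # Write one partition of a Dask DataFrame out to storage
        ...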

A GPU-accelerated Parquet engine is already available as a pull request on the RAPIDS cuDF library.

As a result, we've also been able to fix a number of long-standing bugs and improve functionality for both engines.

Some fun quotes from Sarah Bird during development:

I am currently testing this. So far so good. I can load my dataset in a few seconds with 1800 partitions. Game changing!

and

I am now successfully working on a dataset with 74,000 partitions and no metadata. Opening dataset and df.head() takes 7 - 30s. (Presumably depending on whether s3fs cache is cold or not). THIS IS HUGE! This was literally impossible before.

The API remains the same, but functionality should be smoother.
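For example, a simple round trip looks the same as it did before. This is a small sketch that assumes a writable local ./mydata directory:

import pandas as pd
import dask.dataframe as dd

# Build a tiny Dask DataFrame, write it to Parquet, and read it back
df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
df.to_parquet("./mydata", engine="pyarrow")
df2 = dd.read_parquet("./mydata", engine="pyarrow")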

Thanks to Rick Zamora and Martin Durant for doing most of the work here, and to Sarah Bird, Wes McKinney, and Mike McCarty for providing guidance and review.

2 - Nicer HTML output for Clients and Logs

Client objects and cluster logs now have a much nicer HTML representation when viewed in a Jupyter notebook:

from dask.distributed import Client
client = Client()

[Rich HTML repr of the Client, summarizing the local cluster: 4 workers, 12 cores, 17.18 GB memory]
client.cluster.logs()
Scheduler
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:60275
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.scheduler - INFO - Register tcp://127.0.0.1:60281
distributed.scheduler - INFO - Register tcp://127.0.0.1:60282
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60281
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60282
distributed.scheduler - INFO - Register tcp://127.0.0.1:60285
distributed.scheduler - INFO - Register tcp://127.0.0.1:60286
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60285
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60286
distributed.scheduler - INFO - Receive client connection: Client-6b6ba1d0-b3bd-11e9-9bd0-acde48001122
tcp://127.0.0.1:60281
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60281
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60281
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-c4_44fym
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
tcp://127.0.0.1:60282
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60282
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60282
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-quu4taje
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
tcp://127.0.0.1:60285
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60285
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60285
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-ll4cozug
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
tcp://127.0.0.1:60286
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60286
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60286
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-lpbkkzj6
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------

Note: this looks better in any browser other than IE and Edge.

Thanks to Jacob Tomlinson for this work.

3 - Hyperparameter selection with HyperBand

Dask-ML 1.0 has been released with a new HyperbandSearchCV meta-estimator for hyper-parameter optimization. It can be used as an alternative to RandomizedSearchCV, finding similarly good hyper-parameters in less time by not wasting computation on unpromising candidates.

>>> import numpy as np
>>> from dask_ml.model_selection import HyperbandSearchCV
>>> from dask_ml.datasets import make_classification
>>> from sklearn.linear_model import SGDClassifier

>>> X, y = make_classification(chunks=20)
>>> est = SGDClassifier(tol=1e-3)
>>> param_dist = {'alpha': np.logspace(-4, 0, num=1000),
...               'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'],
...               'average': [True, False]}

>>> search = HyperbandSearchCV(est, param_dist)
>>> search.fit(X, y, classes=np.unique(y))
>>> search.best_params_
{'loss': 'log', 'average': False, 'alpha': 0.0080502}
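
Like Scikit-Learn's search objects, the fitted search exposes the familiar estimator attributes, so the example can continue as follows (shown for illustration; exact values will differ from run to run):

>>> search.best_score_        # cross-validated score of the best parameters
>>> search.best_estimator_    # the refitted SGDClassifier
>>> search.score(X, y)        # score of the best model on the given data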

Thanks to Scott Sievert. You can see Scott talk about this topic in greater depth by watching his SciPy 2019 talk.

4 - Move bytes I/O handling out of Dask to FSSpec

We’ve spun Dask’s internal code for reading and writing raw data on different storage systems out into a separate project, fsspec.

Here is a small example:

import fsspec
import pandas as pd

# Read the first kilobyte of a file over HTTP
with fsspec.open("https://raw.githubusercontent.com/dask/dask/master/README.rst") as f:
    print(f.read(1000))

# The same interface works across storage backends
# (bucket and path names below are placeholders)
with fsspec.open("s3://bucket/myfile.csv") as f:
    df = pd.read_csv(f)

with fsspec.open("hdfs:///path/to/myfile.csv") as f:
    df = pd.read_csv(f)

with fsspec.open("gcs://bucket/myfile.csv") as f:
    df = pd.read_csv(f)

Dask’s I/O infrastructure for reading and writing bytes from systems like HDFS, S3, GCS, Azure, and other remote storage systems is arguably the most uniform and comprehensive in Python today. Through tooling like s3fs, gcsfs, hdfs3, and pyarrow.hdfs, it’s easy to read and write data in a Pythonic way against a variety of remote storage systems.

Early on we decided that we wanted this code to live outside of the mainline Dask codebase, which is why s3fs, gcsfs, and the others are independent projects. This choice allowed other libraries, like Pandas, Zarr, and others, to benefit from this work without a strict dependency on Dask. However, there was still code within Dask that helped to unify them. We've now moved this code out to an external project, fsspec, which includes all of the centralization code that Dask used to provide, as well as a formal specification for what a remote data system should look like in order to be compatible. This also helps to unify efforts with other projects like Arrow.
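As a sketch of what that specification entails, a new storage backend implements fsspec's AbstractFileSystem interface and registers itself under a protocol name. The class and protocol name below ("myfs") are made up purely for illustration:

import fsspec
from fsspec import AbstractFileSystem

class MyFileSystem(AbstractFileSystem):
    """Hypothetical backend, shown only to illustrate the interface."""

    protocol = "myfs"

    def ls(self, path, detail=True, **kwargs):
        # List the contents of a directory-like path
        ...

    def _open(self, path, mode="rb", **kwargs):
        # Return a file-like object of raw bytes for this path
        ...

# After registration, URLs like "myfs://..." work with fsspec.open,
# and therefore with libraries built on top of fsspec
fsspec.register_implementation("myfs", MyFileSystem)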

Special thanks to Martin Durant for shepherding Dask’s I/O infrastructure over the years, and for doing the more immediate work of splitting out fsspec.

You can read more about FSSpec and its transition out of Dask here.

5 - Async/Await everywhere, and cleaner setup for developers

In Dask 2.0 we dropped Python 2 support and now support only Python 3.5 and above. This allows us to adopt async and await syntax for concurrent execution rather than the older coroutine-based approach with yield. The differences started out as largely aesthetic, but they triggered a number of substantive improvements as we walked through the codebase cleaning things up. Starting and stopping the internal Scheduler, Worker, Nanny, and Client objects is now far more uniform, which reduces the opportunity for subtle bugs.
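To illustrate the syntactic shift, here is a schematic before/after comparison. This is not actual Dask internals, just the general pattern, and the server object is hypothetical:

from tornado import gen

# Old style: Tornado coroutines driven by yield
@gen.coroutine
def startup_old(server):
    yield server.start()

# New style: native async/await syntax
async def startup_new(server):
    await server.start()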

This is discussed in more detail in the Python API setup documentation and is encapsulated in this code example from those docs:

import asyncio

from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

As a result of this and other internal cleanup, intermittent testing failures in our CI have disappeared, and developer mood is high :)

6 - A new SSHCluster

We’ve added a second SSH cluster deployment solution. It looks like this:

from distributed.deploy.ssh2 import SSHCluster  # this will move in future releases

cluster = SSHCluster(
    hosts=["host1", "host2", "host3", "host4"],
    # hosts=["localhost"] * 4,  # if you want to try this out locally
    worker_kwargs={"nthreads": 4},
    scheduler_kwargs={},
    connect_kwargs={"known_hosts": None},
)
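
Once the cluster is constructed, you connect a client to it in the usual way:

from dask.distributed import Client

client = Client(cluster)  # connect to the scheduler running over SSH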

Note that this object is experimental and subject to change without notice.

We worked on this for two reasons:

  1. Our user survey showed that a surprising number of people deploy Dask over SSH. Anecdotally, they seem to SSH into each machine by hand and then use Dask's normal command line interface.

    We wanted a solution that was easier than this.

  2. We’ve been trying to unify the code in the various deployment solutions (like Kubernetes, SLURM, Yarn/Hadoop) to a central codebase, and having a simple SSHCluster as a test case has proven valuable for testing and experimentation.

Also note that Dask already has a dask-ssh solution today that is more mature.

We expect that unification of deployment will be a central theme for the next few months of development.

Acknowledgements

There have been two releases since the last time we had a release blogpost. The following people contributed to the following repositories since the 2.0 release on June 30th:

  • dask/dask
    • Brett Naul
    • Daniel Saxton
    • David Brochart
    • Davis Bennett
    • Elliott Sales de Andrade
    • GALI PREM SAGAR
    • James Bourbeau
    • Jim Crist
    • Loïc Estève
    • Martin Durant
    • Matthew Rocklin
    • Matthias Bussonnier
    • Natalya Rapstine
    • Nick Becker
    • Peter Andreas Entschev
    • Ralf Gommers
    • Richard (Rick) Zamora
    • Sarah Bird
    • Sean McKenna
    • Tom Augspurger
    • Willi Rath
    • Xavier Holt
    • andrethrill
    • asmith26
    • msbrown47
    • tshatrov
  • dask/distributed
    • Christian Hudon
    • Gabriel Sailer
    • Jacob Tomlinson
    • James Bourbeau
    • Jim Crist
    • Martin Durant
    • Matthew Rocklin
    • Pierre Glaser
    • Russ Bubley
    • tjb900
  • dask/dask-jobqueue
    • Guillaume Eynard-Bontemps
    • Leo Singer
    • Loïc Estève
    • Matthew Rocklin
    • Stuart Berg
  • dask/dask-examples
    • Chris White
    • Ian Rose
    • Matthew Rocklin
  • dask/dask-mpi
    • Anderson Banihirwe
    • Kevin Paul
    • Matthew Rocklin
  • dask/dask-kubernetes
    • Matthew Rocklin
    • Tom Augspurger
  • dask/dask-ml
    • Roman Yurchak
    • Tom Augspurger
  • dask/dask-yarn
    • Al Johri
    • Jim Crist
