Dask is a parallel computing library for Python that provides a distributed DataFrame similar to Spark's (Spark is written in Scala and Java; PySpark wraps it through py4j), but it runs natively in the Python environment. Dask supports both single-machine and multi-machine parallelism. This post covers installation and usage.
1. Quick Installation
With conda:
conda install dask
With pip:
python -m pip install "dask[complete]" # Install everything
- For details see: https://docs.dask.org/en/latest/install.html#
- If nodes in a multi-node setup cannot connect to each other, the cause is usually a version mismatch. Sometimes you need to uninstall dask and distributed and reinstall them, or pin the versions explicitly during installation.
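A quick way to confirm that the client, scheduler, and workers agree on versions is `client.get_versions(check=True)` (it also appears in the KubeCluster reference further down); a minimal sketch, assuming a scheduler is already reachable at the address used in section 3:

```python
from dask.distributed import Client

# Hypothetical scheduler address; replace with your own host:port.
client = Client("192.168.199.173:8786")

# Raises an informative error if Python/dask/distributed versions differ
# between the client, the scheduler, and any worker.
client.get_versions(check=True)
```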
2. Quick Start
import dask.dataframe as dd
df = dd.read_csv(...)
df.x.sum().compute()  # This uses the single-machine scheduler by default
from dask.distributed import Client
client = Client(...)  # Connect to distributed cluster and override default
df.x.sum().compute()  # This now runs on the distributed system
3. Distributed Setup
Install:
conda install dask distributed -c conda-forge
# or
python -m pip install dask distributed --upgrade
Start the scheduler (master node):
$ dask-scheduler
Scheduler at: tcp://192.0.0.100:8786
Add worker nodes:
$ dask-worker tcp://192.0.0.100:8786
Start worker at: tcp://192.0.0.1:12345
Registered to: tcp://192.0.0.100:8786
$ dask-worker tcp://192.0.0.100:8786
Start worker at: tcp://192.0.0.2:40483
Registered to: tcp://192.0.0.100:8786
$ dask-worker tcp://192.0.0.100:8786
Start worker at: tcp://192.0.0.3:27372
Registered to: tcp://192.0.0.100:8786
Specify service ports:
dask-scheduler --port 8000
dask-worker --dashboard-address 8000 --nanny-port 8001
Check the cluster status:
- Open http://192.168.199.173:8787 in a browser (the scheduler dashboard).
Use it from a client:
from dask.distributed import Client
client = Client('192.168.199.173:8786')
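Once the client is connected, ordinary Dask calls execute on the cluster; a minimal sketch (the array size and chunking are arbitrary illustrations):

```python
import dask.array as da
from dask.distributed import Client

client = Client("192.168.199.173:8786")  # scheduler address from above

# Build a lazy task graph and run it on the workers.
x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.mean().compute())

# Link to the dashboard for this cluster.
print(client.dashboard_link)
```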
4. Scheduling Dask Workers on Kubernetes
Dask Kubernetes provides cluster managers for Kubernetes.
- KubeCluster deploys Dask clusters on Kubernetes clusters using native Kubernetes APIs. It is designed to dynamically launch ad-hoc deployments.
- HelmCluster is for managing an existing Dask cluster which has been deployed using Helm. You must have already installed the Dask Helm chart and have the cluster running. You can then use it to manage scaling and retrieve logs.
For more general information on running Dask on Kubernetes, see the Dask documentation.
4.1 HelmCluster
Quickstart
helm repo add dask https://helm.dask.org
helm repo update
helm install myrelease dask/dask
from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="myrelease")
cluster.scale(10)  # specify number of workers explicitly
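To run work on the Helm-deployed cluster, attach a distributed Client to the cluster object, as the API examples further down also show; a short sketch:

```python
from dask.distributed import Client
from dask_kubernetes import HelmCluster

cluster = HelmCluster(release_name="myrelease")  # existing Helm release
client = Client(cluster)                         # submit work through this client

# Inspect scheduler and worker logs for the release.
logs = cluster.get_logs()
```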
For more information see the HelmCluster API reference.
Warning: It is not possible to use HelmCluster from the Jupyter session which is deployed as part of the Helm Chart without first copying your ~/.kube/config file to that Jupyter session.
API
- `HelmCluster([release_name, auth, namespace, …])`: Connect to a Dask cluster deployed via the Helm Chart.
- `HelmCluster.scale(n_workers)`: Scale cluster to n workers.
- `HelmCluster.adapt(*args, **kwargs)`: Turn on adaptivity (Not recommended).
- `HelmCluster.logs(*args, **kwargs)`
class dask_kubernetes.HelmCluster(release_name=None, auth=[<dask_kubernetes.auth.InCluster object>, <dask_kubernetes.auth.KubeConfig object>], namespace=None, port_forward_cluster_ip=False, loop=None, asynchronous=False, scheduler_name='scheduler', worker_name='worker', node_host=None, node_port=None, **kwargs)
Connect to a Dask cluster deployed via the Helm Chart.
This cluster manager connects to an existing Dask deployment that was created by the Dask Helm Chart, enabling you to perform basic cluster actions such as scaling and log retrieval.
Parameters:
- `release_name`: str. Name of the helm release to connect to.
- `namespace`: str (optional). Namespace in which to launch the workers. Defaults to the current namespace if available, or "default".
- `port_forward_cluster_ip`: bool (optional). If the chart uses ClusterIP type services, forward the ports locally.
- `auth`: List[ClusterAuth] (optional). Configuration methods to attempt in order. Defaults to `[InCluster(), KubeConfig()]`.
- `scheduler_name`: str (optional). Name of the Dask scheduler deployment in the current release. Defaults to "scheduler".
- `worker_name`: str (optional). Name of the Dask worker deployment in the current release. Defaults to "worker".
- `node_host`: str (optional). A node address. Can be provided in case the scheduler service type is `NodePort`.
- `node_port`: int (optional). A node port. Can be provided in case the scheduler service type is `NodePort`.
- `**kwargs`: dict. Additional keyword arguments to pass to Cluster.
Examples
>>> from dask_kubernetes import HelmCluster
>>> cluster = HelmCluster(release_name="myhelmrelease")
You can then resize the cluster with the scale method
>>> cluster.scale(10)
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can also access cluster logs
>>> cluster.get_logs()
Attributes: `asynchronous`, `dashboard_link`, `name`, `observed`, `plan`, `requested`, `scheduler_address`
Methods
- `adapt(*args, **kwargs)`: Turn on adaptivity (Not recommended).
- `get_logs()`: Get logs for Dask scheduler and workers.
- `scale(n_workers)`: Scale cluster to n workers.
- `close`, `from_name`, `logs`, `sync`
`adapt(*args, **kwargs)`: Turn on adaptivity (Not recommended).
`get_logs()`: Get logs for Dask scheduler and workers.
Examples
>>> cluster.get_logs()
{'testdask-scheduler-5c8ffb6b7b-sjgrg': ...,
 'testdask-worker-64c8b78cc-992z8': ...,
 'testdask-worker-64c8b78cc-hzpdc': ...,
 'testdask-worker-64c8b78cc-wbk4f': ...}
Each log will be a string of all logs for that container. To view it, it is recommended that you print each log.
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"])
...
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://10.1.6.131:8786
distributed.scheduler - INFO -   dashboard at: :8787
...
`scale(n_workers)`: Scale cluster to n workers.
This sets the Dask worker deployment size to the requested number. Workers will not be terminated gracefully, so be sure to only scale down when all futures have been retrieved by the client and the cluster is idle.
Examples
>>> cluster
HelmCluster('tcp://localhost:8786', workers=3, threads=18, memory=18.72 GB)
>>> cluster.scale(4)
>>> cluster
HelmCluster('tcp://localhost:8786', workers=4, threads=24, memory=24.96 GB)
4.2 KubeCluster
Quickstart
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.scale(10)  # specify number of workers explicitly
cluster.adapt(minimum=1, maximum=100)  # or dynamically scale based on current workload
# worker-spec.yml
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    env:
      - name: EXTRA_PIP_PACKAGES
        value: git+https://github.com/dask/distributed
    resources:
      limits:
        cpu: "2"
        memory: 6G
      requests:
        cpu: "2"
        memory: 6G
# Example usage
from dask.distributed import Client
import dask.array as da

# Connect Dask to the cluster
client = Client(cluster)

# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute())  # Should print 1.0
For more information see the KubeCluster API reference.
Best Practices
- Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See the `dask_kubernetes.KubeCluster` docstring for guidance on how to check and modify this.
- Your Kubernetes resource limits and requests should match the `--memory-limit` and `--nthreads` parameters given to the `dask-worker` command. Otherwise your workers may get killed by Kubernetes as they pack into the same node and overwhelm that node's available memory, leading to `KilledWorker` errors.
- We recommend adding the `--death-timeout, '60'` arguments and the `restartPolicy: Never` attribute to your worker specification. This ensures that these pods will clean themselves up if your Python process disappears unexpectedly. A sketch of generating a matching pod spec programmatically follows this list.
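As an alternative to writing the YAML by hand, `dask_kubernetes.make_pod_spec` (used again in the KubeCluster examples below) can generate a worker pod spec; a minimal sketch in which the resource numbers are only illustrations:

```python
from dask_kubernetes import KubeCluster, make_pod_spec

# Requests/limits here mirror the values from the worker-spec.yml above; adjust
# to your own sizing, and double-check the generated spec so the dask-worker
# flags (--memory-limit, --nthreads) stay in sync with these resources.
pod_spec = make_pod_spec(
    image="daskdev/dask:latest",
    memory_limit="6G", memory_request="6G",
    cpu_limit=2, cpu_request=2,
    env={"EXTRA_PIP_PACKAGES": "git+https://github.com/dask/distributed"},
)

cluster = KubeCluster(pod_spec)
cluster.adapt(minimum=1, maximum=100)
```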
GPUs
Because `dask-kubernetes` uses standard kubernetes pod specifications, we can use kubernetes device plugins and add resource limits defining the number of GPUs per pod/worker. Additionally, we can also use tools like dask-cuda for optimized Dask/GPU interactions.
kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.8
    imagePullPolicy: IfNotPresent
    args: [dask-cuda-worker, $(DASK_SCHEDULER_ADDRESS), --rmm-pool-size, 10GB]
    name: dask-cuda
    resources:
      limits:
        cpu: "2"
        memory: 6G
        nvidia.com/gpu: 1 # requesting 1 GPU
      requests:
        cpu: "2"
        memory: 6G
        nvidia.com/gpu: 1 # requesting 1 GPU
Configuration
You can use Dask’s configuration to control the behavior of Dask-kubernetes. You can see a full set of configuration options here. Some notable ones are described below:
- `kubernetes.worker-template-path`: a path to a YAML file that holds a Pod spec for the worker. If provided, this will be used when `dask_kubernetes.KubeCluster` is called with no arguments: `cluster = KubeCluster()  # reads provided yaml file`
- `distributed.dashboard.link`: a Python pre-formatted string that shows the location of Dask's dashboard. This string will receive values for `host`, `port`, and all environment variables. For example, this is useful when using dask-kubernetes with JupyterHub and nbserverproxy to route the dashboard link to a proxied address such as `"{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"`.
- `kubernetes.worker-name`: a Python pre-formatted string to use when naming dask worker pods. This string will receive values for `user`, `uuid`, and all environment variables. This is useful when you want control over the naming convention for your pods and want to use other tokens from the environment. For example, when using zero-to-jupyterhub every user is called `jovyan`, so you may wish to use `dask-{JUPYTERHUB_USER}-{uuid}` instead of `dask-{user}-{uuid}`. Ensure you keep the `uuid` somewhere in the template.
A sketch of setting these options from Python follows this list.
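These options can be set in Dask's YAML configuration files or programmatically with `dask.config.set`; a minimal sketch with illustrative values:

```python
import dask

dask.config.set({
    # Pod spec that KubeCluster() picks up when called with no arguments.
    "kubernetes.worker-template-path": "worker-spec.yml",
    # Route the dashboard link through nbserverproxy when running under JupyterHub.
    "distributed.dashboard.link": "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status",
    # Custom pod naming; keep {uuid} somewhere in the template.
    "kubernetes.worker-name": "dask-{JUPYTERHUB_USER}-{uuid}",
})
```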
Role-Based Access Control (RBAC)
In order to spawn a Dask cluster, the service account creating those pods will require a set of RBAC permissions. Create a service account you will use for Dask, and then attach the following Role to that ServiceAccount via a RoleBinding:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: daskKubernetes
rules:
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "pods"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "pods/log"
  verbs:
  - "get"
  - "list"
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "services"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
- apiGroups:
  - "policy"  # indicates the policy API group
  resources:
  - "poddisruptionbudgets"
  verbs:
  - "get"
  - "list"
  - "watch"
  - "create"
  - "delete"
Docker Images
Example Dask docker images daskdev/dask and daskdev/dask-notebook are available at https://hub.docker.com/r/daskdev. More information about these images is available in the Dask documentation.
Note that these images can be further customized with extra packages using `EXTRA_PIP_PACKAGES`, `EXTRA_APT_PACKAGES`, and `EXTRA_CONDA_PACKAGES` as described in the Extensibility section.
Deployment Details
Scheduler
Before workers are created a scheduler will be deployed with the following resources:
- A pod with a scheduler running
- A service (svc) to expose scheduler and dashboard ports
- A PodDisruptionBudget to avoid voluntary disruptions of the scheduler pod
By default the Dask configuration option `kubernetes.scheduler-service-type` is set to `ClusterIP`. In order to connect to the scheduler, `KubeCluster` will first attempt to connect directly, but this will only be successful if `dask-kubernetes` is being run from within the Kubernetes cluster. If it is unsuccessful, it will attempt to port forward the service locally using the `kubectl` utility.
If you update the service type to `NodePort`, the scheduler will be exposed on the same random high port on all nodes in the cluster. In this case `KubeCluster` will attempt to list nodes in order to get an IP to connect to, and it requires additional permissions to do so:
- apiGroups:
  - ""  # indicates the core API group
  resources:
  - "nodes"
  verbs:
  - "get"
  - "list"
If you set the service type to `LoadBalancer`, then `KubeCluster` will connect to the external address of the assigned load balancer, but this does require that your Kubernetes cluster has the appropriate operator to assign load balancers.
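The service type is controlled by the `kubernetes.scheduler-service-type` option mentioned above; a minimal sketch of overriding it from Python, assuming the extra node-listing permissions have been granted for NodePort:

```python
import dask
from dask_kubernetes import KubeCluster

# Expose the remote scheduler via a NodePort service instead of the default ClusterIP.
dask.config.set({"kubernetes.scheduler-service-type": "NodePort"})

cluster = KubeCluster.from_yaml("worker-spec.yml")
```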
Legacy mode
For backward compatibility with previous versions of `dask-kubernetes` it is also possible to run the scheduler locally. A local scheduler is created where the Dask client will be created.
from dask_kubernetes import KubeCluster
from dask.distributed import Client

cluster = KubeCluster.from_yaml('worker-spec.yml', deploy_mode='local')
cluster.scale(10)
client = Client(cluster)
In this mode the Dask workers will attempt to connect to the machine where you are running `dask-kubernetes`. Generally this will need to be within the Kubernetes cluster in order for the workers to make a successful connection.
Workers
Workers are created directly as simple pods. These worker pods are configured to shut down if they are unable to connect to the scheduler for 60 seconds. The pods are cleaned up when `close()` is called, or when the scheduler process exits.
The pods are created with two default tolerations:
- `k8s.dask.org/dedicated=worker:NoSchedule`
- `k8s.dask.org_dedicated=worker:NoSchedule`
If you have nodes with the corresponding taints, then the worker pods will schedule to those nodes (and no other pods will be able to schedule to those nodes).
API
- `KubeCluster([pod_template, name, namespace, …])`: Launch a Dask cluster on Kubernetes
- `KubeCluster.adapt(*args[, minimum, maximum])`: Turn on adaptivity
- `KubeCluster.from_dict(pod_spec, **kwargs)`: Create cluster with worker pod spec defined by Python dictionary
- `KubeCluster.from_yaml(yaml_path, **kwargs)`: Create cluster with worker pod spec defined by a YAML file
- `KubeCluster.get_logs([cluster, scheduler, …])`: Return logs for the cluster, scheduler and workers
- `KubeCluster.pods`
- `KubeCluster.scale(n)`: Scale cluster to n workers
- `InCluster`: Configure the Kubernetes connection from a container's environment.
- `KubeConfig([config_file, context, …])`: Configure the Kubernetes connection from a kubeconfig file.
- `KubeAuth(host, **kwargs)`: Configure the Kubernetes connection explicitly.
class dask_kubernetes.KubeCluster(pod_template=None, name=None, namespace=None, n_workers=None, host=None, port=None, env=None, auth=[<dask_kubernetes.auth.InCluster object>, <dask_kubernetes.auth.KubeConfig object>], idle_timeout=None, deploy_mode=None, interface=None, protocol=None, dashboard_address=None, security=None, scheduler_service_wait_timeout=None, scheduler_pod_template=None, **kwargs)
Launch a Dask cluster on Kubernetes
This starts a local Dask scheduler and then dynamically launches Dask workers on a Kubernetes cluster. The Kubernetes cluster is taken to be either the current one on which this code is running, or as a fallback, the default one configured in a kubeconfig file.
Environments
Your worker pod image should have a similar environment to your local environment, including versions of Python, dask, cloudpickle, and any libraries that you may wish to use (like NumPy, Pandas, or Scikit-Learn). See examples below for suggestions on how to manage and check for this.
Network
Since the Dask scheduler is launched locally, for it to work, we need to be able to open network connections between this local node and all the workers nodes on the Kubernetes cluster. If the current process is not already on a Kubernetes node, some network configuration will likely be required to make this work.
Resources
Your Kubernetes resource limits and requests should match the `--memory-limit` and `--nthreads` parameters given to the `dask-worker` command.
Parameters:
- `pod_template`: (kubernetes.client.V1Pod, dict, str). A Kubernetes specification for a Pod for a dask worker.
- `scheduler_pod_template`: kubernetes.client.V1Pod (optional). A Kubernetes specification for a Pod for a dask scheduler. Defaults to the pod_template.
- `name`: str (optional). Name given to the pods.
- `namespace`: str (optional). Namespace in which to launch the workers. Defaults to the current namespace if available, or "default".
- `n_workers`: int. Number of workers on initial launch. Use `scale` to change this number afterwards.
- `env`: Dict[str, str]. Dictionary of environment variables to pass to the worker pod.
- `host`: str. Listen address for the local scheduler. Defaults to 0.0.0.0.
- `port`: int. Port of the local scheduler.
- `auth`: List[ClusterAuth] (optional). Configuration methods to attempt in order. Defaults to `[InCluster(), KubeConfig()]`.
- `idle_timeout`: str (optional). The scheduler task will exit after this amount of time if there are no requests from the client. Default is to never time out.
- `scheduler_service_wait_timeout`: int (optional). Timeout, in seconds, to wait for the remote scheduler service to be ready. Defaults to 30 seconds. Set to 0 to disable the timeout (not recommended).
- `deploy_mode`: str (optional). Run the scheduler as "local" or "remote". Defaults to "remote".
- `**kwargs`: dict. Additional keyword arguments to pass to LocalCluster.
See also: KubeCluster.adapt
Examples
>>> from dask_kubernetes import KubeCluster, make_pod_spec
>>> pod_spec = make_pod_spec(image='daskdev/dask:latest',
...                          memory_limit='4G', memory_request='4G',
...                          cpu_limit=1, cpu_request=1,
...                          env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})
>>> cluster = KubeCluster(pod_spec)
>>> cluster.scale(10)
You can also create clusters with worker pod specifications as dictionaries or stored in YAML files
>>> cluster = KubeCluster('worker-template.yml')
>>> cluster = KubeCluster({...})
Rather than explicitly setting a number of workers you can also ask the cluster to allocate workers dynamically based on current workload
>>> cluster.adapt()
You can pass this cluster directly to a Dask client
>>> from dask.distributed import Client
>>> client = Client(cluster)
You can verify that your local environment matches your worker environments by calling `client.get_versions(check=True)`. This will raise an informative error if versions do not match.
>>> client.get_versions(check=True)
The `daskdev/dask` docker images support `EXTRA_PIP_PACKAGES`, `EXTRA_APT_PACKAGES` and `EXTRA_CONDA_PACKAGES` environment variables to help with small adjustments to the worker environments. We recommend the use of pip over conda in this case due to a much shorter startup time. These environment variables can be modified directly from the KubeCluster constructor methods using the `env=` keyword. You may list as many packages as you like in a single string like the following:
>>> pip = 'pyarrow gcsfs git+https://github.com/dask/distributed'
>>> conda = '-c conda-forge scikit-learn'
>>> KubeCluster(..., env={'EXTRA_PIP_PACKAGES': pip,
...                       'EXTRA_CONDA_PACKAGES': conda})
You can also start a KubeCluster with no arguments if the worker template is specified in the Dask config files, either as a full template in `kubernetes.worker-template` or a path to a YAML file in `kubernetes.worker-template-path`. See https://docs.dask.org/en/latest/configuration.html for more information about setting configuration values:
$ export DASK_KUBERNETES__WORKER_TEMPLATE_PATH=worker_template.yaml
>>> cluster = KubeCluster() # automatically finds 'worker_template.yaml'
Attributes: `asynchronous`, `dashboard_link`, `name`, `observed`, `plan`, `requested`, `scheduler_address`
Methods
- `adapt(*args[, minimum, maximum])`: Turn on adaptivity
- `from_dict(pod_spec, **kwargs)`: Create cluster with worker pod spec defined by Python dictionary
- `from_name(name)`: Create an instance of this class to represent an existing cluster by name.
- `from_yaml(yaml_path, **kwargs)`: Create cluster with worker pod spec defined by a YAML file
- `get_logs([cluster, scheduler, workers])`: Return logs for the cluster, scheduler and workers
- `new_worker_spec()`: Return name and spec for the next worker
- `scale(n)`: Scale cluster to n workers
- `scale_up([n, memory, cores])`: Scale cluster to n workers
- `close`, `logs`, `scale_down`, `sync`
classmethod from_dict(pod_spec, **kwargs): Create cluster with worker pod spec defined by Python dictionary
Deprecated, please use the KubeCluster constructor directly.
Examples
>>> spec = {
...     'metadata': {},
...     'spec': {
...         'containers': [{
...             'args': ['dask-worker', '$(DASK_SCHEDULER_ADDRESS)',
...                      '--nthreads', '1',
...                      '--death-timeout', '60'],
...             'command': None,
...             'image': 'daskdev/dask:latest',
...             'name': 'dask-worker',
...         }],
...         'restartPolicy': 'Never',
...     }
... }
>>> cluster = KubeCluster.from_dict(spec, namespace='my-ns')  # doctest: +SKIP
classmethod from_yaml(yaml_path, **kwargs): Create cluster with worker pod spec defined by a YAML file
Deprecated, please use the KubeCluster constructor directly.
We can start a cluster with pods defined in an accompanying YAML file like the following:
kind: Pod
metadata:
  labels:
    foo: bar
    baz: quux
spec:
  containers:
  - image: daskdev/dask:latest
    name: dask-worker
    args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '2', --memory-limit, 8GB]
  restartPolicy: Never
Examples
>>> cluster = KubeCluster.from_yaml('pod.yaml', namespace='my-ns') # doctest: +SKIP
`scale(n)`: Scale cluster to n workers
Parameters: `n`: int. Target number of workers.
Examples
>>> cluster.scale(10) # scale cluster to ten workers
class dask_kubernetes.ClusterAuth
An abstract base class for methods for configuring a connection to a Kubernetes API server.
Examples
>>> from dask_kubernetes import KubeConfig
>>> auth = KubeConfig(context='minikube')
>>> from dask_kubernetes import KubeAuth
>>> auth = KubeAuth(host='https://localhost', username='superuser', password='pass')
Methods
- `load()`: Load Kubernetes configuration and set as default
- `load_first([auth])`: Load the first valid configuration in the list auth.
`load()`: Load Kubernetes configuration and set as default
Raises: kubernetes.client.KubeConfigException
static `load_first(auth=None)`: Load the first valid configuration in the list auth. A single configuration method can be passed.
Parameters: `auth`: List[ClusterAuth] (optional). Configuration methods to attempt in order. Defaults to `[InCluster(), KubeConfig()]`.
class dask_kubernetes.InCluster
Configure the Kubernetes connection from a container’s environment.
This authentication method is intended for use when the client is running in a container started by Kubernetes with an authorized service account. This loads the mounted service account token and discovers the Kubernetes API via Kubernetes service discovery.
Methods
- `load()`: Load Kubernetes configuration and set as default
- `load_first([auth])`: Load the first valid configuration in the list auth.
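The auth classes are normally tried in the default order shown in the constructor signatures above, but they can also be passed explicitly; a small sketch with illustrative arguments:

```python
from dask_kubernetes import KubeCluster, InCluster, KubeConfig

# Try in-cluster service-account auth first, then fall back to a kubeconfig context.
cluster = KubeCluster(
    "worker-spec.yml",
    auth=[InCluster(), KubeConfig(context="minikube")],
)
```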
class dask_kubernetes.KubeConfig(config_file=None, context=None, persist_config=True)
Configure the Kubernetes connection from a kubeconfig file.
Parameters:
- `config_file`: str (optional). The path of the kubeconfig file to load.
- `context`: str (optional). The kubeconfig context to use.
- `persist_config`: bool (optional). Whether changes to the configuration will be saved back to disk (e.g. GCP token refresh). Defaults to `True`.
Methods
- `get_kube_config_loader_for_yaml_file()`
- `load()`: Load Kubernetes configuration and set as default
- `load_first([auth])`: Load the first valid configuration in the list auth.
- `load_kube_config()`
class dask_kubernetes.KubeAuth(host, **kwargs)
Configure the Kubernetes connection explicitly.
Parameters:
- `host`: str. The base URL of the Kubernetes host to connect to.
- `username`: str (optional). Username for HTTP basic authentication.
- `password`: str (optional). Password for HTTP basic authentication.
- `debug`: bool (optional). Debug switch.
- `verify_ssl`: bool (optional). Set this to false to skip verifying the SSL certificate when calling the API from an https server.
- `ssl_ca_cert`: str (optional). Set this to customize the certificate file used to verify the peer.
- `cert_file`: str (optional). Client certificate file.
- `key_file`: str (optional). Client key file.
- `assert_hostname`: bool (optional). Set this to True/False to enable/disable SSL hostname verification. Defaults to True.
- `proxy`: str (optional). URL for a proxy to connect through.
Methods
- `load()`: Load Kubernetes configuration and set as default
- `load_first([auth])`: Load the first valid configuration in the list auth.