일반적으로 다음과 같이 Dask를 import 합니다. 작업 중인 데이터 유형(DataFrame, array, list)에 따라 이들 중 일부가 필요하지 않을 수도 있습니다.
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
Dask DataFrame¶
Dask 객체 생성¶
먼저, Dask DataFrame을 활용해 Pandas DataFrame 유형의 데이터를 작업해보도록 하겠습니다.
index = pd.date_range("2021-09-01", periods=2400, freq="1H")
df = pd.DataFrame({"a": np.arange(2400), "b": list("abcaddbe" * 300)}, index=index)
df
a | b | |
---|---|---|
2021-09-01 00:00:00 | 0 | a |
2021-09-01 01:00:00 | 1 | b |
2021-09-01 02:00:00 | 2 | c |
2021-09-01 03:00:00 | 3 | a |
2021-09-01 04:00:00 | 4 | d |
... | ... | ... |
2021-12-09 19:00:00 | 2395 | a |
2021-12-09 20:00:00 | 2396 | d |
2021-12-09 21:00:00 | 2397 | d |
2021-12-09 22:00:00 | 2398 | b |
2021-12-09 23:00:00 | 2399 | e |
2400 rows × 2 columns
ddf = dd.from_pandas(df, npartitions=10)
ddf
a | b | |
---|---|---|
npartitions=10 | ||
2021-09-01 00:00:00 | int32 | string |
2021-09-11 00:00:00 | ... | ... |
... | ... | ... |
2021-11-30 00:00:00 | ... | ... |
2021-12-09 23:00:00 | ... | ... |
이제 2개의 열, 2400개의 행으로 구성된 Dask DataFrame이 있습니다. 이 Dask DataFrame은 2400개 행을 10개의 파티션(partition)으로 나누며, 따라서 각 파티션은 240개 행으로 구성되어 있습니다. 여기서 파티션은 데이터 조각을 나타냅니다.
다음은 Dask DataFrame의 몇 가지 주요 속성입니다.
# 각 파티션이 포함하는 인덱스 값 확인
ddf.divisions
(Timestamp('2021-09-01 00:00:00'), Timestamp('2021-09-11 00:00:00'), Timestamp('2021-09-21 00:00:00'), Timestamp('2021-10-01 00:00:00'), Timestamp('2021-10-11 00:00:00'), Timestamp('2021-10-21 00:00:00'), Timestamp('2021-10-31 00:00:00'), Timestamp('2021-11-10 00:00:00'), Timestamp('2021-11-20 00:00:00'), Timestamp('2021-11-30 00:00:00'), Timestamp('2021-12-09 23:00:00'))
# 특정 파티션에 액세스
ddf.partitions[1]
a | b | |
---|---|---|
npartitions=1 | ||
2021-09-11 | int32 | string |
2021-09-21 | ... | ... |
인덱싱 (Indexing)¶
Dask DataFrame의 인덱싱은 pandas DataFrame을 슬라이싱(slicing)하는 것과 유사합니다.
ddf.b
Dask Series Structure: npartitions=10 2021-09-01 00:00:00 string 2021-09-11 00:00:00 ... ... 2021-11-30 00:00:00 ... 2021-12-09 23:00:00 ... Name: b, dtype: string Dask Name: getitem, 3 graph layers
ddf["2021-10-01": "2021-10-09 5:00"]
a | b | |
---|---|---|
npartitions=1 | ||
2021-10-01 00:00:00.000000000 | int32 | string |
2021-10-09 05:00:59.999999999 | ... | ... |
연산 (Computation)¶
연산 결과는 사용자가 요청할 때까지 계산되지 않으며, 대신에 연산 과정을 정리한 Dask 작업 그래프가 생성됩니다. 연산 결과를 얻고 싶다면 compute
를 호출해야 합니다.
ddf["2021-10-01": "2021-10-09 5:00"].compute()
a | b | |
---|---|---|
2021-10-01 00:00:00 | 720 | a |
2021-10-01 01:00:00 | 721 | b |
2021-10-01 02:00:00 | 722 | c |
2021-10-01 03:00:00 | 723 | a |
2021-10-01 04:00:00 | 724 | d |
... | ... | ... |
2021-10-09 01:00:00 | 913 | b |
2021-10-09 02:00:00 | 914 | c |
2021-10-09 03:00:00 | 915 | a |
2021-10-09 04:00:00 | 916 | d |
2021-10-09 05:00:00 | 917 | d |
198 rows × 2 columns
메서드 (Methods)¶
Dask DataFrame의 메서드는 기존 Pandas 메서드와 일치합니다. 메서드를 호출하여 작업 그래프(task graph)를 설정한 다음, compute
를 호출하여 결과를 가져옵니다.
ddf.a.mean()
dd.Scalar<series-..., dtype=float64>
ddf.a.mean().compute()
1199.5
ddf.b.unique()
Dask Series Structure: npartitions=1 string ... Name: b, dtype: string Dask Name: unique-agg, 5 graph layers
ddf.b.unique().compute()
0 a 1 b 2 c 3 d 4 e Name: b, dtype: string
Pandas처럼 여러 메서드를 함께 연결하여 사용할 수도 있습니다.
result = ddf["2021-10-01": "2021-10-09 5:00"].a.cumsum() - 100
result
Dask Series Structure: npartitions=1 2021-10-01 00:00:00.000000000 int32 2021-10-09 05:00:59.999999999 ... Name: a, dtype: int32 Dask Name: sub, 8 graph layers
result.compute()
2021-10-01 00:00:00 620 2021-10-01 01:00:00 1341 2021-10-01 02:00:00 2063 2021-10-01 03:00:00 2786 2021-10-01 04:00:00 3510 ... 2021-10-09 01:00:00 158301 2021-10-09 02:00:00 159215 2021-10-09 03:00:00 160130 2021-10-09 04:00:00 161046 2021-10-09 05:00:00 161963 Freq: H, Name: a, Length: 198, dtype: int32
작업 그래프 (Task Graph) 시각화¶
지금까지 연산 작업을 설정하고 compute
를 호출했습니다. 추가적으로, compute
를 호출하기 전에 작업 그래프(task graph)를 검사하여 연산 과정을 파악하고 검토할 수 있습니다.
result.dask
HighLevelGraph
HighLevelGraph with 8 layers and 26 keys from all layers.
Layer1: from_pandas
from_pandas-6142e84c1d4011397f444f391cd4e05f
|
Layer2: to_pyarrow_string
to_pyarrow_string-0f3b37cf4394c7ec6c87ad2f07e26a91
|
Layer3: loc
loc-f142afadbb298ee83a952d7ae89f15bd
|
Layer4: getitem
getitem-4c2d6071b79fb3cda430b7c7b592b9c1
|
Layer5: series-cumsum-map
series-cumsum-map-a07067c6901a2e403c12ad5e674204bb
|
Layer6: series-cumsum-take-last
series-cumsum-take-last-156f128c2ac1566b7ee1873a77b6286c
|
Layer7: series-cumsum
series-cumsum-014418bacdeef6b8de8aca9141b970c7
|
Layer8: sub
sub-79ec33501f65f80195c25e7152e649e2
|
result.visualize()
Dask Array¶
다음은 Dask array를 활용해 위와 동일한 작업을 해보겠습니다. 먼저, NumPy 라이브러리를 사용해 Dask Array에 입력할 배열(array)을 생성해보겠습니다.
data = np.arange(100_000).reshape(200, 500)
data
array([[ 0, 1, 2, ..., 497, 498, 499], [ 500, 501, 502, ..., 997, 998, 999], [ 1000, 1001, 1002, ..., 1497, 1498, 1499], ..., [98500, 98501, 98502, ..., 98997, 98998, 98999], [99000, 99001, 99002, ..., 99497, 99498, 99499], [99500, 99501, 99502, ..., 99997, 99998, 99999]])
a = da.from_array(data, chunks=(100, 100))
a
|
이제 (200, 500) 형태의 2D 배열(array)을 (100, 100) 형태의 10개 덩어리(chunk)로 나눈 Dask array 객체 a
를 생성했습니다. 여기서 덩어리(chunk)는 데이터 조각을 나타냅니다. 다음은 Dask Array의 주요 속성입니다.
# 덩어리(Chunk) 검사
a.chunks
((100, 100), (100, 100, 100, 100, 100))
# 특정 데이터 덩어리(chunk)에 액세스
a.blocks[1, 3]
|
a[:50, 200]
|
a[:50, 200].compute()
array([ 200, 700, 1200, 1700, 2200, 2700, 3200, 3700, 4200, 4700, 5200, 5700, 6200, 6700, 7200, 7700, 8200, 8700, 9200, 9700, 10200, 10700, 11200, 11700, 12200, 12700, 13200, 13700, 14200, 14700, 15200, 15700, 16200, 16700, 17200, 17700, 18200, 18700, 19200, 19700, 20200, 20700, 21200, 21700, 22200, 22700, 23200, 23700, 24200, 24700])
a.mean()
|
a.mean().compute()
49999.5
np.sin(a)
|
np.sin(a).compute()
array([[ 0. , 0.84147098, 0.90929743, ..., 0.58781939, 0.99834363, 0.49099533], [-0.46777181, -0.9964717 , -0.60902011, ..., -0.89796748, -0.85547315, -0.02646075], [ 0.82687954, 0.9199906 , 0.16726654, ..., 0.99951642, 0.51387502, -0.4442207 ], ..., [-0.99720859, -0.47596473, 0.48287891, ..., -0.76284376, 0.13191447, 0.90539115], [ 0.84645538, 0.00929244, -0.83641393, ..., 0.37178568, -0.5802765 , -0.99883514], [-0.49906936, 0.45953849, 0.99564877, ..., 0.10563876, 0.89383946, 0.86024828]])
a.T
|
a.T.compute()
array([[ 0, 500, 1000, ..., 98500, 99000, 99500], [ 1, 501, 1001, ..., 98501, 99001, 99501], [ 2, 502, 1002, ..., 98502, 99002, 99502], ..., [ 497, 997, 1497, ..., 98997, 99497, 99997], [ 498, 998, 1498, ..., 98998, 99498, 99998], [ 499, 999, 1499, ..., 98999, 99499, 99999]])
NumPy 메서드처럼 Dask Array의 메서드도 연결하여 사용할 수 있습니다.
b = a.max(axis=1)[::-1]+10
b
|
b[:10].compute()
array([100009, 99509, 99009, 98509, 98009, 97509, 97009, 96509, 96009, 95509])
b.dask
HighLevelGraph
HighLevelGraph with 6 layers and 30 keys from all layers.
Layer1: array
array-cc776c6b9a569dbadabcea53d1c7a075
|
Layer2: chunk_max
chunk_max-3bc9e5bc8ce344a8a540ad6766f630e4
|
Layer3: chunk_max-partial
chunk_max-partial-f1f52f76bb5fa43515fce61754a0379b
|
Layer4: max-aggregate
max-aggregate-8dda418caf3a76e16f714cadc3579a77
|
Layer5: getitem
getitem-54bbd49a5504c058ce998827f3f1e423
|
Layer6: add
add-5c9048a2c61ec5b51c83b9801ed5d438
|
b.visualize()
Dask Bag¶
마지막으로 Dask Bag를 활용하여 마찬가지로 위와 동일한 작업을 진행해보겠습니다. 우선, Dask Bag 객체 b
를 생성해줍니다.
b = db.from_sequence([1, 2, 3, 4, 5, 6, 2, 1], npartitions=2)
b
dask.bag<from_sequence, npartitions=2>
8개 항목(items)으로 구성된 시퀀스(sequence)를 4개 항목으로 구성된 2개의 파티션(partitions)으로 나눠 Dask Bag 객체 b
를 생성했습니다. 여기서 파티션은 데이터 조각을 나타냅니다.
다음은 인덱싱(indexing)을 할 차례지만, Dask Bag는 앞선 Dask DataFrame, Dask Array와 다르게 인덱스(index)가 없는 데이터 구조입니다. 이는 Dask Bag가 파이썬의 리스트(list)처럼 반복(iterate)은 가능하지만, Dask Bag의 경우 정렬(ordering)은 되지 않기 때문입니다. 따라서 요소(elements) 간의 순서를 보장하지 않습니다.
b.compute()
[1, 2, 3, 4, 5, 6, 2, 1]
Dask Bag는 Python의 일반적인 컬렉션(예. 리스트) 객체처럼 map
, filter
, fold
, groupby
와 같은 작업들이 가능합니다.
b.filter(lambda x: x % 2)
dask.bag<filter-lambda, npartitions=2>
b.filter(lambda x: x % 2).compute()
[1, 3, 5, 1]
b.distinct()
dask.bag<distinct-aggregate, npartitions=1>
b.distinct().compute()
[1, 2, 3, 4, 5, 6]
마찬가지로 메서드들을 연결하여 함께 사용할 수 있습니다.
c = db.zip(b, b.map(lambda x: x * 10))
c
dask.bag<zip, npartitions=2>
c.compute()
[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60), (2, 20), (1, 10)]
c.dask
HighLevelGraph
HighLevelGraph with 3 layers and 6 keys from all layers.
Layer1: from_sequence
from_sequence-cca2a33ba6e12645a0c9bc0fd3fe6c88
|
Layer2: lambda
lambda-534eafdc975b2e8bbd0d2cae52c464bd
|
Layer3: zip
zip-ca4afde52fa305468e6fa67447c40b20
|
c.visualize()
Low-Level 인터페이스 : Dask Delayed¶
기존에 있는 코드를 병렬화하거나 사용자 지정 알고리즘을 구축할 때, 위와 같은 DataFrame, 배열(array) 유형을 사용하지 않는 코드를 병렬화 해야 할 경우도 있습니다. 이러한 경우, Dask Delayed를 이용할 수 있습니다.
Dask Delayed를 사용하면 개별 함수 호출들을 작업 그래프로 생성할 수 있습니다.
import dask
@dask.delayed
def inc(x):
return x + 1
@dask.delayed
def add(x, y):
return x + y
a = inc(1) # 실제 연산이 실행되지 않음
b = inc(2) # 실제 연산이 실행되지 않음
c = add(a, b) # 실제 연산이 실행되지 않음
c = c.compute() # compute를 호출해 위의 연산들을 실제로 실행
c
5
또는 함수가 실행되는 즉시 연산이 시작되길 원한다면, 아래와 같이 dask.distributed.Client
를 사용하는 방법도 있습니다 (Futures 참조). 단, 이 방법은 분산 클러스터에서만 사용 가능합니다.
from dask.distributed import Client
client = Client()
def inc(x):
return x + 1
def add(x, y):
return x + y
a = client.submit(inc, 1) # 실제 연산이 바로 실행됨
b = client.submit(inc, 2) # 실제 연산이 바로 실행됨
c = client.submit(add, a, b) # 실제 연산이 바로 실행됨
c = c.result() # 결과들을 수집해서 산출
c
5
스케줄링 Client¶
작업 그래프(Task graph)를 생성한 후, 이를 실행하는 것은 스케줄러의 역할입니다 (스케줄링 참조).
기본적으로, 대부분의 Dask API (Dask Array, Dask DataFrame, Dask Delayed)의 경우 Dask 객체에서 compute
를 호출하면, Dask는 컴퓨터의 스레드 풀(스레드 스케줄러)을 사용하여 병렬 연산을 실행합니다. 그 외, Dask Bag만 예외적으로 다중처리 스케줄러를 사용합니다.
더 많은 리소스를 활용해 제어하기를 원한다면 분산 스케줄러를 사용해야 합니다. 이 분산 스케줄러는 단일 머신과 다중 머신에서 모두 작동시킬 수 있습니다.
로컬 환경 (Local)¶
아래 코드는 로컬 컴퓨터만 사용해 클러스터를 설정하는 방법입니다.
from dask.distributed import Client
client = Client()
client
C:\Users\BEGAS_15\PycharmProjects\test_dask\venv\lib\site-packages\distributed\node.py:182: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 53421 instead warnings.warn(
Client
Client-0f60577c-3d7a-11ee-a6c4-002b6765fa53
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:53421/status |
Cluster Info
LocalCluster
47bc4bec
Dashboard: http://127.0.0.1:53421/status | Workers: 4 |
Total threads: 8 | Total memory: 15.87 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-6b34d5bb-d079-415f-89ec-58fcce3a46dd
Comm: tcp://127.0.0.1:53424 | Workers: 4 |
Dashboard: http://127.0.0.1:53421/status | Total threads: 8 |
Started: Just now | Total memory: 15.87 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:53456 | Total threads: 2 |
Dashboard: http://127.0.0.1:53457/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:53427 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-m414m7d1 |
Worker: 1
Comm: tcp://127.0.0.1:53459 | Total threads: 2 |
Dashboard: http://127.0.0.1:53460/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:53428 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-zi8mm6cu |
Worker: 2
Comm: tcp://127.0.0.1:53453 | Total threads: 2 |
Dashboard: http://127.0.0.1:53454/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:53429 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-b0kna_uz |
Worker: 3
Comm: tcp://127.0.0.1:53450 | Total threads: 2 |
Dashboard: http://127.0.0.1:53451/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:53430 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-muklls29 |
원격 환경 (서버)¶
이번에는 이미 실행 중인 원격 클러스터에 연결하는 방법을 알아보겠습니다. 이외에도 원격 클러스터를 설정하는 방법에는 여러 가지가 있습니다. 자세한 내용은 dask 클러스터 배포 방법(Deploy Dask Clusters)을 참조하시길 바랍니다.
from dask.distributed import Client
client = Client("<url-of-scheduler>") # 원격 서버 url 주소 입력
client
진단 Dashboard¶
분산 클러스터를 사용할 때, Dask는 작업이 처리되는 것을 볼 수 있는 대시보드(dashboard)를 제공합니다. Client
객체의 대시보드(Dashboard) 주소를 통해 리소스 및 작업 현황들을 실시간으로 확인할 수 있습니다. 자세히 알아보려면 진단 대시보드(Dashboard Diagnostics)를 참조하시길 바랍니다.
client.dashboard_link
'http://127.0.0.1:53421/status'
'인공지능 > 빅데이터 SQL' 카테고리의 다른 글
Dask #5. 파이썬 Dask Bag (0) | 2023.08.21 |
---|---|
Dask #4. 파이썬 Dask Array (0) | 2023.08.18 |
Dask #2. 파이썬 Dask 설치 방법 (0) | 2023.08.17 |
Dask #1. 파이썬 대규모 데이터 처리 Dask란? (0) | 2023.08.17 |
데이터 제공 사이트 (0) | 2023.07.31 |