인공지능/빅데이터 SQL

Dask #1. 파이썬 대규모 데이터 처리 Dask란?

백관구 2023. 8. 17. 17:21
반응형

    파이썬에서 대규모 데이터 처리 및 분석을 하기 위한 Dask 라이브러리를 소개하려 합니다.

 

Dask — Dask documentation

.rst .pdf to have style consistency -->

docs.dask.org

반응형

 


 

Dask

Dask

    Dask는 Python의 병렬 연산을 위한 라이브러리입니다. 즉, 대규모 데이터의 처리·연산에 필요한 분산 환경을 사용자들이 쉽게 사용할 수 있도록 해주는 라이브러리로 보시면 됩니다. 이러한 Dask는 크게 두 파트 "동적 작업 스케줄링"과 "빅데이터 컬렉션"이라는 파트로 나누어 구성됩니다.

    동적 작업 스케줄링실제 작업을 배정하고 실행하는 부분이고, 빅데이터 컬렉션은 스케줄링에서 실행할 작업을 구성하기 위해 필요한 분산 환경에 최적화된 여러 기능들을 제공하는 부분으로 보시면 됩니다.

 

1. 동적 작업 스케줄링 (Dynamic Task Scheduling)

- 아파치 AirflowLuigiCeleryMake와 같은 스케줄러와 비슷하지만 Dask는 연산에 최적화 됨

※ 스케줄링(Scheduling) : 다중 프로그래밍을 가능케 하는 기법으로, 프로세스들에게 CPU 등 자원 배정을 적절히 함으로써 시스템의 성능을 개선할 수 있음

※ 동적 스케줄링(Dynamic Scheduling) : 스케줄링 과정에서 프로세스의 우선순위를 변동시킴

 

2. 빅데이터 컬렉션 (Big Data Collections)

- 배열(array), 데이터프레임(dataframe), 리스트(list)를 사용하는 NumPy, Pandas 등 기본적인 파이썬 인터페이스들을 대용량 데이터, 분산 환경에서도 사용할 수 있도록 확장함

- 빅데이터 컬렉션은 동적 작업 스케줄러에서 실행 됨

 

    그럼 위의 두 파트, 동적 작업 스케줄링과 빅데이터 컬렉션이 어떤 구조로 사용되는지 알아보겠습니다. 아래 그림과 함께 보시면 보다 이해하기 쉬우실 것입니다. 그림의 왼쪽부터 오른쪽으로 순서대로 살펴보겠습니다.

그림 출처 : https://docs.dask.org/en/stable/

* 가장 왼쪽에 Collections는 Dask에서 제공하는 high-level 컬렉션들을 가리킴

    - Dask Array (NumPy와 유사), Dask DataFrame (Pandas와 유사), Dask Bag (파이썬 iterators, PySpark와 유사) 등

* 이러한 컬렉션들은 작업 그래프(Task Graph)를 생성하는 데에 사용됨

* 작업 그래프를 실행하는 것이 스케줄러(Schedulers)

* 스케줄러는 단일 머신 또는 클러스터에서 동작

 

    Dask는 다음과 같은 장점들이 있습니다.

1. 사용자 친화적 인터페이스

* 파이썬에서 대중적으로 사용되는 Pandas, NumPy와 같은 인터페이스와 사용 방법이 유사하여, 사용자가 이용하기 편리함

* Dask DataFrame은 Pandas와 유사함

    - Pandas의 DataFrame과 동일한 구조이지만, Dask는 병렬 연산 처리를 지원

    - import dask.dataframe

import pandas as pd                     import dask.dataframe as dd
df = pd.read_csv('2015-01-01.csv')      df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean()     df.groupby(df.user_id).value.mean().compute()

* Dask Array는 NumPy와 유사함

    - import dask.array

import numpy as np                       import dask.array as da
f = h5py.File('myfile.hdf5')             f = h5py.File('myfile.hdf5')
x = np.array(f['/small-data'])           x = da.from_array(f['/big-data'],
                                                           chunks=(1000, 1000))
x - x.mean(axis=1)                       x - x.mean(axis=1).compute()

* Dask Bag은 파이썬 iterators, Toolz, PySpark와 유사함

    - map, filter, fold, groupby와 같은 작업을 제공

    - import dask.bag

import dask.bag as db
b = db.read_text('2015-*-*.json.gz').map(json.loads)
b.pluck('name').frequencies().topk(10, lambda pair: pair[1]).compute()

* Dask Delayed는 for 반복문과 유사하며, 사용자 지정 코드를 처리함

from dask import delayed
L = []
for fn in filenames:                  # for 반복문을 사용하여 연산 구축
    data = delayed(load)(fn)          # 사용자 지정 함수를 delayed를 통해 실행
    L.append(delayed(process)(data))  # 변수와 delayed 간 연결

result = delayed(summarize)(L)
result.compute()

 

2. 유연한 스케일링

* Dask는 개인 노트북(laptop)부터 대용량 서버까지 대부분의 환경에서 사용할 수 있을만큼 범용적이고 편리함

    - 분석 및 개발 환경에서 사용가능한 데이터셋의 크기가 메모리 용량에 제한되지 않고 디스크 용량까지 확장 가능

    - 단일 머신부터 클러스터까지 상호간에 쉽게 전환할 수 있으므로, 사용자는 필요할 때 원하는만큼 확장 가능

    - 분산 스케줄러(distributed scheduler)를 통해 수백 대의 머신으로 구성된 클러스터로 확장 가능

* Dask의 설치 또한 conda나 pip를 사용하여 간단하게 수행할 수 있음

 

3. 복잡한 알고리즘에 대처

* Dask는 작업 그래프(task graphs)로 병렬 연산을 표현함

    - 이러한 그래프는 임의의 구조를 가질 수 있으므로, 개발자와 사용자 모두 자유롭게 알고리즘을 구축하고 복잡한 구조더라도 대처할 수 있음

 

반응형