이번 포스팅에서는 파이썬 Dask Bag를 활용한 간단한 예제를 다뤄보도록 하겠습니다.
목차¶
Dask Bag¶
Dask Bag은 map
, filter
, groupby
, 그 외 집계 작업처럼 파이썬 컬렉션 객체에 대해 수행할 수 있는 여러 작업들을 구현해놓았습니다. 이 작업들을 수행하기 위해 Python 반복자(iterators)를 사용하여, 적은 메모리에서도 병렬 작업을 수행합니다. 이러한 점은 병렬 버전의 Itertools 또는 PySpark RDD와 유사합니다.
특히 로그 파일, JSON records, 사용자 정의 Python 객체에 대해 간단한 전처리를 수행하는 데에 이 Dask Bag이 사용됩니다.
Dask 클라이언트¶
Dask 클라이언트(client)는 필요에 따라 실행해도 되고, 실행하지 않아도 됩니다. Dask 클라이언트는 연산 작업 현황에 대한 정보를 얻는데 유용한 대시보드(dashboard)를 제공합니다.
아래 코드와 같이 클라이언트를 생성하면 대시보드에 대한 링크가 표시됩니다. 이 링크를 클릭해, 작업을 실행하는 동안 다른 화면 한쪽에 대시보드를 열어 두는 것이 좋습니다. 대시보드를 작업 화면과 동시에 보는 것은 분석 및 학습을 수행할 때 매우 유용합니다.
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
client
Client
Client-a78e20d1-3fbe-11ee-b5e4-002b6765fa53
Connection method: Cluster object | Cluster type: distributed.LocalCluster |
Dashboard: http://127.0.0.1:8787/status |
Cluster Info
LocalCluster
c75b9c02
Dashboard: http://127.0.0.1:8787/status | Workers: 4 |
Total threads: 4 | Total memory: 15.87 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-4b259d15-b413-4928-803b-af4fbd5e2489
Comm: tcp://127.0.0.1:64716 | Workers: 4 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 4 |
Started: Just now | Total memory: 15.87 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:64737 | Total threads: 1 |
Dashboard: http://127.0.0.1:64740/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:64719 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-tbmp3hti |
Worker: 1
Comm: tcp://127.0.0.1:64744 | Total threads: 1 |
Dashboard: http://127.0.0.1:64745/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:64720 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-hvgabzpp |
Worker: 2
Comm: tcp://127.0.0.1:64735 | Total threads: 1 |
Dashboard: http://127.0.0.1:64738/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:64721 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-i8g39g5g |
Worker: 3
Comm: tcp://127.0.0.1:64736 | Total threads: 1 |
Dashboard: http://127.0.0.1:64742/status | Memory: 3.97 GiB |
Nanny: tcp://127.0.0.1:64722 | |
Local directory: C:\Users\BEGAS_15\AppData\Local\Temp\dask-scratch-space\worker-up8_kgfo |
JSON 데이터 읽기¶
먼저, 여기에서 사용할 데이터셋을 만들기 위해 아래 코드와 같이 임의의 레코드 데이터셋을 생성하고, 여러 개의 JSON 파일에 저장합니다.
import dask
import json
import os
os.makedirs('data', exist_ok=True) # data 디렉토리 생성
b = dask.datasets.make_people() # dask.datasets에서 제공하는 make_people 데이터셋 사용
b.map(json.dumps).to_textfiles('data/*.json') # JSON 파일로 저장
['C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/0.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/1.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/2.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/3.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/4.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/5.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/6.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/7.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/8.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/9.json']
이제 JSON 파일 형식의 데이터가 있으므로 Dask Bag과 Python JSON 모듈을 사용하여 데이터를 읽어오겠습니다.
import dask.bag as db
import json
b = db.read_text('data/*.json').map(json.loads)
b
dask.bag<loads, npartitions=20>
b.take(2)
({'age': 18, 'name': ['Teodoro', 'Rose'], 'occupation': 'Machine Tool Fitter', 'telephone': '+18650301105', 'address': {'address': '70 Castillo Viaduct', 'city': 'Trotwood'}, 'credit-card': {'number': '4091 8347 1660 9247', 'expiration-date': '04/16'}}, {'age': 26, 'name': ['Pedro', 'Edwards'], 'occupation': 'Mill Worker', 'telephone': '+19311430557', 'address': {'address': '494 Lamartine Square', 'city': 'Berea'}, 'credit-card': {'number': '4538 8217 4491 6273', 'expiration-date': '11/22'}})
Map, Filter, Aggregate¶
특정 레코드만 필터링하고, 데이터를 처리하기 위해 함수를 매핑(mapping)하고, 그 결과를 집계하는 순서로 데이터를 처리해보겠습니다.
b.filter(lambda record: record['age'] > 30).take(2) # 나이가 30보다 큰 사람만 추출
({'age': 46, 'name': ['Ken', 'Cobb'], 'occupation': 'Theatre Manager', 'telephone': '+12781170521', 'address': {'address': '1160 Dutch Windmill Access Lake', 'city': 'Denton'}, 'credit-card': {'number': '3778 269896 73523', 'expiration-date': '09/16'}}, {'age': 58, 'name': ['Vanna', 'Boyd'], 'occupation': 'Night Porter', 'telephone': '+13044000542', 'address': {'address': '332 Buchanan Run', 'city': 'Forest Lake'}, 'credit-card': {'number': '4767 1366 3094 8146', 'expiration-date': '04/22'}})
b.map(lambda record: record['occupation']).take(2) # 직업만 추출
('Machine Tool Fitter', 'Mill Worker')
b.count().compute() # 총 레코드 개수
17104
위처럼 하나의 파이프라인에서 단일 작업만 수행해도 되지만, 일반적으로 하나의 파이프라인에서 여러 작업을 수행합니다. 이 경우, 파이프라인을 구성한 후 마지막에 compute
또는 take
를 호출하면 실행된 연산 결과를 얻을 수 있습니다.
result = (b.filter(lambda record: record['age'] > 30)
.map(lambda record: record['occupation'])
.frequencies(sort=True)
.topk(10, key=1))
result
dask.bag<topk-aggregate, npartitions=1>
result.compute()
[('Post Sorter', 24), ('Optical Advisor', 24), ('Historian', 24), ('Chandler', 24), ('Machine Setter', 23), ('Mill Operator', 22), ('Pipe Inspector', 22), ('Janitor', 21), ('Gallery Owner', 21), ('Church Officer', 21)]
변환 및 저장¶
위처럼 분석 결과를 바로 표출할 수도 있지만, 향후 추가적인 분석을 위해 전처리한 데이터를 디스크에 저장해야 할 경우도 있습니다. 이를 위해 to_textfiles
와 json.dumps
와 같은 메서드를 사용하거나, Dask Dataframes로 데이터 유형을 변환할 수도 있습니다.
(b.filter(lambda record: record['age'] > 30) # 나이가 30보다 큰 레코드만 추출
.map(json.dumps) # 파이썬 객체를 텍스트로 전환
.to_textfiles('data/processed.*.json')) # JSON 파일로 저장
['C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.00.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.01.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.02.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.03.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.04.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.05.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.06.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.07.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.08.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.09.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.10.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.11.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.12.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.13.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.14.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.15.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.16.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.17.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.18.json', 'C:/Users/BEGAS_15/PycharmProjects/test_dask/src/data/processed.19.json']
Dask DataFrame으로 변환¶
Dask Bag은 데이터를 읽고 간단한 전처리를 수행하고, 이후 복잡한 작업을 효율적으로 수행할 수 있는 Dask DataFrame과 같은 데이터 형식으로 변환하는 것이 좋습니다. Dask DataFrame은 Pandas 라이브러리의 기능을 내부적으로 사용하므로, 숫자 데이터를 처리하는 데에 훨씬 빠르고 복잡한 알고리즘 작업을 수행할 수 있습니다.
Dask Bag을 Dask Dataframe으로 변환하기 위해서는 열(column)로 구성된 데이터가 입력되어야 합니다. 위 예시에서 사용한 중첩된(nested) JSON 데이터(이 경우, Dask Bag이 더 적합)와 같은 형식은 Dask DataFrame에서 완벽히 지원되지 않기 때문에 적당한 형식으로 데이터를 변환해줄 필요가 있습니다. 따라서, 중첩된 데이터를 flatten하는 함수를 만든 후, 레코드 전체에 매핑한 다음, 이를 Dask Dataframe으로 변환합니다.
b.take(1)
({'age': 18, 'name': ['Teodoro', 'Rose'], 'occupation': 'Machine Tool Fitter', 'telephone': '+18650301105', 'address': {'address': '70 Castillo Viaduct', 'city': 'Trotwood'}, 'credit-card': {'number': '4091 8347 1660 9247', 'expiration-date': '04/16'}},)
def flatten(record):
return {
'age': record['age'],
'occupation': record['occupation'],
'telephone': record['telephone'],
'credit-card-number': record['credit-card']['number'],
'credit-card-expiration': record['credit-card']['expiration-date'],
'name': ' '.join(record['name']),
'street-address': record['address']['address'],
'city': record['address']['city']
}
flatten_b = b.map(flatten)
flatten_b.take(1)
({'age': 18, 'occupation': 'Machine Tool Fitter', 'telephone': '+18650301105', 'credit-card-number': '4091 8347 1660 9247', 'credit-card-expiration': '04/16', 'name': 'Teodoro Rose', 'street-address': '70 Castillo Viaduct', 'city': 'Trotwood'},)
df = flatten_b.to_dataframe()
df.head()
age | occupation | telephone | credit-card-number | credit-card-expiration | name | street-address | city | |
---|---|---|---|---|---|---|---|---|
0 | 18 | Machine Tool Fitter | +18650301105 | 4091 8347 1660 9247 | 04/16 | Teodoro Rose | 70 Castillo Viaduct | Trotwood |
1 | 26 | Mill Worker | +19311430557 | 4538 8217 4491 6273 | 11/22 | Pedro Edwards | 494 Lamartine Square | Berea |
2 | 27 | Health Nurse | +1-712-627-1654 | 4042 2888 2951 0224 | 11/20 | Dacia Bailey | 1300 Ord Point | Bellflower |
3 | 46 | Theatre Manager | +12781170521 | 3778 269896 73523 | 09/16 | Ken Cobb | 1160 Dutch Windmill Access Lake | Denton |
4 | 28 | Refractory Engineer | +1-602-714-4682 | 2291 8169 3723 1747 | 07/19 | Randal Foreman | 63 Battery East Turnpike | Naples |
이전과 동일한 계산을 수행합니다.
df[df.age > 30].occupation.value_counts().nlargest(10).compute()
occupation Chandler 24 Historian 24 Optical Advisor 24 Post Sorter 24 Machine Setter 23 Mill Operator 22 Pipe Inspector 22 Building Foreman 21 Cafe Owner 21 Church Officer 21 Name: count, dtype: int64[pyarrow]
'인공지능 > 빅데이터 SQL' 카테고리의 다른 글
데이터 산업 (0) | 2023.09.18 |
---|---|
Dask #6. 파이썬 Dask DataFrame (데이터프레임) (1) | 2023.08.28 |
Dask #4. 파이썬 Dask Array (0) | 2023.08.18 |
Dask #3. 파이썬 Dask 간단하게 사용해보기 (0) | 2023.08.18 |
Dask #2. 파이썬 Dask 설치 방법 (0) | 2023.08.17 |