[Dask] dask 사용 기본 [1편]
이번 게시물에는 지난번 게시글에서 알아보던 리눅스 명렁어를 계속해서 알아보도록 하겠습니다.
1. Dask란?
Dask는 정형 데이터로된 데이터를 분산 및 병렬처리를 통해 대규모 데이터 처리를 효율적으로 할 수 있게 도와줍니다. 특히 compute() 함수를 통해 실제 연산을 실행하는데 이전까지는 작업 그래프를 생성하고 실제 메모리를 사용하지 않아 메모리를 효율적으로 사용 할 수 있습니다.
2. 기본 사용법
대표적인 작업 순서로는 하나 또는 여러개의 정형 데이터를 로드한 후 데이터 처리 후에 하나 또는 여러개의 파일로 저장하거나 활용합니다. 아래 대표 예시를 보여드리겠습니다.
Dask 사용
1
2
3
4
5
6
7
8
def filter_last_two_digits_00(df, column_name):
return df[df[column_name].astype(str).str.endswith('00')]
csv_202404_li = sorted(glob.glob(os.path.join('/yellow_dust/data/stn_grid_datasets/202404/aeropm10/*.csv'))) # 여러개의 csv 파일 경로를 리스트로 만듦
df_202404 = dd.read_csv(csv_202404_li) # csv read
filtered_df = df_202404.map_partitions(filter_last_two_digits_00, 'now') # 데이터 처리
df_pd_202404 = filtered_df.compute() # 실제 연산
df_pd_202404.to_csv('dask_test.csv', index=False)
Pandas 사용
1
2
3
4
5
6
7
8
9
10
11
csv_202404_li = sorted(glob.glob(os.path.join('/yellow_dust/data/stn_grid_datasets/202404/aeropm10/*.csv')))
df_list = []
for idx, file in enumerate(csv_202404_li):
each_df = pd.read_csv(file)
filtered_df = each_df[each_df['now'].astype(str).str.endswith('00')]
df_list.append(filtered_df)
total_df = pd.concat(df_list)
total_df.to_csv('pandas_test.csv', index=False)
메모리가 부족할 경우 데이터 처리를 통해 필요한 데이터만 필터링 하여 실제 연산을 하는 Dask가 효율적입니다. 또한 데이터가 많아질수록 Dask를 활용한 방법이 속도가 더 빠를 수가 있습니다. 다만 파일 수가 싱글 프로세스로 충분히 빠르게 처리 가능할 경우는 Pandas를 이용한 반복문이 더 빠를 수 있습니다.
This post is licensed under CC BY 4.0 by the author.