Big Data , ML/ETL, ELT

Airflow dag 인자 받기와 병렬처리법

Dev.BeryL 2022. 2. 4. 15:16
728x90

Airflow 에서 실행하는 task에 인자를 받는 함수가 있을 경우에 해당 방법을 사용하면 된다.

 

def call_api(category, **kwargs):
import json
import req
import requests
url = f'http://test/{category}'

category 값을 넘겨받아 사용하고싶은 경우

 

call_api_task1 = PythonOperator(
  task_id = 'call_api_task1',
  python_callable = call_api,
  op_kwargs = {'category': '카테고리 값'},
  dag = dag
)

병렬 처리로 수행을 하고 싶은 경우

하단 처리 pipeline에서 [] 로 묶어서 실행하면 병렬처리가 가능하다.

 

start_task >> [task1, task2] >> end_task

[task1, task2] >> end_task

병렬 처리 결과

 

반응형