티스토리 뷰

develop

Airflow 기본 시용법 정리

yogae 2021. 1. 22. 16:53

Python을 사용하여 행동 로그를 분석하고 있다. 스크립트를 작성하여 필요할 때마다 실행하여 분석 데이터를 수집하였다. 로그 데이터를 분석을 시작하니 정말 다양한 요구사항이 있었다. 이러한 요구사항에 따라 script를 작성하다보니 script가 많아졌고 많은 script를 관리해야하는 문제가 생겼다. 또한, script간의 순서가 생기면서 workflow를 정의하고 관리하는 일이 많아졌다.

 

AWS에서 airflow를 관리형 서비스로 출시했다는 소식을 듣게 되었다. AWS의 관리형 airflow를 사용하면 좋지만 현재 python script를 관리의 목적 및 분석 workflow구성을 위해서만 사용하고 있어서 필요할 때마다 airflow를 local 환경에서 실행하여 사용하기로 했다. script를 주기적으로 실행해야하는 경우나 여러사람이 공유해야한다면 MWAA(Amazon Managed Workflows for Apache Airflow)를 찾아보면 좋을 것이다.

 

Basic Airflow architecture

Airflow 설치

export AIRFLOW_HOME=~/airflow

pip install apache-airflow

airflow의 설치는 위와같이 쉽게 할 수 있다. $AIRFLOW_HOME에 설정한 폴더에 airflow 설정 file airflow.cfg, sqlite db file airflow.db이 생성된다.

데이터 베이스

airflow db init

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

db를 초기화 하고 사용자를 생성한다.

airflow를 상태를 저장하기 위해 database를 사용한다. 기본으로 sqlite를 사용한다. 동시에 여러 task를 실행하기 위해서는 MySQL 또는 PostgreSQL을 사용해야한다. sqlite를 사용하는 경우 병렬로 task를 실행하여도 하나씩 처리하게 된다.

Airflow web server 실행

# start the web server, default port is 8080
airflow webserver --port 8080

# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler

Airflow Webserver와 Scheduler는 독립된 process에서 실행된다. Webserver만 실행하고 0.0.0.0:8080으로 접속하면 Airflow Web page를 볼 수 있다. 하지만 DAG를 실행하게 되면 Task가 실행중으로 남게 된다. Task가 계속 실행 중으로 남아있다면 Airflow scheduler를 확인해보자!

Airflow web server를 처음 실행하면 위의 사진과 다르게 여러개의 예제 DAGs를 볼수 있다. 예제 DAGs를 삭제하려면 airflow.cfgload_examples을 False로 설정해하고 DB를 초기화 해야한다. load_examples값만 False로 변경하면 예제 DAGs가 그대로 남아 있다.

위쪽 끝에 위치한 pause 토글 버튼이 처음 생성되었을 때는 pause로 되어 있다. DAG가 pause되어 있어도 DAG를 실행할 수는 있지만 task가 실행되지 않는다. 실행전에 pause 버튼을 확인하기 바란다.

DAG 작성

Airflow의 DAG 작성은 Python으로 작성해야한다.

from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['example'],
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

Task를 정의할 때는 operator를 사용한다. Airflow에서 제공하는 operator는 여러가지가 있다. 위의 예제에서와 같은 Bash을 실행하는 BashOperator와 python을 실행하는 PythonOperator가 있다. DB와 aws s3 등 저장소 ETL 작업과 관련된 operator 등 다양한 operator를 제공한다. operator를 custom하게 생성하여 사용할 수도 있다.

 

Creating a custom Operator — Airflow Documentation

Airflow params 전달

Trigger DAG버튼을 클릭하면 위와 같은 json 입력창이 보인다. json형식으로 Task에 전달할 params를 작성하고 Trigger하면 params를 전달할 수 있다.

from airflow import DAG
from airflow.operators.python import PythonOperator

args = {
    'owner': 'airflow',
}

dag = DAG(
    dag_id='analysis_dag',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
)

def process_func(ds, **kwargs):
    params = kwargs['params']
    START_TIMESTAMP = params["START_TIMESTAMP"]
    END_TIMESTAMP = params["END_TIMESTAMP"]
      ...

PythonOperator(
  task_id='process_task',
  python_callable= process_func,
  dag=dag,
)

sh로 실행하는 경우

airflow dags trigger {dag_id} -c '{"START_TIMESTAMP": "START_TIMESTAMP", "END_TIMESTAMP": "END_TIMESTAMP"}'

airflow tags test {dag_id} {execution_date} -t '{"START_TIMESTAMP": "START_TIMESTAMP", "END_TIMESTAMP": "END_TIMESTAMP"}'

Variables

DB url과 같은 보안이 필요한 값은 Admin > Variables에 등록하여 사용할 수 있다.

from airflow.models import Variable

def example_func(ds, **kwargs):
    DB = Variable.get("DB_LOCAL")
      ....

PythonOperator(
    task_id='example_task',
    python_callable=example_func,
    dag=dag,
)

Task간 message 전송

Task간 message를 전송하는 방법이 조금 까다롭다. Task간에 message가 작은 경우(48KM 이하) XCOM을 사용하여 message를 전달할 수 있다. message 크기 제한 때문에 가능한 다른 저장소를 사용하고 XCOM에는 간단한 key 값이나 path값만 전달하여 사용하는 방식이 맞다고 생각된다.

from airflow.operators.python import PythonOperator

def example_xcom_push(ds, **kwargs):
     kwargs['ti'].xcom_push(
      key="example_xcom",value=str(1))

def example_xcom_pull(ds, **kwargs):
     pull_value = kwargs['ti'].xcom_pull(
        task_ids='', key='example_xcom')
     print(pull_value)    

push_task = PythonOperator(
   task_id='example_xcom_push_task',
   python_callable= example_xcom_push,
   dag=dag,
)

pull_task = PythonOperator(
   task_id='example_xcom_pull_task',
   python_callable= example_xcom_pull,
   dag=dag,
)

push_task >> pull_task

큰 사이즈 message 경우 local filesystem을 사용하거나 다른 저장소를 사용하여 전달해야 한다.

AWS configuration

Amazon Web Services Connection — apache-airflow-providers-amazon Documentation

Reference

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함