Big Data , ML/ETL, ELT

Airflow정의와 설치 과정

Dev.BeryL 2022. 2. 4. 15:12
728x90

에어플로우는 워크플로우 관리 플랫폼이며,  2014년에, 숙박 공유 서비스인 Airbnb에서 회사가 성장하면서, 점점 복잡해지는 워크플로우를 관리하기 위해서 개발되었다.
그 후에 지금까지 발전되어 많은 회사에서 사용되고 있고, 지금은 Apache 재단에서 최상위 프로젝트로 관리되고있다.

 

1. 워크플로우

순서가 있는 과정을 의미한다.

ex )  로그를 활용하기 위해 로그의 전처리, 가공 및 분석을 진행하고 처리한 결과를 확인하는 그 모든 과정들을 워크플로우라고 한다.

 

사용성 측면에서 루이지는 복잡한 API를 가지고 있어서 잘 알고 있으면 에어플로우보다 좀 더 유연하게 사용할 수 있지만 cron을 사용하기 때문에 때문에 확장성이 낮음.

반면, 에어플로우는 사이즈에 따라 로컬에서도 실행할 수 있지만 Kubernetes나 Celery를 사용하여 외부에서도 실행할 수도 있어서 확장성이 높다. 결론은 개발 난이도와 확장성 측면에서 에어플로우를 사용해볼만하다.

 

airflow architecture

  • 스케줄러는 스케줄들을 관리하고, Executor에 작업 실행을 요청
  • Executor는 Worker에게 요청을 하는데, 작업을 실제로 실행하는 Worker는 에어플로우를 구성하는 방법에 따라, 로컬에 둘 수도 있고, 외부에 확장성 있게 둘 수있음
  • 웹서버에서는 UI를 제공하고 사용자가 스케줄을 관리하고 디버깅역할
  • DAG Directory에는사용자가 작성한 스케줄 관련 코드가 존재
  • Metadata Database에는 스케줄러나 Executor, 웹 서버에서 사용하는 메타데이터를 저장

에어플로우에서 어떤 작업을 실행시키기 위해서는 Operator를 사용합니다. Operator는 cron처럼 bash 명령어를 실행시킬 수도 있고,

docker container를 실행 시킬 수도 있다. 이렇게 다양하게 실행시킬 수 있도록 미리 정의되어 있어서, 연결을 위해 직접 개발하고 유지보수 할 필요가 없음

 

Airflow의 특징

  1. 스케줄을 작성할 때 크론 표현식으로 사용해서 기존에 크론을 사용했다면 보다 쉬운 사용성
  2. 이미 지나간 과거의 스케줄을 관리가 가능
  3. 프로그래밍 방식으로 간단한 코드를 작성해서 스케줄들을 관리하며 비교적 자유롭다.
  4. 모니터링하고 관리를 위한 웹 형태의 대쉬보드를 제공하기에 시각적으로 확인이 가능

Airflow 동작원리

 

성공시에 녹색 ,

실패빨간색

실행중연두색

대기중회색으로 표기

 

 

 

 

설치 순서

  1. python 설치
  2. Airflow 설치

pip install airflow DAG정의파일과 Airflow플러그인이 저장되는 AIRFLOW_HOME 생성 AIRFLOW_HOME 환경변수 설정

  1. airflow version 실행 version이 샐행되면 AIRFLOW_HOME에 airflow.cfg가 생성된다.
  2. airflow db 초기화 airflow initdb 초기화하면 airflow_home에 airflow.db가 생성된다. sqlite가 default인데 sqlite의 경우 동시 액세스를 지원하지 않기 때문에 postgres나 mysql등을 고려해 보아야 한다.
  3. Airflow web server start airflow webserver 8080포트로 접속해서 airflow ui 방문

 

docker-compose 설치

docker-compose에 airflow를 배포하기 위해 docker-compose.yaml을 다운 curl -Lf0 'https://airflow.apache.org/docs/apache-airflow/2.1.3/docker-compose.yaml' > ./docker-compose.yaml

이 파일에는 아래와 같은 서비스 정의가 포함되어 있다

  • airflow-scheduler- 스케줄러 는 모든 작업과 DAG를 모니터링한 다음 종속성이 완료되면 작업 인스턴스를 트리거합니다.
  • airflow-webserver- 웹서버는 http://localhost:8080.
  • airflow-worker - 스케줄러에 의해 주어진 작업을 실행하는 작업자.
  • airflow-init - 초기화 서비스.
  • flower- 환경 모니터링을 위한 꽃 앱 . 에서 이용 가능 http://localhost:5555합니다.
  • postgres - 데이터베이스.
  • redis- 스케줄러에서 작업자에게 메시지를 전달하는 브로커.

환경 초기화 airflow를 처음 시작하기 전에 필요한 디렉터리를 만들고 데이터베이스를 초기화 해야한다.

mkdir -p ./dags ./logs ./plugins echo -e "AIRFLOW_UID=$(id-u)\nAIRFLOW_GID=0" > .env

환경변수를 설정해준 뒤 cat .env 를 하여 제대로 할당되었는지 확인해보자 다음으로 데이터베이스를 초기화하고 계정을 생성해야 한다. 초기 계정은 airflow / airflow이다

docker-compose up airflow-init


설정초기화

설정 초기화법

docker-compose down --volumes --remove-orphans docker-compose.yaml rm -rf ''

 

 

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:master-python3.8
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_GID                  - Group ID in Airflow containers
#                                Default: 0
#
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-apache-spark}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.1.0
        min_airlfow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airlfow_version_comparable )); then
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo -e "\033[1;31mERROR!!!: AIRFLOW_UID not set!\e[0m"
          echo "Please follow these instructions to set AIRFLOW_UID and AIRFLOW_GID environment variables:
            https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#initializing-environment"
          exit 1
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:${AIRFLOW_GID:-0}"
    volumes:
      - .:/sources

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:
반응형