ML_DL/MUJAKJUNG (무작정 시리즈)

Elasticsearch, Airflow 활용하기 - 3. Airflow 환경설정

swwho 2025. 3. 4. 01:35
728x90
반응형

Airflow 설치

cd /airflow/
docker-compose up

DAG 파일 생성

  • FastAPI의 서버와 Docker로 실행한 Airflow는 localhost라도 기본적으로 통신할 수 없기 때문에,  host.docker.internal:8000으로 해야 api 통신이 가능하다.
  • PythonOperator는 airflow의 operator의 한 종류로, python 함수를 실행시켜주는 operator이다.
  • schedule_interval은 해당 DAG의 실행 주기를 결정한다. UNIX 기반의 time scheduler인 cron 형식을 지원한다. 순서대로 * * * * * (분(0~59), 시간(0~23), 일(1~31), 월(1~12), 요일(0~6)(일~토)) 를 나타낸다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests

def index_articles():
    url = "http://host.docker.internal:8000/index_articles"
    response = requests.get(url)



default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 3, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG(
    "fetch_articles",
    default_args=default_args,
    description="기사 최신화 테스트",
    schedule_interval="*/5 0 * * *",
)

news_update_task = PythonOperator(
    task_id="fetch_and_store_articles",
    python_callable=index_articles,
    dag=dag
)

news_update_task

Airflow 웹 UI

  • docker-compose를 통해 Airflow를 실행하면, localhost:8080으로 웹 UI를 확인할 수 있다.
  • 위에서 작성한 DAG가 정상적으로 실행되기 위해서는 아래 확인 사항을 점검해야 한다.
    • Elasticsearch 실행 (elasticsearch.bat)
    • FastAPI 서버 실행 (uvicorn backend.news_main:app --reload)
    • Airflow DAG 파일의 API URL 확인
  • DAG파일을 Actions로 삭제할 경우, DB에 메타데이터가 저장되기 때문에 복원을 위해서 DB를 초기화 해야 한다.
  • Runs에 success가 보인다면 정상적인 실행이다. (에러 발생이나 실패시 자세한 log 확인 가능)

Airflow Web UI