import json
from datetime import timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.email import EmailOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils import timezone

import requests

# Directory where the tasks below write and read the intermediate data.json file.
DAG_FOLDER = "/opt/airflow/dags"

def _get_weather_data():
    """Fetch the current Bangkok weather from OpenWeatherMap and save it to disk.

    Reads the API key from the Airflow Variable ``weather_api_key``, queries
    the current-weather endpoint in metric units, and writes the decoded JSON
    response to ``{DAG_FOLDER}/data.json`` for the downstream tasks.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    api_key = Variable.get("weather_api_key")

    payload = {
        "q": "bangkok",
        "appid": api_key,
        "units": "metric",
    }
    # Must be a bare URL: wrapping it in "<...>" makes requests reject it
    # because the string no longer starts with a valid scheme.
    url = "https://api.openweathermap.org/data/2.5/weather"
    # Without a timeout a stalled connection would block the worker slot forever.
    response = requests.get(url, params=payload, timeout=30)
    # Log the endpoint and status only; response.url embeds the API key
    # as a query parameter and would leak it into the task log.
    print(f"GET {url} -> {response.status_code}")
    # Fail here (and let Airflow retry) instead of persisting an error payload.
    response.raise_for_status()

    data = response.json()
    print(data)

    with open(f"{DAG_FOLDER}/data.json", "w") as f:
        json.dump(data, f)

def _validate_data():
    """Check that the saved weather payload contains a ``main`` section.

    Loads ``{DAG_FOLDER}/data.json`` and fails the task when the top-level
    ``main`` key is missing or null.

    Raises:
        ValueError: If the payload has no usable ``main`` entry.
    """
    with open(f"{DAG_FOLDER}/data.json", "r") as f:
        data = json.load(f)

    # Explicit raise rather than `assert`: assertions are stripped when Python
    # runs with -O, which would silently disable this check.
    if data.get("main") is None:
        raise ValueError("Weather payload is missing the 'main' section")

def _create_weather_table():
    """Create the ``weathers`` table if it does not already exist.

    Connects through the Airflow connection ``weather_postgres_conn`` to the
    ``postgres`` database, runs a ``CREATE TABLE IF NOT EXISTS`` statement,
    and commits it. The cursor and connection are always released, even when
    the statement fails.
    """
    pg_hook = PostgresHook(
        postgres_conn_id="weather_postgres_conn",
        schema="postgres"
    )

    sql = """
        CREATE TABLE IF NOT EXISTS weathers (
            dt BIGINT NOT NULL,
            temp FLOAT NOT NULL
        )
    """

    connection = pg_hook.get_conn()
    try:
        # The cursor context manager closes the cursor on exit.
        with connection.cursor() as cursor:
            cursor.execute(sql)
        connection.commit()
    finally:
        # Closing explicitly avoids leaking one DB connection per task run.
        connection.close()

def _load_data_to_postgres():
    """Insert the saved weather observation into the ``weathers`` table.

    Reads ``{DAG_FOLDER}/data.json``, extracts ``dt`` and ``main.temp``, and
    inserts them as one row through the Airflow connection
    ``weather_postgres_conn`` (database ``postgres``). The cursor and
    connection are always released, even when the insert fails.

    Raises:
        KeyError: If the payload lacks ``dt`` or ``main.temp``.
    """
    # Parse the payload first so a malformed file never opens a DB connection.
    with open(f"{DAG_FOLDER}/data.json", "r") as f:
        data = json.load(f)

    temp = data["main"]["temp"]
    dt = data["dt"]

    pg_hook = PostgresHook(
        postgres_conn_id="weather_postgres_conn",
        schema="postgres"
    )

    # Values come from an external API, so bind them as query parameters
    # instead of interpolating them into the SQL text.
    sql = "INSERT INTO weathers (dt, temp) VALUES (%s, %s)"

    connection = pg_hook.get_conn()
    try:
        # The cursor context manager closes the cursor on exit.
        with connection.cursor() as cursor:
            cursor.execute(sql, (dt, temp))
        connection.commit()
    finally:
        # Closing explicitly avoids leaking one DB connection per task run.
        connection.close()

# Defaults applied to every task in the DAG: notification address and
# retry policy (3 retries, one minute apart).
# NOTE(review): the address below looks like a redacted placeholder —
# confirm it is replaced with a real recipient before deploying.
default_args = {
    "email": ["[email protected]"],
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
}

# Every three hours: fetch the current weather, validate it, store it in
# Postgres, and send a completion email.
with DAG(
    "weather_api_dag",
    default_args=default_args,
    schedule="0 */3 * * *",
    start_date=timezone.datetime(2025, 2, 1),
    # The API only returns the *current* observation, so backfilling the
    # intervals since start_date would just insert duplicates of "now".
    catchup=False,
    tags=["dpu"],
):
    start = EmptyOperator(task_id="start")

    get_weather_data = PythonOperator(
        task_id="get_weather_data",
        python_callable=_get_weather_data,
    )

    validate_data = PythonOperator(
        task_id="validate_data",
        python_callable=_validate_data,
    )

    create_weather_table = PythonOperator(
        task_id="create_weather_table",
        python_callable=_create_weather_table,
    )

    load_data_to_postgres = PythonOperator(
        task_id="load_data_to_postgres",
        python_callable=_load_data_to_postgres,
    )

    send_email = EmailOperator(
        task_id="send_email",
        to=["[email protected]"],
        subject="Finished getting open weather data",
        html_content="Done",
    )

    end = EmptyOperator(task_id="end")

    # Fetch/validate and table creation run in parallel; the load waits for both.
    start >> get_weather_data >> validate_data >> load_data_to_postgres >> send_email
    start >> create_weather_table >> load_data_to_postgres
    send_email >> end

Configuring Email

ไฟล์ docker-compose.yaml

x-airflow-common:
  &airflow-common
  ...

  environment:
    AIRFLOW__SMTP__SMTP_HOST: mailhog
    AIRFLOW__SMTP__SMTP_STARTTLS: 'false'
    AIRFLOW__SMTP__SMTP_SSL: 'false'
    AIRFLOW__SMTP__SMTP_PORT: 1025
services:
  ...

  mailhog:
    image: mailhog/mailhog
    ports:
      - 1025:1025
      - 8025:8025

Setting up Monitoring

Reference: https://github.com/sarahmk125/airflow-docker-metrics

mkdir -p grafana prometheus/volume
x-airflow-common:
  &airflow-common
  ...

  environment:
    AIRFLOW__SCHEDULER__STATSD_ON: 'true'
    AIRFLOW__SCHEDULER__STATSD_HOST: statsd-exporter
    AIRFLOW__SCHEDULER__STATSD_PORT: 8125
    AIRFLOW__SCHEDULER__STATSD_PREFIX: airflow
services:
  ...

  statsd-exporter:
    image: prom/statsd-exporter
    container_name: airflow-statsd-exporter
    command: "--statsd.listen-udp=:8125 --web.listen-address=:9102"
    ports:
      - 9123:9102
      - 8125:8125/udp

  prometheus:
    image: prom/prometheus
    container_name: airflow-prometheus
    user: "0"
    ports:
      - 9090:9090
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus/volume:/prometheus

  grafana:
    image: grafana/grafana:7.1.5
    container_name: airflow-grafana
    environment:
      GF_SECURITY_ADMIN_USER: admin
      GF_SECURITY_ADMIN_PASSWORD: password
      GF_PATHS_PROVISIONING: /grafana/provisioning
    ports:
      - 3000:3000
    volumes:
      - ./grafana/volume/data:/grafana
      - ./grafana/volume/datasources:/grafana/datasources
      - ./grafana/volume/dashboards:/grafana/dashboards
      - ./grafana/volume/provisioning:/grafana/provisioning