Data Modeling

_src.yml

version: 2

sources:
  - name: open_weathers
    schema: public
    database: postgres

    tables:
      - name: weathers

_models.yml

version: 2

models:
  - name: stg_weathers
    description: Staging weather data
    columns:
      - name: temp
        description: Temperature

Converting epoch time to a timestamp in PostgreSQL

select
    -- dt is a Unix epoch in seconds; convert it to Asia/Bangkok local time
    TIMEZONE('Asia/Bangkok', TO_TIMESTAMP(dt)) as timestamp
    , temp

from {{ source('open_weathers', 'weathers') }}
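
Assuming the query above is saved as models/stg_weathers.sql (the file name is not shown in the snippet), the staging model can be built on its own with:

dbt run --select stg_weathers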

_models.yml

version: 2

models:
  - name: average_temperature_in_bangkok
    description: Average temperature in Bangkok
    columns:
      - name: avg_temp
        description: Average temperature
        tests:
          - not_null
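
The SQL behind this model is not shown above; a minimal sketch, assuming it simply averages temp from the staging model and that the file is named models/average_temperature_in_bangkok.sql:

select
    avg(temp) as avg_temp

from {{ ref('stg_weathers') }}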

Scheduling dbt

Edit the .env file (AIRFLOW_UID should match your local user ID, and _PIP_ADDITIONAL_REQUIREMENTS tells the Airflow containers to install dbt and Cosmos on startup):

AIRFLOW_UID=1000
_PIP_ADDITIONAL_REQUIREMENTS=dbt-core~=1.7.0 dbt-postgres~=1.7.0 astronomer-cosmos==1.9.0
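
The DAG further down expects the dbt project to be visible inside the containers at /opt/airflow/dbt/weather, so the project folder also needs to be mounted. A minimal sketch of the extra volume entry in docker-compose.yaml, assuming the dbt project lives in ./dbt/weather next to the dags folder and following the pattern of the existing dags/logs mounts:

volumes:
  - ${AIRFLOW_PROJ_DIR:-.}/dbt:/opt/airflow/dbt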

When you are done, press Ctrl+C in the terminal running Docker, then run:

docker compose down
docker compose up

Create a dbt_dag.py file in the dags folder:

from airflow.utils import timezone

from cosmos import DbtDag, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Map the Airflow connection "weather_postgres_conn" to a dbt profile/target
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="weather_postgres_conn",
        profile_args={"schema": "dbt_kan"},
    ),
)

# Render the dbt project as an Airflow DAG that runs daily
dbt_dag = DbtDag(
    dag_id="dbt_dag",
    project_config=ProjectConfig(
        "/opt/airflow/dbt/weather",
    ),
    profile_config=profile_config,
    schedule_interval="@daily",
    start_date=timezone.datetime(2025, 2, 22),
    catchup=False,
    default_args={"retries": 2},
    tags=["dpu"],
)
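
The conn_id above refers to an Airflow connection named weather_postgres_conn, which has to exist before the DAG can run. It can be created in the Airflow UI under Admin > Connections, or with the Airflow CLI; the host, credentials, and database below are placeholders for your own Postgres settings:

airflow connections add weather_postgres_conn \
    --conn-type postgres \
    --conn-host postgres \
    --conn-port 5432 \
    --conn-schema postgres \
    --conn-login postgres \
    --conn-password postgres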