_src.yml
# dbt source definition: declares the raw table `weathers` in schema `public`
# of database `postgres` under the source name `weathers`.
# Models refer to it as source('weathers', 'weathers').
version: 2

sources:
  - name: weathers
    schema: public
    database: postgres
    tables:
      - name: weathers
_models.yml
# Model properties for the staging layer: documents the stg_weathers model
# and its `temp` column.
version: 2

models:
  - name: stg_weathers
    description: Staging weather data
    columns:
      - name: temp
        description: Temperature
Converting EPOCH time to timestamp in PostgreSQL
-- TO_TIMESTAMP(dt) turns the epoch value in `dt` into a timestamp with time
-- zone; TIMEZONE('Asia/Bangkok', ...) then expresses it as Bangkok local time.
-- The source name must match the one declared in _src.yml (`weathers`).
select
    TIMEZONE('Asia/Bangkok', TO_TIMESTAMP(dt)) as timestamp
    , temp
from {{ source('weathers', 'weathers') }}
# Model properties for average_temperature_in_bangkok.
# The not_null test is attached to the avg_temp column, so a dbt test run
# fails if any row has a null average.
version: 2

models:
  - name: average_temperature_in_bangkok
    description: Average temperature in Bangkok
    columns:
      - name: avg_temp
        description: Average temperature
        tests:
          - not_null
แก้ไฟล์ .env
# User id for the Airflow containers -- presumably read by the Airflow
# docker-compose.yaml; confirm against that file.
AIRFLOW_UID=1000
# Extra pip packages (dbt-core, the dbt Postgres adapter, astronomer-cosmos),
# pinned by version. NOTE(review): presumably installed at container start-up
# by the Airflow image; for anything beyond a tutorial, consider baking them
# into a custom image instead.
_PIP_ADDITIONAL_REQUIREMENTS=dbt-core~=1.7.0 dbt-postgres~=1.7.0 astronomer-cosmos==1.9.0
เสร็จแล้วกด Ctrl+C ที่ Terminal ที่รัน Docker แล้วสั่ง
# Tear the stack down and bring it back up so the containers are recreated
# with the edited .env values.
docker compose down
docker compose up
ไฟล์ dbt_dag.py
ในโฟลเดอร์ dags
import os
from airflow.utils import timezone
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
# dbt profile settings for Cosmos: a Postgres user/password mapping tied to the
# Airflow connection id "weather_postgres_conn", with the schema set to
# "dbt_kan" through profile_args.
_postgres_mapping = PostgresUserPasswordProfileMapping(
    conn_id="weather_postgres_conn",
    profile_args={"schema": "dbt_kan"},
)

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=_postgres_mapping,
)
# Location of the dbt project that the DAG is built from.
_weather_project = ProjectConfig("/opt/airflow/dbt/weather")

# Daily DAG for the weather dbt project; no catch-up of past runs and two
# retries per task.
dbt_dag = DbtDag(
    # dbt-specific configuration
    project_config=_weather_project,
    profile_config=profile_config,
    # regular Airflow DAG arguments
    dag_id="dbt_dag",
    schedule_interval="@daily",
    start_date=timezone.datetime(2025, 2, 22),
    catchup=False,
    default_args={"retries": 2},
    tags=["dpu"],
)