อ่านรายละเอียดเกี่ยวกับข้อมูล Weather และการลงทะเบียนได้ที่ Dataset: Weather Data
ในเวิร์คชอปนี้เราจะลองตั้ง schedule ของ data pipelines ของเรากัน ซึ่งเราสามารถใช้ Crontab.guru เพื่อดูว่าเราจะเขียน Cron expression อย่างไรได้บ้าง
เราจะสร้าง 5 DAGs ตาม schedule ด้านล่างนี้ โดยที่แต่ละ DAG จะมีการใช้งาน BashOperator
กับ PythonOperator
0 0 * * 1
หรือ @daily
@monthly
0 0 * * 1
30 8 * * 2
0 17 1,16 * *
เราอาจจะใช้โค้ดด้านล่างนี้เป็นโค้ดตั้งต้น แล้วปรับ Schedule จาก None
ให้เป็นไปตามโจทย์
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils import timezone
with DAG(
dag_id="my_dag",
schedule=None,
start_date=timezone.datetime(2024, 1, 27),
catchup=False,
):
hello = BashOperator(
task_id="hello",
bash_command="echo 'Hello'",
)
world = BashOperator(
task_id="world",
bash_command="echo 'Word!'",
)
hello >> world
จาก DAG ที่เราสร้างเพื่อใช้ดึงข้อมูล Weather มา ให้เราตั้ง Schedule ให้รันทุก ๆ 1 ชม.
ไฟล์ชื่อ weather_api_dag.py
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
import requests
def _get_weather_data():
API_KEY = "YOUR_API_KEY"
# API_KEY = os.environ.get("WEATHER_API_KEY")
# API_KEY = Variable.get("weather_api_key")
payload = {
"q": "bangkok",
"appid": API_KEY,
"units": "metric"
}
url = "<https://api.openweathermap.org/data/2.5/weather>"
response = requests.get(url, params=payload)
print(response.url)
data = response.json()
print(data)
with DAG(
"weather_api_dag",
schedule="@hourly",
start_date=timezone.datetime(2024, 1, 27),
):
start = EmptyOperator(task_id="start")
get_weather_data = PythonOperator(
task_id="get_weather_data",
python_callable=_get_weather_data,
)
end = EmptyOperator(task_id="end")
start >> end