Super-charge Your SaaS & LLM Workflows
Part 3: Working with DAGs
Overview
By the end of this post, you'll have run and modified your first DAG. You'll also have learned about best practices when working with DAGs. You'll be ready to move on to the next step: integrating LLMs.
Accessing the UI
Let's start by accessing the Airflow UI. Open up your browser and navigate to http://localhost:8080/login
You'll need to sign in the first time you access the UI. The credentials we configured in Part 2 are:
Username: admin
Password: password

Troubleshooting
Port 8080 Already in Use
If you see an error like "bind: address already in use", another process is using port 8080.
Solution: Either stop the other process or modify the port mapping in docker-compose.yml from "8080:8080" to something like "8081:8080".
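For reference, after the change the webserver's ports entry in docker-compose.yml would look something like the sketch below (the service name is illustrative; use whichever service exposes the UI in your file from Part 2), and the UI would then be reachable at http://localhost:8081 instead:

services:
  airflow-webserver:
    ports:
      - "8081:8080"  # host port 8081 -> container port 8080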
DAGs Not Appearing in the UI
If your DAGs aren't showing up in the Airflow UI, this is usually due to a parsing error or incorrect file permissions between your host machine and the container.
Solution: Check the scheduler logs using docker-compose logs airflow-scheduler. Also make sure there weren't any errors related to file permissions while docker-compose started up.
Database Connection Issues
If you see "OperationalError: unable to connect to database" or similar errors.
Solution: Ensure Postgres is running with docker-compose ps
. If needed, reset the database with:
Warning: The following commands will delete all existing DAG run history and metadata!
docker-compose down --volumes
docker-compose up
Docker Desktop Connection Error
If you see an error containing "open //./pipe/dockerDesktopLinuxEngine: The system cannot find the file specified".
Solution: This usually means Docker Desktop isn't running. Start Docker Desktop and try again. If the error persists, try restarting Docker Desktop.
Incorrect Password
If you've changed the Airflow username and password variables in your environment and they're not working, it's likely because the user profile created with the previous credentials still exists; changing the environment variables doesn't update a user that has already been created.
Solution: Delete the stale user profile, rebuild the container, or reset the Airflow database (see above) so the new credentials take effect.
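If you'd rather not wipe the database, you can delete just the stale user. A sketch, assuming the service names from the official docker-compose.yml used in Part 2 (where the init step recreates the user from your environment variables the next time it runs):

docker-compose exec airflow-webserver airflow users delete --username admin
docker-compose up -d  # re-runs the init step, recreating the user with the new credentials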
Working with DAGs
Running a DAG
You'll see the sequential DAG listed in the UI along with many other example DAGs. Scroll down to find it.
Step 1: Before you can run the DAG, you'll need to unpause it. Newly deployed DAGs are paused by default to avoid inadvertent runs. This is a common gotcha, so keep it in mind.

Step 2: Once unpaused, you can trigger the DAG by pressing the play button, followed by selecting "Trigger DAG" from the dropdown menu.
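If you prefer the terminal, both steps have Airflow CLI equivalents. A sketch, assuming the service names from Part 2's docker-compose.yml:

docker-compose exec airflow-webserver airflow dags unpause sequential
docker-compose exec airflow-webserver airflow dags trigger sequential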

Note on UI Updates
I've noticed that the UI on a locally hosted Airflow instance sometimes doesn't update immediately after triggering a DAG. If this happens, just refresh the page.
Step 3: Make sure that Auto-refresh is toggled on in the top-right corner of the UI. This will allow the UI to update with the latest DAG statuses.

Step 4: You'll notice that the DAG run failed.

Let's take a look at the logs to see what happened.
Inspecting A Failed Task
When a task fails, you need to inspect the logs to understand what went wrong.
Step 1: Click on the failed tasks counter in the UI.

Step 2: Then, click on the failed DAG's "Run Id" link.

Step 3: Select the "Logs" tab to view the task's execution logs and error details.

Step 4: The error is plainly spelled out for you in the task logs. Open dags/sequential.py in the cloned repository on your host machine using your preferred IDE. Fix the issue and save the file.
Note on Development Environment
While Dev Containers allow direct file editing within the Airflow container, this is not recommended since the git repository isn't initialized inside the container.
Rerunning: the DAG or the task?
You have two options, and the better choice largely depends on which task failed and how much work was already done by the upstream tasks that succeeded. While rerunning the whole DAG is easier to invoke from the UI, it's not always the best option if it means waiting for all upstream tasks to run again. You may also need to consider the state of your system and how those upstream tasks may have changed that state.
Option 1: New DAG Run
Click the same play button as before.

This will trigger a completely new run with a different run ID. Consequently, new instances of all tasks will be executed.
Option 2: Clear Failed Task
First, click on the failed tasks counter in the UI.

Then, click on the failed DAG's "Run Id".

The DAG run's graph view should already be in focus. If not, open it by clicking on the "Graph" tab.

Click on the failed task in the DAG graph to put it into focus.

Then, click on "Clear Task". Followed by "Clear" in the dialog window that pops up.

After a short while, you should see that the DAG ran successfully.
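Clearing failed task instances also works from the command line. A sketch, assuming the service names from Part 2's docker-compose.yml; --only-failed restricts the clear to failed task instances, and the command prompts for confirmation unless you pass --yes:

docker-compose exec airflow-webserver airflow tasks clear sequential --only-failed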

Understanding DAG Parsing
Before we dive into best practices, it's important to understand how Airflow parses DAG files. Let's break down the sequential DAG into three sections to understand which parts get executed during parsing versus at runtime:
1. Imports and Setup
This section is executed every time the DAG file is parsed by the scheduler:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.decorators import task
from datetime import datetime
2. Task Definitions
The content of task definitions is only executed when the task runs:
@task
def get_timestamp():
    return datetime.now()

@task
def format_timestamp(timestamp):
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")

@task
def print_timestamp(formatted_timestamp):
    print(f"The provided timestamp is {formatted_timestamp}")
3. DAG Definition
The DAG context and task dependencies are evaluated every time the DAG file is parsed. When tasks are seemingly invoked (e.g. timestamp = get_timestamp()), Task objects are created so that the tasks and their return values can be used to define the DAG's structure:
with DAG(
    dag_id="sequential",
    description="A DAG made up of sequential tasks",
    start_date=days_ago(1),
    schedule_interval="@daily",
) as dag:
    timestamp = get_timestamp()
    formatted_timestamp = format_timestamp(timestamp)
    print_timestamp(formatted_timestamp)
The scheduler periodically re-parses all DAG files to pick up changes (roughly every 30 seconds by default, unless configured otherwise). This means sections 1 and 3 are executed constantly throughout the day, which is why it's crucial to keep these sections lightweight. Heavy operations like API calls or database queries should be moved inside task definitions (section 2) where they'll only be executed when the task actually runs.
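To see how long your DAG files actually take to parse, the Airflow CLI can print DagBag statistics. A sketch, assuming the service names from Part 2's docker-compose.yml:

docker-compose exec airflow-scheduler airflow dags report

The report lists each DAG file alongside its parse duration and the number of DAGs and tasks it defines, making parse-time hotspots easy to spot.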
Best Practices
I want to save you time and headaches with best practices I've learned working with Airflow in production. Keep these in mind as you start putting together DAGs for your own workflows using the local Airflow instance you now have running.
Minimal Processing in DAG Definitions
Avoid complex processing in DAG definitions. The scheduler parses DAG files frequently to check for updates - any heavy processing here will slow down the scheduler. Keep imports light, avoid API calls, and defer data processing to task execution time.
Discouraged
Loading configuration during DAG parsing
import requests

# The following API call runs during DAG parsing!
# If the response must be used to dynamically define the workflow,
# consider adding a caching mechanism when the API call is expensive
# or there's a risk of rate limiting.
response = requests.get('https://api.config.com/settings')
config = response.json()

@task
def process_data():
    return config['processing_settings']

with DAG(
    'your_dag',
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    process_data()
Recommended
Loading configuration during task execution
import requests

@task
def get_config():
    response = requests.get('https://api.config.com/settings')
    return response.json()

@task
def process_data(config):
    return config['processing_settings']

with DAG('your_dag', ...) as dag:
    config = get_config()
    process_data(config)
Delegation of Work
Your pool of Airflow workers can only service a limited number of tasks at a time. Tasks that involve long-running, data-heavy operations should instead launch a containerized job and be paired with a Sensor that waits for its completion.
Discouraged
Using a task that blocks an Airflow worker while waiting for delegated work to complete
import time
import requests

from airflow.exceptions import AirflowException

@task
def process_and_wait():
    # Starts the processing job
    response = requests.post(
        "https://processing-service.example.com/jobs",
        json={
            "input_path": "s3://bucket/huge_file.csv",
            "output_path": "s3://bucket/output.csv"
        }
    )
    job_id = response.json()["job_id"]
    # Blocks worker with polling loop
    while True:
        response = requests.get(
            f"https://processing-service.example.com/jobs/{job_id}"
        )
        status = response.json()["status"]
        if status == "completed":
            break
        if status == "failed":
            raise AirflowException("Processing job failed")
        # Sleeps for 30 seconds before checking again.
        # The worker cannot service other DAGs' tasks during this time,
        # and you only have a limited number of workers.
        time.sleep(30)

with DAG(
    'process_data_dag',
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    job = process_and_wait()
Recommended
Using a non-blocking Sensor to wait for delegated work to complete
import requests

from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator

# Replace with EcsTaskStateSensor or your own custom sensor
# implementation depending on what process was launched upstream
class ProcessingJobSensor(BaseSensorOperator):
    # Render the Jinja expression passed as job_id before poking
    template_fields = ("job_id",)

    def __init__(
        self,
        *,
        job_id: str,
        api_url: str,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.job_id = job_id
        self.api_url = api_url

    def poke(self, context) -> bool:
        response = requests.get(f"{self.api_url}/jobs/{self.job_id}")
        status = response.json()["status"]
        if status == "completed":
            return True
        if status == "failed":
            raise AirflowException("Processing job failed")
        return False

@task
def start_processing():
    # Start a pretend processing job via API.
    # This also pairs perfectly with containerized jobs launched
    # using EcsRunTaskOperator and KubernetesPodOperator
    response = requests.post(
        "https://processing-service.example.com/jobs",
        json={
            "input_path": "s3://bucket/huge_file.csv",
            "output_path": "s3://bucket/output.csv"
        }
    )
    return response.json()["job_id"]

with DAG(
    'process_data_dag',
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    job = start_processing()
    wait_for_processing = ProcessingJobSensor(
        task_id='wait_for_processing',
        job_id="{{ task_instance.xcom_pull(task_ids='start_processing') }}",
        api_url="https://processing-service.example.com",
        mode="reschedule",  # frees the worker slot between pokes instead of blocking it
    )
    job >> wait_for_processing
Idempotent Tasks
Design tasks to be safely retriable without side effects. This means a task can be run multiple times with the same input and produce the same output.
Discouraged
Using non-deterministic paths and INSERT statements
@task
def process_sales_data(date):
    # Non-deterministic filename will change on every run
    output_path = f's3://bucket/sales_{datetime.now().timestamp()}.csv'
    # Non-idempotent database operation
    with postgres.connect() as conn:
        conn.execute("""
            INSERT INTO sales_summary (date, total)
            SELECT %s, SUM(amount)
            FROM daily_sales
            WHERE date = %s
        """, [date, date])
    return output_path

with DAG(
    'sales_processing',
    start_date=days_ago(1),
    schedule_interval='@daily'
) as dag:
    # Passing the DAG's logical date's datestring, which might not be
    # the same date as when you rerun the DAG for whatever reason
    process_sales_data('{{ ds }}')
Recommended
Using deterministic paths and UPSERT statements
@task
def process_sales_data(date):
    # Deterministic filename based on the logical date
    # ('{{ ds }}' renders to a YYYY-MM-DD string)
    output_path = f's3://bucket/sales_{date}.csv'
    # Idempotent database operation
    with postgres.connect() as conn:
        conn.execute("""
            INSERT INTO sales_summary (date, total)
            SELECT %s, SUM(amount)
            FROM daily_sales
            WHERE date = %s
            ON CONFLICT (date)
            DO UPDATE SET total = EXCLUDED.total
        """, [date, date])
    return output_path

with DAG(
    'sales_processing',
    start_date=days_ago(1),
    schedule_interval='@daily'
) as dag:
    process_sales_data('{{ ds }}')
Atomic Tasks
Keep tasks focused and independent for better maintainability. Each task should do one thing well and be self-contained. This also makes tasks reusable and easier to test.
Discouraged
One large task doing multiple operations
@task
def process_and_load_data():
    # Download data
    response = requests.get('https://api.example.com/data')
    data = response.json()
    # Process data
    df = pd.DataFrame(data)
    df['processed'] = df['value'].apply(complex_calculation)
    # Load to database and S3
    df.to_sql('processed_data', engine)
    df.to_parquet('s3://bucket/processed_data.parquet')
    # Send notification
    send_slack_message("Data processing complete!")
Recommended
Multiple focused tasks with clear dependencies
@task
def fetch_data():
    response = requests.get('https://api.example.com/data')
    return response.json()

@task
def process_data(raw_data):
    df = pd.DataFrame(raw_data)
    df['processed'] = df['value'].apply(complex_calculation)
    return df.to_dict()

@task
def load_to_database(processed_data):
    df = pd.DataFrame(processed_data)
    df.to_sql('processed_data', engine)
    return "Database load complete"

@task
def save_to_s3(processed_data):
    df = pd.DataFrame(processed_data)
    path = 's3://bucket/processed_data.parquet'
    df.to_parquet(path)
    return path

@task
def notify_completion(db_status, s3_path):
    send_slack_message(
        f"Processing complete!\nDB Status: {db_status}\nS3 Path: {s3_path}"
    )

with DAG('atomic_processing_dag', ...) as dag:
    data = fetch_data()
    processed = process_data(data)
    db_status = load_to_database(processed)
    s3_path = save_to_s3(processed)
    notify_completion(db_status, s3_path)
Just-in-Time Tokens
Fetch short-lived credentials at the moment they're needed, not during DAG parsing or in a separate upstream task. When many parallel DAG runs are competing for workers, the scheduler might not get to a downstream task before a token fetched earlier has expired.
Additionally, passing tokens between tasks exposes them as plaintext XCom values, which can be a security risk.
Discouraged
Fetching tokens during DAG parsing or in upstream tasks
@task
def fetch_token_task():
    # Token fetched in upstream task
    return get_oauth_token()

@task
def process_data(token):
    # Token passed from upstream task; it might be expired by now
    # if all available workers were busy servicing other DAGs' tasks
    # before returning to this one
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get("https://api.example.com/data", headers=headers)
    return process(response.json())

with DAG('token_dag_bad', ...) as dag:
    # Fetch token once and pass to downstream task
    token = fetch_token_task()
    process_data(token)
Recommended
Fetching tokens within the task that needs them
@task
def process_data():
    # Token fetched just before it's needed
    token = get_oauth_token()
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get("https://api.example.com/data", headers=headers)
    return process(response.json())

with DAG('token_dag_good', ...) as dag:
    process_data()
Dynamic Workflows
When building dynamic workflows, prefer task-level dynamism over DAG-level dynamism.
Discouraged
Complex conditional logic during parsing
config = fetch_config()

tasks = []
for item in config['items']:  # Loop during parsing
    if item['enabled']:
        @task
        def process_item(item=item):  # Dynamic task creation
            process_data(item)
        tasks.append(process_item())

with DAG('dynamic_dag_bad', ...) as dag:
    # Loop variable renamed so it doesn't shadow the task decorator
    for i, upstream in enumerate(tasks[:-1]):
        upstream >> tasks[i + 1]
Recommended
Using dynamic task mapping
@task
def get_config():
    # Config fetched during execution
    return fetch_config()['items']

@task
def process_item(item):
    return process_data(item)

@task
def summarize_results(results):
    return {
        'processed_count': len(results),
        'success_count': len([r for r in results if r['status'] == 'success'])
    }

with DAG('dynamic_dag_good', ...) as dag:
    config = get_config()
    results = process_item.expand(item=config)
    summarize_results(results)
One-time Workflows
For one-time data processing jobs, define them as Airflow DAGs rather than standalone scripts. Just as you version control database migrations, tracking one-time workflows in your DAGs repository provides the following benefits: version history, execution tracking, automatic retries, and comprehensive logging. This approach makes workflows reproducible, shareable with teammates, and easily convertible to recurring jobs if needed later. After successful execution, remove the DAG definition to keep your Airflow instance clean and prevent accidental reruns.
Discouraged
Running scripts manually outside Airflow
import pandas as pd
import psycopg2
from datetime import datetime

def main():
    try:
        # Manual connection management
        conn = psycopg2.connect("postgresql://user:pass@host/db")
        # No automatic retries
        df = pd.read_csv('old_data.csv')
        df['processed'] = df['value'] * 1.1
        # No execution tracking
        df.to_sql('new_table', conn)
        print(f"Migration completed at {datetime.now()}")
    except Exception as e:
        # Manual error handling
        print(f"Error: {e}")

if __name__ == "__main__":
    main()
Recommended
Defining one-time workflow as DAG
@task(retries=3, retry_delay=timedelta(minutes=5))
def load_legacy_data():
    return pd.read_csv('old_data.csv').to_dict()

@task
def transform_data(data):
    df = pd.DataFrame(data)
    df['processed'] = df['value'] * 1.1
    return df.to_dict()

@task
def save_to_new_table(data):
    hook = PostgresHook(postgres_conn_id='your_postgres')
    df = pd.DataFrame(data)
    df.to_sql('new_table', hook.get_sqlalchemy_engine())
    return {'rows_processed': len(df)}

@task
def notify_completion(stats):
    send_slack_message(
        f"Data migration completed. {stats['rows_processed']} rows processed."
    )

with DAG(
    'one_time_migration_2024_03',
    schedule_interval=None,  # Manual triggers only
    start_date=days_ago(1),
    catchup=False,
    tags=['one-time', 'migration'],
) as dag:
    data = load_legacy_data()
    processed = transform_data(data)
    stats = save_to_new_table(processed)
    notify_completion(stats)
Object Storage
Use object storage services like AWS S3 for data persistence between tasks when intermediate results are larger than what can fit into XComs. Container filesystems are ephemeral and might not be shared between workers in certain Airflow configurations.
Discouraged
Using local filesystem for data passing
@task
def generate_report():
    df = process_data()
    # Using local filesystem
    df.to_csv('/tmp/report.csv')
    return '/tmp/report.csv'

@task
def send_report(file_path):
    # File might not exist if task runs on different worker
    with open(file_path, 'rb') as f:
        send_email(
            to='team@company.com',
            subject='Daily Report',
            attachment=f
        )

with DAG('report_dag_bad', ...) as dag:
    file_path = generate_report()
    send_report(file_path)
Recommended
Using S3 for data persistence
@task
def generate_report(execution_date):
    df = process_data()
    # Using S3 with date partitioning
    # ('{{ ds }}' renders to a YYYY-MM-DD string)
    path = f's3://your-bucket/reports/{execution_date}/report.csv'
    df.to_csv(path)
    return path

@task
def send_report(file_path):
    s3_hook = S3Hook()
    with s3_hook.get_key(file_path).get()['Body'] as f:
        send_email(
            to='team@company.com',
            subject='Daily Report',
            attachment=f
        )
    # Optionally move to archive location
    archive_path = file_path.replace('/reports/', '/archive/reports/')
    s3_hook.copy_object(file_path, archive_path)

with DAG('report_dag_good', ...) as dag:
    file_path = generate_report('{{ ds }}')
    send_report(file_path)
Error Handling
Implement comprehensive error handling with appropriate retry strategies. Use Airflow's retry mechanisms with exponential backoff for transient failures.
Discouraged
Basic try-except without proper retry handling
@task
def process_api_data():
    try:
        response = requests.get('https://api.example.com/data')
        data = response.json()
        process_data(data)
    except Exception as e:
        # Generic error handling
        print(f"Error: {e}")
        # No retries for transient failures
        # No error classification
        # No alerts
        raise

with DAG('basic_error_dag', ...) as dag:
    process_api_data()
Recommended
Comprehensive error handling with retries
from airflow.exceptions import AirflowException, AirflowFailException
from requests.exceptions import RequestException

class DataQualityError(AirflowException):
    """Raised when data doesn't meet quality standards."""
    pass

@task(
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
def process_api_data():
    try:
        response = requests.get(
            'https://api.example.com/data',
            timeout=10
        )
        response.raise_for_status()
        data = response.json()
        # Validate data quality
        if not validate_data(data):
            raise DataQualityError("Data validation failed")
        process_data(data)
    except RequestException as e:
        # Network/API errors are transient - let Airflow retry
        send_alert("API Error", f"Retrying due to: {str(e)}")
        raise
    except DataQualityError as e:
        # Data quality issues won't be fixed by retrying - fail without retries
        send_alert("Data Quality Error", str(e), severity="high")
        raise AirflowFailException(str(e)) from e
    except Exception as e:
        # Unexpected errors
        send_alert(
            "Critical Error",
            f"Unexpected error: {str(e)}",
            severity="critical"
        )
        raise

with DAG('robust_error_dag', ...) as dag:
    process_api_data()
Conclusion
You've now learned how to work with DAGs in Apache Airflow. We covered accessing the UI, running your first DAG, and best practices. You now have the foundational knowledge needed to build robust, maintainable workflows.
In the next part of this series, I'll dive into how to integrate LLMs into your workflows.