Super-charge Your SaaS & LLM Workflows

Part 3: Working with DAGs

Overview

By the end of this post, you'll have run and modified your first DAG. You'll also have learned about best practices when working with DAGs. You'll be ready to move on to the next step: integrating LLMs.

Accessing the UI

Let's start by accessing the Airflow UI. Open up your browser and navigate to http://localhost:8080/login

You'll need to sign in the first time you access the UI. The credentials we configured in Part 2 are:

Username: admin
Password: password
Airflow login

Troubleshooting

Port 8080 Already in Use

If you see an error like "bind: address already in use", another process is using port 8080.

Solution: Either stop the other process or modify the port mapping in docker-compose.yml from "8080:8080" to something like "8081:8080".

DAGs Not Appearing in the UI

If your DAGs aren't showing up in the Airflow UI, this is usually due to a parsing error or incorrect file permissions between your host machine and the container.

Solution: Check the scheduler logs using docker-compose logs airflow-scheduler. Make sure there weren't any errors associated with file permissions when docker-compose started up.

Database Connection Issues

If you see "OperationalError: unable to connect to database" or similar errors.

Solution: Ensure Postgres is running with docker-compose ps. If needed, reset the database with:

Warning: The following commands will delete all existing DAG run history and metadata!

docker-compose down --volumes
docker-compose up
Docker Desktop Connection Error

If you see an error containing "open //./pipe/dockerDesktopLinuxEngine: The system cannot find the file specified", Docker can't communicate with the Docker Desktop engine.

Solution: This usually means Docker Desktop isn't running. Start Docker Desktop and try again. If the error persists, try restarting Docker Desktop.

Incorrect Password

If you've changed the Airflow username and password variables in your environment and they're not working, it's likely because the Airflow user profile was created with the previous set of credentials. You'll need to either delete the user profile, rebuild the container, or reset the Airflow database.

Solution: Delete the existing Airflow user profile, rebuild the container, or reset the Airflow database using the commands above so the new credentials take effect.

Working with DAGs

Running a DAG

You'll see the sequential DAG listed in the UI along with many other example DAGs. Scroll down to find it.

Step 1: Before you can run the DAG, you'll need to unpause it. Newly deployed DAGs are paused by default to avoid inadvertent runs. This is a common gotcha, so keep it in mind.

Unpausing a DAG

Step 2: Once unpaused, you can trigger the DAG by pressing the play button and selecting "Trigger DAG" from the dropdown menu.

Triggering a DAG

Note on UI Updates

I've noticed that the UI on a locally hosted Airflow instance sometimes doesn't update immediately after triggering a DAG. If this happens, just refresh the page.

Step 3: Make sure that Auto-refresh is toggled on in the top-right corner of the UI. This will allow the UI to update with the latest DAG statuses.

Auto-refreshing the UI

Step 4: You'll notice that the DAG run failed.

Failed DAG run

Let's take a look at the logs to see what happened.

Inspecting A Failed Task

When a task fails, you need to inspect the logs to understand what went wrong.

Step 1: Click on the failed tasks counter in the UI.

Failed task counter location

Step 2: Then, click on the failed DAG's "Run Id" link.

DAG run ID location

Step 3: Select the "Logs" tab to view the task's execution logs and error details.

Logs view location

Step 4: The error is plainly spelled out for you in the task logs. Open dags/sequential.py in the cloned repository on your host machine using your preferred IDE. Fix the issue and save the file.

Note on DAG Navigation

Pay attention to the DAG navigation bar: the UI can and will change in unexpected ways depending on which part of the DAG is currently in focus.

DAG navigation bar location

Note on Development Environment

While Dev Containers allow direct file editing within the Airflow container, this is not recommended since the git repository isn't initialized inside the container.

Rerunning: the DAG or the task?

You have two options and the better choice largely depends on which task failed and the amount of work performed by upstream tasks that succeeded. While rerunning the DAG is easier to invoke on the UI, it's not always the best option if it means waiting for all upstream tasks to run again. You may also need to consider the state of your system and how those upstream tasks may have changed that state.

Option 1: New DAG Run

Click the same play button as before.

DAG play button location

This will trigger a completely new run with a different run ID, so new instances of all tasks will be executed.

Option 2: Clear Failed Task

First, click on the failed tasks counter in the UI.

Failed tasks counter in Airflow UI

Then, click on the failed DAG's "Run Id".

DAG Run ID link in Airflow UI

The DAG run's graph view should already be in focus. If not, open it by clicking on the "Graph" tab.

Graph tab in DAG run view

Click on the failed task in the DAG graph to put it into focus.

Failed task node in DAG graph

Then, click on "Clear Task". Followed by "Clear" in the dialog window that pops up.

Clear Task button and confirmation dialog

After a short while, you should see that the DAG ran successfully.

Clear task button location

Understanding DAG Parsing

Before we dive into best practices, it's important to understand how Airflow parses DAG files. Let's break down the sequential DAG into three sections to understand which parts get executed during parsing versus runtime:

1. Imports and Setup

This section is executed every time the DAG file is parsed by the scheduler:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.decorators import task
from datetime import datetime

2. Task Definitions

The content of task definitions is only executed when the task runs:

@task
def get_timestamp():
    return datetime.now()

@task
def format_timestamp(timestamp):
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")

@task
def print_timestamp(formatted_timestamp):
    print(f"The provided timestamp is {formatted_timestamp}")

3. DAG Definition

The DAG context and task dependencies are evaluated every time the DAG file is parsed. When tasks appear to be invoked (e.g., timestamp = get_timestamp()), Airflow is actually creating task objects so that the tasks and their return values can be used to define the DAG's structure:

with DAG(
    dag_id="sequential",
    description="A DAG made up of sequential tasks",
    start_date=days_ago(1),
    schedule_interval="@daily",
) as dag:
    timestamp = get_timestamp()
    formatted_timestamp = format_timestamp(timestamp)
    print_timestamp(formatted_timestamp)

The scheduler periodically parses all DAG files to check for updates. This means sections 1 and 3 are executed repeatedly throughout the day unless configured otherwise, which is why it's crucial to keep them lightweight. Heavy operations like API calls or database queries should be moved inside task definitions (section 2), where they'll only be executed when the task actually runs.

Best Practices

I want to save you time and headaches with best practices I've learned working with Airflow in production. Keep these in mind as you start putting together DAGs for your own workflows using the local Airflow instance you now have running.

Minimal Processing in DAG Definitions

Avoid complex processing in DAG definitions. The scheduler parses DAG files frequently to check for updates, so any heavy processing here will slow it down. Keep imports light, avoid API calls, and defer data processing to task execution time.

Discouraged

Loading configuration during DAG parsing

import requests

# The following API call runs during DAG parsing!
# If the response must be used to dynamically define the workflow
# then consider adding a caching mechanism when the API call is expensive, 
# or there's a risk of rate limiting.

response = requests.get('https://api.config.com/settings')
config = response.json()

@task
def process_data():
    return config['processing_settings']

with DAG('your_dag',     
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    process_data()
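
Preferred

A minimal sketch of the same workflow with the API call deferred to task execution time, so parsing stays lightweight (the configuration endpoint is the same placeholder as above):

import requests

from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago

@task
def load_config():
    # The API call now runs only when the task executes,
    # not every time the scheduler parses this file
    response = requests.get('https://api.config.com/settings')
    return response.json()['processing_settings']

@task
def process_data(settings):
    # Placeholder for the actual processing logic
    print(f"Processing with settings: {settings}")

with DAG('your_dag',
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    process_data(load_config())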

Delegation of Work

Your Airflow workers can only service a limited number of tasks at a time. Tasks that involve long-running, data-heavy operations should delegate the work to a containerized job and be paired with a Sensor that waits for completion.

Discouraged

Using a task that blocks an Airflow worker while waiting for delegated work to complete

import time
import requests

from airflow.exceptions import AirflowException

@task
def process_and_wait():
    # Starts the processing job
    response = requests.post(
        "https://processing-service.example.com/jobs",
        json={
            "input_path": "s3://bucket/huge_file.csv",
            "output_path": "s3://bucket/output.csv"
        }
    )
    job_id = response.json()["job_id"]
    
    # Blocks worker with polling loop
    while True:
        response = requests.get(
            f"https://processing-service.example.com/jobs/{job_id}"
        )
        status = response.json()["status"]
        
        if status == "completed":
            break
        if status == "failed":
            raise AirflowException("Processing job failed")
            
        # Sleeps for 30 seconds before checking again
        # The worker cannot service other DAGs' tasks during this time
        # and you only have a limited number of workers
        time.sleep(30)

with DAG('process_data_dag',
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    job = process_and_wait()
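
Preferred

A sketch of the delegation pattern, assuming Airflow 2.5+ for the @task.sensor decorator and the same hypothetical processing service: one quick task submits the job, and a sensor in reschedule mode frees the worker slot between status checks:

import requests

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.sensors.base import PokeReturnValue
from airflow.utils.dates import days_ago

@task
def submit_job():
    # Returns immediately; the heavy lifting happens in the external service
    response = requests.post(
        "https://processing-service.example.com/jobs",
        json={
            "input_path": "s3://bucket/huge_file.csv",
            "output_path": "s3://bucket/output.csv"
        }
    )
    return response.json()["job_id"]

@task.sensor(poke_interval=30, timeout=3600, mode="reschedule")
def wait_for_job(job_id) -> PokeReturnValue:
    # In reschedule mode the worker slot is released between pokes
    response = requests.get(
        f"https://processing-service.example.com/jobs/{job_id}"
    )
    status = response.json()["status"]
    if status == "failed":
        raise AirflowException("Processing job failed")
    return PokeReturnValue(is_done=(status == "completed"))

with DAG('process_data_dag',
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    wait_for_job(submit_job())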

Idempotent Tasks

Design tasks to be safely retriable without side effects. This means a task can be run multiple times with the same input and produce the same output.

Discouraged

Using non-deterministic paths and INSERT statements

@task
def process_sales_data(date):
    # Non-deterministic filename will change on every run
    output_path = f's3://bucket/sales_{datetime.now().timestamp()}.csv'
    
    # Non-idempotent database operation
    with postgres.connect() as conn:
        conn.execute("""
            INSERT INTO sales_summary (date, total)
            SELECT %s, SUM(amount)
            FROM daily_sales
            WHERE date = %s
        """, [date, date])
    
    return output_path

with DAG(
    'sales_processing',
    start_date=days_ago(1),
    schedule_interval='@daily'
) as dag:
    # Passing the DAG's logical date as a datestring; on a rerun this stays
    # the run's original date rather than the date you happen to rerun it on
    process_sales_data('{{ ds }}')
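
Preferred

A sketch of a more idempotent version, using the run's logical date for a deterministic output path and a delete-then-insert so reruns overwrite instead of duplicating (postgres and the table names are the same placeholders as above):

@task
def process_sales_data(date):
    # Deterministic path: reruns for the same logical date write to the same file
    output_path = f's3://bucket/sales_{date}.csv'

    with postgres.connect() as conn:
        # Remove any rows from a previous attempt so reruns don't duplicate the summary
        conn.execute("DELETE FROM sales_summary WHERE date = %s", [date])
        conn.execute("""
            INSERT INTO sales_summary (date, total)
            SELECT %s, SUM(amount)
            FROM daily_sales
            WHERE date = %s
        """, [date, date])

    return output_path

with DAG(
    'sales_processing',
    start_date=days_ago(1),
    schedule_interval='@daily'
) as dag:
    # {{ ds }} always renders to the run's logical date, so rerunning
    # this run produces the same path and the same summary row
    process_sales_data('{{ ds }}')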

Atomic Tasks

Keep tasks focused and independent for better maintainability. Each task should do one thing well and be self-contained. This also makes tasks reusable and easier to test.

Discouraged

One large task doing multiple operations

@task
def process_and_load_data():
    # Download data
    response = requests.get('https://api.example.com/data')
    data = response.json()
    
    # Process data
    df = pd.DataFrame(data)
    df['processed'] = df['value'].apply(complex_calculation)
    
    # Load to database and S3
    df.to_sql('processed_data', engine)
    df.to_parquet('s3://bucket/processed_data.parquet')
    
    # Send notification
    send_slack_message("Data processing complete!")
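
Preferred

A sketch of the same workflow split into focused tasks that can be retried and tested independently (the endpoint, complex_calculation, engine, and send_slack_message are the same placeholders as above; the DAG id is illustrative):

@task
def download_data():
    response = requests.get('https://api.example.com/data')
    return response.json()

@task
def transform_data(data):
    df = pd.DataFrame(data)
    df['processed'] = df['value'].apply(complex_calculation)
    # Persist the intermediate result rather than passing a large DataFrame via XCom
    df.to_parquet('s3://bucket/processed_data.parquet')
    return 's3://bucket/processed_data.parquet'

@task
def load_to_database(parquet_path):
    pd.read_parquet(parquet_path).to_sql('processed_data', engine)
    return parquet_path

@task
def notify(parquet_path):
    send_slack_message("Data processing complete!")

with DAG('data_pipeline',
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    notify(load_to_database(transform_data(download_data())))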

Just-in-Time Tokens

Fetch expiring credentials at the moment they're needed, not during DAG parsing or in a separate upstream task. When many parallel DAG runs are being processed, a task might not start before a token fetched earlier has expired.

Additionally, passing tokens between tasks exposes them as plaintext XCom values, which is a security risk.

Discouraged

Fetching tokens during DAG parsing or in upstream tasks

@task
def fetch_token_task():
    # Token fetched in upstream task
    return get_oauth_token()

@task
def process_data(token):
    # Token passed from the upstream task; it might be expired by now
    # if all available workers were busy servicing other DAGs' tasks
    # before getting back to this one
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get("https://api.example.com/data", headers=headers)
    return process(response.json())

with DAG('token_dag_bad', ...) as dag:
    # Fetch token once and pass to downstream task
    token = fetch_token_task()
    process_data(token)
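
Preferred

A sketch of the just-in-time version, where the token is fetched inside the task that uses it and never passes through XComs (get_oauth_token and process are the same placeholders as above):

@task
def process_data():
    # Fetched right before use, so the token can't expire while the task
    # waits in the queue, and it never lands in XComs as plaintext
    token = get_oauth_token()
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get("https://api.example.com/data", headers=headers)
    return process(response.json())

with DAG('token_dag_good', ...) as dag:
    process_data()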

Dynamic Workflows

When building dynamic workflows, prefer task-level dynamism over DAG-level dynamism.

Discouraged

Complex conditional logic during parsing

config = fetch_config()
tasks = []

for item in config['items']:  # Loop during parsing
    if item['enabled']:
        @task
        def process_item(item=item):  # Dynamic task creation
            process_data(item)
        tasks.append(process_item())

with DAG('dynamic_dag_bad', ...) as dag:
    for i, task in enumerate(tasks[:-1]):
        task >> tasks[i + 1]
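
Preferred

A sketch using dynamic task mapping (available since Airflow 2.3), with the same hypothetical fetch_config and process_data helpers; the list of items is resolved at runtime rather than during parsing:

@task
def get_enabled_items():
    # Runs at execution time, not every time the file is parsed
    config = fetch_config()
    return [item for item in config['items'] if item['enabled']]

@task
def process_item(item):
    process_data(item)

with DAG('dynamic_dag_good', ...) as dag:
    # One mapped task instance is created per enabled item at runtime
    process_item.expand(item=get_enabled_items())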

One-time Workflows

For one-time data processing jobs, define them as Airflow DAGs rather than standalone scripts. Just as you version control database migrations, tracking one-time workflows in your DAGs repository provides the following benefits: version history, execution tracking, automatic retries, and comprehensive logging. This approach makes workflows reproducible, shareable with teammates, and easily convertible to recurring jobs if needed later. After successful execution, remove the DAG definition to keep your Airflow instance clean and prevent accidental reruns.

Discouraged

Running scripts manually outside Airflow

import pandas as pd
import psycopg2
from datetime import datetime

def main():
    try:
        # Manual connection management
        conn = psycopg2.connect("postgresql://user:pass@host/db")
        
        # No automatic retries
        df = pd.read_csv('old_data.csv')
        df['processed'] = df['value'] * 1.1
        
        # No execution tracking
        df.to_sql('new_table', conn)
        
        print(f"Migration completed at {datetime.now()}")
    except Exception as e:
        print(f"Error: {e}")
        # Manual error handling
        
if __name__ == "__main__":
    main()
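
Preferred

A sketch of the same migration wrapped in a one-off DAG, assuming the Postgres provider is installed and a connection named postgres_default is configured in Airflow (both are assumptions); you get retries, logging, and a recorded run for free:

import pandas as pd

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

@task(retries=3)
def migrate_old_data():
    # Connection details come from Airflow's connection store,
    # not a hard-coded connection string
    engine = PostgresHook(postgres_conn_id='postgres_default').get_sqlalchemy_engine()

    df = pd.read_csv('old_data.csv')
    df['processed'] = df['value'] * 1.1
    df.to_sql('new_table', engine, if_exists='fail', index=False)

with DAG(
    'one_time_migration',
    start_date=days_ago(1),
    schedule_interval='@once',  # runs once; remove the DAG after it succeeds
) as dag:
    migrate_old_data()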

Object Storage

Use object storage services like AWS S3 for data persistence between tasks when intermediate results are larger than what can fit into XComs. Container filesystems are ephemeral and might not be shared between workers in certain Airflow configurations.

Discouraged

Using local filesystem for data passing

@task
def generate_report():
    df = process_data()
    # Using local filesystem
    df.to_csv('/tmp/report.csv')
    return '/tmp/report.csv'

@task
def send_report(file_path):
    # File might not exist if task runs on different worker
    with open(file_path, 'rb') as f:
        send_email(
            to='team@company.com',
            subject='Daily Report',
            attachment=f
        )

with DAG('report_dag_bad', ...) as dag:
    file_path = generate_report()
    send_report(file_path)
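
Preferred

A sketch using S3 instead of the local filesystem, assuming the Amazon provider is installed and an aws_default connection exists; the bucket name is made up, and process_data and send_email are the same placeholders as above:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def generate_report():
    df = process_data()
    # Write locally, then push to object storage every worker can reach
    local_path = '/tmp/report.csv'
    df.to_csv(local_path)
    s3_key = 'reports/daily_report.csv'
    S3Hook(aws_conn_id='aws_default').load_file(
        filename=local_path,
        key=s3_key,
        bucket_name='my-reports-bucket',
        replace=True,
    )
    return s3_key

@task
def send_report(s3_key):
    # Downloaded on whichever worker picks up this task
    local_path = S3Hook(aws_conn_id='aws_default').download_file(
        key=s3_key,
        bucket_name='my-reports-bucket',
    )
    with open(local_path, 'rb') as f:
        send_email(
            to='team@company.com',
            subject='Daily Report',
            attachment=f
        )

with DAG('report_dag_good', ...) as dag:
    send_report(generate_report())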

Error Handling

Implement comprehensive error handling with appropriate retry strategies. Use Airflow's retry mechanisms with exponential backoff for transient failures.

Discouraged

Basic try-except without proper retry handling

@task
def process_api_data():
    try:
        response = requests.get('https://api.example.com/data')
        data = response.json()
        process_data(data)
    except Exception as e:
        # Generic error handling
        print(f"Error: {e}")
        # No retries for transient failures
        # No error classification
        # No alerts
        raise

with DAG('basic_error_dag', ...) as dag:
    process_api_data()
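
Preferred

A sketch of the same task leaning on Airflow's built-in retry machinery with exponential backoff, so transient API failures are retried automatically instead of being swallowed (process_data is the same placeholder as above):

from datetime import timedelta

@task(
    retries=3,
    retry_delay=timedelta(seconds=30),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=10),
)
def process_api_data():
    response = requests.get('https://api.example.com/data', timeout=30)
    # Surface HTTP errors so Airflow's retry logic can kick in
    response.raise_for_status()
    process_data(response.json())

with DAG('retrying_error_dag', ...) as dag:
    process_api_data()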

Conclusion

You've now learned how to work with DAGs in Apache Airflow. We covered accessing the UI, running your first DAG, and best practices. You now have the foundational knowledge needed to build robust, maintainable workflows.

In the next part of this series, I'll dive into how to integrate LLMs into your workflows.

Next Steps

  • Practice creating your own DAGs using the local environment you've set up
  • Think about the additional dependencies and credentials Airflow needs to implement workflows for your application
  • Review the best practices section as you build your workflows
  • Join the mailing list below to be notified when Part 4 becomes available
