
Airflow

Airflow is a platform to programmatically author, schedule, and monitor workflows.

Anyscale provides a native integration with Airflow to orchestrate:

  • Anyscale Jobs: Submit and monitor an Anyscale Job using the SubmitAnyscaleJob operator.
  • Anyscale Services: Deploy and roll out Anyscale Services using the RolloutAnyscaleService operator.

The operators in this integration are deferrable. While waiting for a job to finish or a service to be deployed, they hand status polling off to the Airflow triggerer, so they don't occupy a worker slot. Running them this way requires a triggerer process in your Airflow deployment.
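Conceptually, a deferred operator's trigger runs an async polling loop like the one below. This is a minimal sketch in plain asyncio, not the provider's actual trigger code: `poll_until_terminal`, `get_status`, and the state names in `TERMINAL_STATES` are hypothetical, and `poll_interval` and `timeout` only play the same roles as the operators' `poll_interval` and timeout parameters.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical terminal state names; the real Anyscale states may differ.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "TERMINATED"}


async def poll_until_terminal(
    get_status: Callable[[], Awaitable[str]],
    poll_interval: float,
    timeout: float,
) -> str:
    """Await get_status() every poll_interval seconds until a terminal state or timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status()
        if status in TERMINAL_STATES:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout} seconds")
        # Sleeping yields the event loop, so one triggerer process can wait on
        # many jobs or services at once without holding a worker slot.
        await asyncio.sleep(poll_interval)


async def main() -> str:
    # Fake status source standing in for an Anyscale API call.
    states = iter(["PENDING", "RUNNING", "SUCCEEDED"])

    async def fake_status() -> str:
        return next(states)

    return await poll_until_terminal(fake_status, poll_interval=0.01, timeout=5)


if __name__ == "__main__":
    print(asyncio.run(main()))  # SUCCEEDED
```

Because the loop only awaits, the waiting costs an entry in the triggerer's event loop rather than a blocked worker process.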

Installation

Install the Anyscale provider using the command below:

pip install astro-provider-anyscale

Connecting Anyscale to Airflow

Generate an Anyscale platform API key

  1. Go to the Anyscale console.
  2. Click your username in the top-right corner and select API Keys.
  3. Select the AI Platform tab, click Create, and copy the generated key.

Airflow Connection configuration

To integrate Airflow with Anyscale, configure an Airflow Connection with a unique name and set its password to the API key you copied from the Anyscale console.

  1. Access the Airflow web UI: Open the Airflow web interface and log in with your Airflow credentials.
  2. Create a new Connection: Go to the Admin tab and select Connections from the dropdown menu, then click the Add a new record button.
  3. Configure the Connection:
    1. Conn Id: Enter a unique identifier for the connection, for example anyscale_conn.
    2. Conn Type: Select Anyscale.
    3. Password: Paste the API key you copied from the Anyscale console.
  4. Save the Connection: Click the Save button at the bottom of the form.
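As an alternative to the web UI, Airflow 2.3 and later can read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` whose value is a JSON object. The sketch below builds that variable for the `anyscale_conn` connection. It assumes the provider registers its connection type as `anyscale` (check the Conn Type value in your UI before relying on it); `anyscale_conn_env` and the `ANYSCALE_API_KEY` environment variable are hypothetical names used for illustration.

```python
import json
import os
import shlex


def anyscale_conn_env(conn_id: str, api_key: str) -> tuple[str, str]:
    """Return (name, value) for an Airflow connection defined via environment variable."""
    # Airflow looks up AIRFLOW_CONN_ followed by the upper-cased connection id.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # JSON-serialized connection; conn_type "anyscale" is an assumption.
    value = json.dumps({"conn_type": "anyscale", "password": api_key})
    return name, value


if __name__ == "__main__":
    # Hypothetical: read the key from ANYSCALE_API_KEY instead of hard-coding it.
    name, value = anyscale_conn_env("anyscale_conn", os.environ.get("ANYSCALE_API_KEY", ""))
    # Print a shell line you could add to the environment of every Airflow component.
    print(f"export {name}={shlex.quote(value)}")
```

Connections defined this way do not appear in the Connections list in the UI, but hooks resolve them by `conn_id` in the same way.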

Usage

Orchestrating an Anyscale Job

To orchestrate an Anyscale Job within an Airflow DAG, use the SubmitAnyscaleJob operator.

from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG

from anyscale_provider.operators.anyscale import SubmitAnyscaleJob

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 4, 2),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"

# Local folder next to this DAG file that contains ray-job.py; used as the job's working directory
FOLDER_PATH = Path(__file__).parent / "ray_scripts"

dag = DAG(
    "sample_anyscale_job_workflow",
    default_args=default_args,
    description="A DAG to interact with Anyscale triggered manually",
    schedule=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

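submit_anyscale_job = SubmitAnyscaleJob(
    task_id="submit_anyscale_job",
    conn_id=ANYSCALE_CONN_ID,
    name="AstroJob",
    image_uri="IMAGE_URI",  # placeholder: replace with your container image URI
    compute_config="COMPUTE_CONFIG",  # placeholder: replace with your compute config
    working_dir=str(FOLDER_PATH),
    entrypoint="python ray-job.py",  # command run inside working_dir
    requirements=["requests", "pandas", "numpy", "torch"],  # pip packages for the job
    max_retries=1,
    job_timeout_seconds=3000,  # 50 minutes
    poll_interval=30,  # seconds between status checks
    dag=dag,
)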


# Defining the task sequence
submit_anyscale_job
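Because `working_dir` points at a local folder, the DAG above expects `ray_scripts/ray-job.py` to sit next to the DAG file. A missing script only surfaces once the job runs on Anyscale, so a pre-flight check such as the one below can catch it earlier. `check_entrypoint` is a hypothetical helper, not part of the provider, and it only understands entrypoints of the form `python <script> [args...]`.

```python
import shlex
import tempfile
from pathlib import Path


def check_entrypoint(working_dir: Path, entrypoint: str) -> Path:
    """Return the script referenced by a 'python <script> ...' entrypoint, or raise."""
    args = shlex.split(entrypoint)
    if len(args) < 2 or not args[0].startswith("python"):
        raise ValueError(f"expected 'python <script>', got {entrypoint!r}")
    # The entrypoint runs inside working_dir, so resolve the script relative to it.
    script = working_dir / args[1]
    if not script.is_file():
        raise FileNotFoundError(f"{script} not found; it must exist inside working_dir")
    return script


if __name__ == "__main__":
    # Build a throwaway ray_scripts folder to demonstrate the check.
    with tempfile.TemporaryDirectory() as tmp:
        folder = Path(tmp) / "ray_scripts"
        folder.mkdir()
        (folder / "ray-job.py").write_text("print('hello from Ray')\n")
        print(check_entrypoint(folder, "python ray-job.py").name)  # ray-job.py
```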

Orchestrating an Anyscale Service

To orchestrate an Anyscale Service within an Airflow DAG, use the RolloutAnyscaleService operator.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

from anyscale_provider.hooks.anyscale import AnyscaleHook
from anyscale_provider.operators.anyscale import RolloutAnyscaleService

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 4, 2),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"
# Use a fixed name: Airflow re-parses this file in each task process, so a value
# generated here (for example with uuid.uuid4()) would differ between the rollout
# task and the terminate task, and the wrong service would be terminated.
SERVICE_NAME = "AstroService"

dag = DAG(
    "sample_anyscale_service_workflow",
    default_args=default_args,
    description="A DAG to interact with Anyscale triggered manually",
    schedule=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

deploy_anyscale_service = RolloutAnyscaleService(
    task_id="rollout_anyscale_service",
    conn_id=ANYSCALE_CONN_ID,
    name=SERVICE_NAME,
    image_uri="IMAGE_URI",  # placeholder: replace with your container image URI
    compute_config="COMPUTE_CONFIG",  # placeholder: replace with your compute config
    working_dir="https://github.com/anyscale/docs_examples/archive/refs/heads/main.zip",
    applications=[{"import_path": "sentiment_analysis.app:model"}],
    requirements=["transformers", "requests", "pandas", "numpy", "torch"],
    in_place=False,
    canary_percent=None,
    service_rollout_timeout_seconds=600,  # 10 minutes
    poll_interval=30,  # seconds between status checks
    dag=dag,
)


def terminate_service():
    # Terminate the service deployed by the rollout task.
    hook = AnyscaleHook(conn_id=ANYSCALE_CONN_ID)
    result = hook.terminate_service(service_name=SERVICE_NAME, time_delay=5)
    print(result)


terminate_anyscale_service = PythonOperator(
    task_id="terminate_anyscale_service",
    python_callable=terminate_service,
    # ALL_DONE runs this cleanup task even if the rollout task fails.
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag,
)

# Defining the task sequence
deploy_anyscale_service >> terminate_anyscale_service
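The `applications` entry above identifies a Ray Serve application by an import path in `module:attribute` form: `sentiment_analysis.app:model` names the `model` object in the `sentiment_analysis/app.py` module inside the working directory. The stdlib sketch below shows how such a string resolves to an object; `resolve_import_path` is a hypothetical helper for illustration, not the loader that Anyscale or Ray Serve actually uses.

```python
import importlib
from typing import Any


def resolve_import_path(import_path: str) -> Any:
    """Resolve a 'module.submodule:attribute' string to the named object."""
    module_name, sep, attr = import_path.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:attribute', got {import_path!r}")
    # Import the module part, then walk the (possibly dotted) attribute part.
    obj: Any = importlib.import_module(module_name)
    for part in attr.split("."):
        obj = getattr(obj, part)
    return obj


if __name__ == "__main__":
    # Uses a stdlib target so the example runs anywhere.
    dumps = resolve_import_path("json:dumps")
    print(dumps({"ok": True}))  # {"ok": true}
```

A typo in either half of the path fails at import or attribute lookup, which is why a wrong `import_path` typically shows up as a deployment error rather than at DAG parse time.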

Learn more