diff --git a/crates/ruff_linter/resources/test/fixtures/airflow/AIR303.py b/crates/ruff_linter/resources/test/fixtures/airflow/AIR303.py new file mode 100644 index 00000000000000..e4c5dd2f5ec22f --- /dev/null +++ b/crates/ruff_linter/resources/test/fixtures/airflow/AIR303.py @@ -0,0 +1,23 @@ +from airflow.operators.python import PythonOperator +from airflow import DAG + +# Using deprecated task_concurrency parameter +task1 = PythonOperator( + task_id="task1", + task_concurrency=2, # This should trigger AIR303 + python_callable=lambda: None, +) + +# Using new max_active_tis_per_dag parameter +task2 = PythonOperator( + task_id="task2", + max_active_tis_per_dag=2, # This should be fine + python_callable=lambda: None, +) + +# Another example with task_concurrency +task3 = PythonOperator( + task_id="task3", + task_concurrency=5, # This should trigger AIR303 + python_callable=lambda: None, +) diff --git a/crates/ruff_linter/src/checkers/ast/analyze/expression.rs b/crates/ruff_linter/src/checkers/ast/analyze/expression.rs index 7c27f1a1f01d1c..02b6ac98b7032f 100644 --- a/crates/ruff_linter/src/checkers/ast/analyze/expression.rs +++ b/crates/ruff_linter/src/checkers/ast/analyze/expression.rs @@ -1073,6 +1073,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) { if checker.enabled(Rule::AirflowDagNoScheduleArgument) { airflow::rules::dag_no_schedule_argument(checker, expr); } + if checker.enabled(Rule::AirflowDeprecatedTaskConcurrency) { + airflow::rules::task_concurrency_check(checker, expr); + } } Expr::Dict(dict) => { if checker.any_enabled(&[ diff --git a/crates/ruff_linter/src/codes.rs b/crates/ruff_linter/src/codes.rs index a14bc7e23a58dd..cb6fb8d72814ca 100644 --- a/crates/ruff_linter/src/codes.rs +++ b/crates/ruff_linter/src/codes.rs @@ -1034,6 +1034,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> { // airflow (Airflow, "001") => (RuleGroup::Stable, rules::airflow::rules::AirflowVariableNameTaskIdMismatch), (Airflow, "301") => (RuleGroup::Preview, rules::airflow::rules::AirflowDagNoScheduleArgument), + (Airflow, "303") => (RuleGroup::Preview, rules::airflow::rules::AirflowDeprecatedTaskConcurrency), // perflint (Perflint, "101") => (RuleGroup::Stable, rules::perflint::rules::UnnecessaryListCast), diff --git a/crates/ruff_linter/src/rules/airflow/mod.rs b/crates/ruff_linter/src/rules/airflow/mod.rs index 4e4130766022f9..bc7c000c0aae27 100644 --- a/crates/ruff_linter/src/rules/airflow/mod.rs +++ b/crates/ruff_linter/src/rules/airflow/mod.rs @@ -14,6 +14,7 @@ mod tests { #[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))] #[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))] + #[test_case(Rule::AirflowDeprecatedTaskConcurrency, Path::new("AIR303.py"))] fn rules(rule_code: Rule, path: &Path) -> Result<()> { let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy()); let diagnostics = test_path( diff --git a/crates/ruff_linter/src/rules/airflow/rules/mod.rs b/crates/ruff_linter/src/rules/airflow/rules/mod.rs index b01391092ca77a..fd1ff1e4a0dc5d 100644 --- a/crates/ruff_linter/src/rules/airflow/rules/mod.rs +++ b/crates/ruff_linter/src/rules/airflow/rules/mod.rs @@ -1,5 +1,7 @@ pub(crate) use dag_schedule_argument::*; +pub(crate) use task_concurrency::*; pub(crate) use task_variable_name::*; mod dag_schedule_argument; +mod task_concurrency; mod task_variable_name; diff --git a/crates/ruff_linter/src/rules/airflow/rules/task_concurrency.rs b/crates/ruff_linter/src/rules/airflow/rules/task_concurrency.rs new file mode 100644 index 00000000000000..796169901bad6f --- /dev/null +++ b/crates/ruff_linter/src/rules/airflow/rules/task_concurrency.rs @@ -0,0 +1,78 @@ +use ruff_diagnostics::{Diagnostic, Violation}; +use ruff_macros::{derive_message_formats, violation}; +use ruff_python_ast as ast; +use ruff_python_ast::Expr; +use ruff_text_size::Ranged; + +use crate::checkers::ast::Checker; + +/// ## What it does +/// Checks for usage of the deprecated `task_concurrency` parameter in Airflow Operators. +/// +/// ## Why is this bad? +/// The `task_concurrency` parameter has been deprecated and renamed to `max_active_tis_per_dag` +/// in Airflow 3.0. Code using the old parameter name needs to be updated to ensure +/// compatibility with Airflow 3.0. +/// +/// ## Example +/// ```python +/// from airflow.operators.python import PythonOperator +/// +/// # Invalid: using deprecated parameter +/// task = PythonOperator(task_id="my_task", task_concurrency=2) +/// ``` +/// +/// Use instead: +/// ```python +/// from airflow.operators.python import PythonOperator +/// +/// # Valid: using new parameter name +/// task = PythonOperator(task_id="my_task", max_active_tis_per_dag=2) +/// ``` +#[violation] +pub struct AirflowDeprecatedTaskConcurrency { + pub old_param: String, + pub new_param: String, +} + +impl Violation for AirflowDeprecatedTaskConcurrency { + #[derive_message_formats] + fn message(&self) -> String { + let AirflowDeprecatedTaskConcurrency { + old_param, + new_param, + } = self; + format!("Use `{new_param}` instead of deprecated `{old_param}` parameter") + } +} + +/// AIR303 +pub(crate) fn task_concurrency_check(checker: &mut Checker, value: &Expr) { + // If the value is not a call, we can't do anything. + let Expr::Call(ast::ExprCall { + func, arguments, .. + }) = value + else { + return; + }; + + // If the function doesn't come from Airflow, we can't do anything. + if !checker + .semantic() + .resolve_qualified_name(func) + .is_some_and(|qualified_name| matches!(qualified_name.segments()[0], "airflow")) + { + return; + } + + // Check for the deprecated parameter + if let Some(keyword) = arguments.find_keyword("task_concurrency") { + checker.diagnostics.push(Diagnostic::new( + AirflowDeprecatedTaskConcurrency { + old_param: "task_concurrency".to_string(), + new_param: "max_active_tis_per_dag".to_string(), + }, + keyword.range(), + )); + } +} diff --git a/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR303_AIR303.py.snap b/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR303_AIR303.py.snap new file mode 100644 index 00000000000000..5bf8a97609c9ae --- /dev/null +++ b/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR303_AIR303.py.snap @@ -0,0 +1,23 @@ +--- +source: crates/ruff_linter/src/rules/airflow/mod.rs +snapshot_kind: text +--- +AIR303.py:7:5: AIR303 Use `max_active_tis_per_dag` instead of deprecated `task_concurrency` parameter + | +5 | task1 = PythonOperator( +6 | task_id="task1", +7 | task_concurrency=2, # This should trigger AIR303 + | ^^^^^^^^^^^^^^^^^^ AIR303 +8 | python_callable=lambda: None, +9 | ) + | + +AIR303.py:21:5: AIR303 Use `max_active_tis_per_dag` instead of deprecated `task_concurrency` parameter + | +19 | task3 = PythonOperator( +20 | task_id="task3", +21 | task_concurrency=5, # This should trigger AIR303 + | ^^^^^^^^^^^^^^^^^^ AIR303 +22 | python_callable=lambda: None, +23 | ) + | diff --git a/ruff.schema.json b/ruff.schema.json index f45b698932d92e..dfc673d86275fb 100644 --- a/ruff.schema.json +++ b/ruff.schema.json @@ -2797,6 +2797,7 @@ "AIR3", "AIR30", "AIR301", + "AIR303", "ALL", "ANN", "ANN0",