Skip to content

Commit

Permalink
AIR303: Add rule for deprecated task_concurrency parameter
Browse files Browse the repository at this point in the history
- Add new rule to detect deprecated task_concurrency parameter
- Update documentation and tests
- Add snapshot tests
  • Loading branch information
abhishekbhakat committed Nov 27, 2024
1 parent c84c690 commit 9a60fb9
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 0 deletions.
23 changes: 23 additions & 0 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR303.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from airflow.operators.python import PythonOperator
from airflow import DAG

# Deprecated keyword: the `task_concurrency` argument below is expected to be reported.
task1 = PythonOperator(
task_id="task1",
task_concurrency=2, # This should trigger AIR303
python_callable=lambda: None,
)

# Replacement keyword: `max_active_tis_per_dag` is expected to produce no diagnostic.
task2 = PythonOperator(
task_id="task2",
max_active_tis_per_dag=2, # This should be fine
python_callable=lambda: None,
)

# Second deprecated usage, with a different value, to confirm each call is reported.
task3 = PythonOperator(
task_id="task3",
task_concurrency=5, # This should trigger AIR303
python_callable=lambda: None,
)
3 changes: 3 additions & 0 deletions crates/ruff_linter/src/checkers/ast/analyze/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
if checker.enabled(Rule::AirflowDagNoScheduleArgument) {
airflow::rules::dag_no_schedule_argument(checker, expr);
}
if checker.enabled(Rule::AirflowDeprecatedTaskConcurrency) {
airflow::rules::task_concurrency_check(checker, expr);
}
}
Expr::Dict(dict) => {
if checker.any_enabled(&[
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {
// airflow
(Airflow, "001") => (RuleGroup::Stable, rules::airflow::rules::AirflowVariableNameTaskIdMismatch),
(Airflow, "301") => (RuleGroup::Preview, rules::airflow::rules::AirflowDagNoScheduleArgument),
(Airflow, "303") => (RuleGroup::Preview, rules::airflow::rules::AirflowDeprecatedTaskConcurrency),

// perflint
(Perflint, "101") => (RuleGroup::Stable, rules::perflint::rules::UnnecessaryListCast),
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/rules/airflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod tests {

#[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))]
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))]
#[test_case(Rule::AirflowDeprecatedTaskConcurrency, Path::new("AIR303.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());
let diagnostics = test_path(
Expand Down
2 changes: 2 additions & 0 deletions crates/ruff_linter/src/rules/airflow/rules/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub(crate) use dag_schedule_argument::*;
pub(crate) use task_concurrency::*;
pub(crate) use task_variable_name::*;

mod dag_schedule_argument;
mod task_concurrency;
mod task_variable_name;
75 changes: 75 additions & 0 deletions crates/ruff_linter/src/rules/airflow/rules/task_concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use ruff_diagnostics::{Diagnostic, Violation};
use ruff_macros::{derive_message_formats, violation};
use ruff_python_ast as ast;
use ruff_python_ast::Expr;
use ruff_python_semantic::Modules;
use ruff_text_size::Ranged;

use crate::checkers::ast::Checker;

/// ## What it does
/// Checks for usage of the deprecated `task_concurrency` parameter in Airflow Operators.
///
/// ## Why is this bad?
/// The `task_concurrency` parameter has been deprecated and renamed to `max_active_tis_per_dag`
/// in Airflow 3.0. Code using the old parameter name needs to be updated to ensure
/// compatibility with Airflow 3.0.
///
/// ## Example
/// ```python
/// from airflow.operators.python import PythonOperator
///
/// # Invalid: using deprecated parameter
/// task = PythonOperator(task_id="my_task", task_concurrency=2)
/// ```
///
/// Use instead:
/// ```python
/// from airflow.operators.python import PythonOperator
///
/// # Valid: using new parameter name
/// task = PythonOperator(task_id="my_task", max_active_tis_per_dag=2)
/// ```
#[violation]
pub struct AirflowDeprecatedTaskConcurrency {
    /// Name of the deprecated keyword argument; interpolated into the
    /// diagnostic message as the parameter to move away from.
    pub old_param: String,
    /// Name of the replacement keyword argument; interpolated into the
    /// diagnostic message as the parameter to use instead.
    pub new_param: String,
}

impl Violation for AirflowDeprecatedTaskConcurrency {
    /// Builds the user-facing diagnostic text from the two parameter names
    /// stored on the violation.
    #[derive_message_formats]
    fn message(&self) -> String {
        // Bind the fields to locals so the format string can capture them by name.
        let Self {
            old_param,
            new_param,
        } = self;
        format!("Use `{new_param}` instead of deprecated `{old_param}` parameter")
    }
}

/// AIR303
///
/// Reports a `task_concurrency=...` keyword argument on a call expression,
/// suggesting `max_active_tis_per_dag` instead.
///
/// A diagnostic is only emitted when all of the following hold:
/// - an Airflow module has been seen in the file being checked;
/// - `value` is a call expression;
/// - the callee resolves to a qualified name whose first segment is `airflow`
///   (so unrelated callables that happen to accept a `task_concurrency`
///   keyword are not flagged);
/// - the call passes a keyword argument named `task_concurrency`.
///
/// The diagnostic range is the range of the whole keyword argument.
pub(crate) fn task_concurrency_check(checker: &mut Checker, value: &Expr) {
    // Cheapest test first: if we haven't seen any airflow imports, we can't do anything.
    if !checker.semantic().seen_module(Modules::AIRFLOW) {
        return;
    }

    // If the value is not a call, we can't do anything.
    let Expr::Call(ast::ExprCall {
        func, arguments, ..
    }) = value
    else {
        return;
    };

    // If the callee doesn't come from Airflow, a `task_concurrency` keyword has
    // nothing to do with this deprecation; bail out to avoid false positives.
    if !checker
        .semantic()
        .resolve_qualified_name(func)
        .is_some_and(|qualified_name| matches!(qualified_name.segments(), ["airflow", ..]))
    {
        return;
    }

    // Check for the deprecated parameter.
    let Some(keyword) = arguments.find_keyword("task_concurrency") else {
        return;
    };

    checker.diagnostics.push(Diagnostic::new(
        AirflowDeprecatedTaskConcurrency {
            old_param: "task_concurrency".to_string(),
            new_param: "max_active_tis_per_dag".to_string(),
        },
        keyword.range(),
    ));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
source: crates/ruff_linter/src/rules/airflow/mod.rs
snapshot_kind: text
---
AIR303.py:7:5: AIR303 Use `max_active_tis_per_dag` instead of deprecated `task_concurrency` parameter
|
5 | task1 = PythonOperator(
6 | task_id="task1",
7 | task_concurrency=2, # This should trigger AIR303
| ^^^^^^^^^^^^^^^^^^ AIR303
8 | python_callable=lambda: None,
9 | )
|

AIR303.py:21:5: AIR303 Use `max_active_tis_per_dag` instead of deprecated `task_concurrency` parameter
|
19 | task3 = PythonOperator(
20 | task_id="task3",
21 | task_concurrency=5, # This should trigger AIR303
| ^^^^^^^^^^^^^^^^^^ AIR303
22 | python_callable=lambda: None,
23 | )
|
1 change: 1 addition & 0 deletions ruff.schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9a60fb9

Please sign in to comment.