diff --git a/.github/workflows/data/clickhouse/matrix.yml b/.github/workflows/data/clickhouse/matrix.yml index 1373d5e8..075d98a0 100644 --- a/.github/workflows/data/clickhouse/matrix.yml +++ b/.github/workflows/data/clickhouse/matrix.yml @@ -11,7 +11,7 @@ min: &min max: &max clickhouse-image: clickhouse/clickhouse-server clickhouse-version: 24.8.2.3-alpine - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/core/matrix.yml b/.github/workflows/data/core/matrix.yml index 504f1d4d..67d32ce3 100644 --- a/.github/workflows/data/core/matrix.yml +++ b/.github/workflows/data/core/matrix.yml @@ -6,7 +6,7 @@ min: &min os: ubuntu-latest max: &max - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/hdfs/matrix.yml b/.github/workflows/data/hdfs/matrix.yml index f8bae7d5..bb913214 100644 --- a/.github/workflows/data/hdfs/matrix.yml +++ b/.github/workflows/data/hdfs/matrix.yml @@ -8,7 +8,7 @@ min: &min max: &max hadoop-version: hadoop3-hdfs - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/hive/matrix.yml b/.github/workflows/data/hive/matrix.yml index 31b2120f..6bb53edf 100644 --- a/.github/workflows/data/hive/matrix.yml +++ b/.github/workflows/data/hive/matrix.yml @@ -6,7 +6,7 @@ min: &min os: ubuntu-latest max: &max - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/kafka/matrix.yml b/.github/workflows/data/kafka/matrix.yml index 4ff5fe64..309c0cae 100644 --- a/.github/workflows/data/kafka/matrix.yml +++ b/.github/workflows/data/kafka/matrix.yml @@ -12,7 +12,7 @@ min: &min max: &max kafka-version: 3.7.1 pydantic-version: 2 - spark-version: 3.5.2 + spark-version: 3.5.3 python-version: '3.12' java-version: 20 os: ubuntu-latest diff --git a/.github/workflows/data/mongodb/matrix.yml b/.github/workflows/data/mongodb/matrix.yml index 11892d65..6df9c853 100644 --- a/.github/workflows/data/mongodb/matrix.yml +++ b/.github/workflows/data/mongodb/matrix.yml @@ -9,7 +9,7 @@ min: &min max: &max mongodb-version: 7.0.14 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/mssql/matrix.yml b/.github/workflows/data/mssql/matrix.yml index 3748a0a7..978faa17 100644 --- a/.github/workflows/data/mssql/matrix.yml +++ b/.github/workflows/data/mssql/matrix.yml @@ -1,14 +1,14 @@ min: &min - mssql-version: 2017-GA-ubuntu + mssql-version: 2017-latest spark-version: 2.3.1 pydantic-version: 1 python-version: '3.7' java-version: 8 - os: ubuntu-latest + os: ubuntu-20.04 max: &max - mssql-version: 2022-CU14-ubuntu-22.04 - spark-version: 3.5.2 + mssql-version: 2022-latest + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/mysql/matrix.yml b/.github/workflows/data/mysql/matrix.yml index 17dacdb2..b98c985c 100644 --- a/.github/workflows/data/mysql/matrix.yml +++ b/.github/workflows/data/mysql/matrix.yml @@ -10,7 +10,7 @@ min: &min max: &max mysql-version: 9.0.1 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/oracle/matrix.yml b/.github/workflows/data/oracle/matrix.yml index ccafa20f..051f0df9 100644 --- 
a/.github/workflows/data/oracle/matrix.yml +++ b/.github/workflows/data/oracle/matrix.yml @@ -12,7 +12,7 @@ max: &max oracle-image: gvenzl/oracle-free oracle-version: 23.4-slim-faststart db-name: FREEPDB1 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/postgres/matrix.yml b/.github/workflows/data/postgres/matrix.yml index cd63ae03..43b914e9 100644 --- a/.github/workflows/data/postgres/matrix.yml +++ b/.github/workflows/data/postgres/matrix.yml @@ -9,7 +9,7 @@ min: &min max: &max postgres-version: 16.4-alpine - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/s3/matrix.yml b/.github/workflows/data/s3/matrix.yml index 3990f312..5a408373 100644 --- a/.github/workflows/data/s3/matrix.yml +++ b/.github/workflows/data/s3/matrix.yml @@ -10,7 +10,7 @@ min: &min max: &max minio-version: 2024.8.29 - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.github/workflows/data/teradata/matrix.yml b/.github/workflows/data/teradata/matrix.yml index d9792be6..e9e4b5fa 100644 --- a/.github/workflows/data/teradata/matrix.yml +++ b/.github/workflows/data/teradata/matrix.yml @@ -1,5 +1,5 @@ max: &max - spark-version: 3.5.2 + spark-version: 3.5.3 pydantic-version: 2 python-version: '3.12' java-version: 20 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8d9215d3..6d2bb684 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,7 @@ default_language_version: repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.6.0 + rev: v5.0.0 hooks: - id: check-ast - id: check-case-conflict @@ -90,7 +90,7 @@ repos: - id: text-unicode-replacement-char - repo: https://github.com/asottile/pyupgrade - rev: v3.17.0 + rev: v3.18.0 hooks: - id: pyupgrade args: [--py37-plus, --keep-runtime-typing] @@ -101,20 +101,20 @@ repos: - id: add-trailing-comma - repo: https://github.com/psf/black - rev: 24.8.0 + rev: 24.10.0 hooks: - id: black language_version: python3 - repo: https://github.com/asottile/blacken-docs - rev: 1.18.0 + rev: 1.19.0 hooks: - id: blacken-docs additional_dependencies: - black==24.4.2 - repo: https://github.com/pycqa/bandit - rev: 1.7.9 + rev: 1.7.10 hooks: - id: bandit args: diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index aa1a3c03..fe08ff0b 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -71,7 +71,7 @@ Create virtualenv and install dependencies: -r requirements/tests/postgres.txt \ -r requirements/tests/oracle.txt \ -r requirements/tests/pydantic-2.txt \ - -r requirements/tests/spark-3.5.2.txt + -r requirements/tests/spark-3.5.3.txt # TODO: remove after https://github.com/zqmillet/sphinx-plantuml/pull/4 pip install sphinx-plantuml --no-deps diff --git a/README.rst b/README.rst index 9def167f..79561c1d 100644 --- a/README.rst +++ b/README.rst @@ -184,7 +184,7 @@ Compatibility matrix +--------------------------------------------------------------+-------------+-------------+-------+ | `3.4.x `_ | 3.7 - 3.12 | 8u362 - 20 | 2.12 | +--------------------------------------------------------------+-------------+-------------+-------+ -| `3.5.x `_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 | +| `3.5.x `_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 | +--------------------------------------------------------------+-------------+-------------+-------+ .. _pyspark-install: @@ -199,7 +199,7 @@ or install PySpark explicitly: .. 
code:: bash - pip install onetl pyspark==3.5.2 # install a specific PySpark version + pip install onetl pyspark==3.5.3 # install a specific PySpark version or inject PySpark to ``sys.path`` in some other way BEFORE creating a class instance. **Otherwise connection object cannot be created.** @@ -540,7 +540,7 @@ Read files directly from S3 path, convert them to dataframe, transform it and th setup_logging() # Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded - maven_packages = SparkS3.get_packages(spark_version="3.5.2") + Postgres.get_packages() + maven_packages = SparkS3.get_packages(spark_version="3.5.3") + Postgres.get_packages() spark = ( SparkSession.builder.appName("spark_app_onetl_demo") .config("spark.jars.packages", ",".join(maven_packages)) diff --git a/docker-compose.yml b/docker-compose.yml index d32f682a..9764ed00 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: context: . target: base args: - SPARK_VERSION: 3.5.2 + SPARK_VERSION: 3.5.3 env_file: .env.docker volumes: - ./:/app/ diff --git a/docker/Dockerfile b/docker/Dockerfile index 68f40a52..6dfbaf56 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -44,7 +44,7 @@ ENV PATH=${ONETL_USER_HOME}/.local/bin:${PATH} COPY --chown=onetl:onetl ./run_tests.sh ./pytest_runner.sh ./combine_coverage.sh /app/ RUN chmod +x /app/run_tests.sh /app/pytest_runner.sh /app/combine_coverage.sh -ARG SPARK_VERSION=3.5.2 +ARG SPARK_VERSION=3.5.3 # Spark is heavy, and version change is quite rare COPY --chown=onetl:onetl ./requirements/tests/spark-${SPARK_VERSION}.txt /app/requirements/tests/ RUN pip install -r /app/requirements/tests/spark-${SPARK_VERSION}.txt diff --git a/docs/changelog/0.12.1.rst b/docs/changelog/0.12.1.rst new file mode 100644 index 00000000..953cf3e1 --- /dev/null +++ b/docs/changelog/0.12.1.rst @@ -0,0 +1,23 @@ +0.12.1 (2024-10-28) +=================== + +Features +-------- + +- Log detected JDBC dialect while using ``DBWriter``. + + +Bug Fixes +--------- + +- Fix ``SparkMetricsRecorder`` failing when receiving ``SparkListenerTaskEnd`` without ``taskMetrics`` (e.g. executor was killed by OOM). (:github:pull:`313`) +- Call ``kinit`` before checking for HDFS active namenode. +- Wrap ``kinit`` with ``threading.Lock`` to avoid multithreading issues. +- Immediately show ``kinit`` errors to user, instead of hiding them. +- Use ``AttributeError`` instead of ``ImportError`` in module's ``__getattr__`` method, to make code compliant with Python spec. + + +Doc only Changes +---------------- + +- Add note about `spark-dialect-extension `_ package to Clickhouse connector documentation. (:github:pull:`310`) diff --git a/docs/changelog/index.rst b/docs/changelog/index.rst index 756f0cb1..812c5437 100644 --- a/docs/changelog/index.rst +++ b/docs/changelog/index.rst @@ -3,6 +3,7 @@ :caption: Changelog DRAFT + 0.12.1 0.12.0 0.11.2 0.11.1 diff --git a/docs/connection/db_connection/clickhouse/types.rst b/docs/connection/db_connection/clickhouse/types.rst index 0d8c5675..4f2dbf3f 100644 --- a/docs/connection/db_connection/clickhouse/types.rst +++ b/docs/connection/db_connection/clickhouse/types.rst @@ -3,6 +3,16 @@ Clickhouse <-> Spark type mapping ================================= +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. + +.. 
note:: + + It is recommended to use `spark-dialect-extension `_ package, + which implements writing Arrays from Spark to Clickhouse, fixes dropping fractions of seconds in ``TimestampType``, + and fixes other type conversion issues. + Type detection & casting ------------------------ @@ -106,8 +116,8 @@ References Here you can find source code with type conversions: * `Clickhouse -> JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ * `JDBC -> Clickhouse `_ Supported types diff --git a/docs/connection/db_connection/greenplum/prerequisites.rst b/docs/connection/db_connection/greenplum/prerequisites.rst index b3cf52d7..545b8399 100644 --- a/docs/connection/db_connection/greenplum/prerequisites.rst +++ b/docs/connection/db_connection/greenplum/prerequisites.rst @@ -93,7 +93,7 @@ Number of connections can be limited by 2 ways: spark = ( SparkSession.builder - # Spark will start EXACTLY 5 executors with 1 core each, so max number of parallel jobs is 10 + # Spark will run with 5 threads in local mode, allowing up to 5 parallel tasks .config("spark.master", "local[5]") .config("spark.executor.cores", 1) ).getOrCreate() diff --git a/docs/connection/db_connection/greenplum/types.rst b/docs/connection/db_connection/greenplum/types.rst index 3c0e664b..01282159 100644 --- a/docs/connection/db_connection/greenplum/types.rst +++ b/docs/connection/db_connection/greenplum/types.rst @@ -3,6 +3,10 @@ Greenplum <-> Spark type mapping ================================= +.. note:: + + The results below are valid for Spark 3.2.4, and may differ on other Spark versions. + Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/hive/write.rst b/docs/connection/db_connection/hive/write.rst index cd707fae..b4376930 100644 --- a/docs/connection/db_connection/hive/write.rst +++ b/docs/connection/db_connection/hive/write.rst @@ -19,18 +19,17 @@ Examples df = ... # data is here # Create dataframe with specific number of Spark partitions. - # Use the Hive partitioning columns to group the data. Create only 20 files per Hive partition. - # Also sort the data by column which most data is correlated with, reducing files size. + # Use the Hive partitioning columns to group the data. Create max 20 files per Hive partition. + # Also sort the data by column which most data is correlated with (e.g. user_id), reducing files size. + + num_files_per_partition = 20 + partition_columns = ["country", "business_date"] + sort_columns = ["user_id"] write_df = df.repartition( - 20, - "country", - "business_date", - "user_id", - ).sortWithinPartitions( - "country", - "business_date", - "user_id", - ) + num_files_per_partition, + *partition_columns, + *sort_columns, + ).sortWithinPartitions(*partition_columns, *sort_columns) writer = DBWriter( connection=hive, @@ -38,8 +37,7 @@ Examples options=Hive.WriteOptions( if_exists="append", # Hive partitioning columns. - # `user_id`` column is not included, as it has a lot of distinct values. - partitionBy=["country", "business_date"], + partitionBy=partition_columns, ), ) diff --git a/docs/connection/db_connection/mongodb/types.rst b/docs/connection/db_connection/mongodb/types.rst index f701ac93..91a64a88 100644 --- a/docs/connection/db_connection/mongodb/types.rst +++ b/docs/connection/db_connection/mongodb/types.rst @@ -3,6 +3,10 @@ MongoDB <-> Spark type mapping ============================== +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. 
+ Type detection & casting ------------------------ diff --git a/docs/connection/db_connection/mssql/types.rst b/docs/connection/db_connection/mssql/types.rst index 852289ad..13c7874a 100644 --- a/docs/connection/db_connection/mssql/types.rst +++ b/docs/connection/db_connection/mssql/types.rst @@ -1,7 +1,11 @@ .. _mssql-types: MSSQL <-> Spark type mapping -================================= +============================ + +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. Type detection & casting ------------------------ @@ -101,8 +105,8 @@ References Here you can find source code with type conversions: * `MSSQL -> JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ * `JDBC -> MSSQL `_ Supported types diff --git a/docs/connection/db_connection/mysql/types.rst b/docs/connection/db_connection/mysql/types.rst index 001a221f..a828cb41 100644 --- a/docs/connection/db_connection/mysql/types.rst +++ b/docs/connection/db_connection/mysql/types.rst @@ -1,7 +1,11 @@ .. _mysql-types: MySQL <-> Spark type mapping -================================= +============================ + +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. Type detection & casting ------------------------ @@ -97,8 +101,8 @@ References Here you can find source code with type conversions: * `MySQL -> JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ * `JDBC -> MySQL `_ Supported types diff --git a/docs/connection/db_connection/oracle/types.rst b/docs/connection/db_connection/oracle/types.rst index 2433b0f7..b5277660 100644 --- a/docs/connection/db_connection/oracle/types.rst +++ b/docs/connection/db_connection/oracle/types.rst @@ -3,6 +3,10 @@ Oracle <-> Spark type mapping ============================= +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. + Type detection & casting ------------------------ @@ -101,8 +105,8 @@ See `List of Oracle types Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ Numeric types ~~~~~~~~~~~~~ diff --git a/docs/connection/db_connection/postgres/types.rst b/docs/connection/db_connection/postgres/types.rst index f0fe8821..7eea2850 100644 --- a/docs/connection/db_connection/postgres/types.rst +++ b/docs/connection/db_connection/postgres/types.rst @@ -1,7 +1,11 @@ .. _postgres-types: Postgres <-> Spark type mapping -================================= +=============================== + +.. note:: + + The results below are valid for Spark 3.5.3, and may differ on other Spark versions. 
Type detection & casting ------------------------ @@ -109,8 +113,8 @@ See `List of Postgres types JDBC `_ -* `JDBC -> Spark `_ -* `Spark -> JDBC `_ +* `JDBC -> Spark `_ +* `Spark -> JDBC `_ Numeric types ~~~~~~~~~~~~~ diff --git a/onetl/VERSION b/onetl/VERSION index ac454c6a..34a83616 100644 --- a/onetl/VERSION +++ b/onetl/VERSION @@ -1 +1 @@ -0.12.0 +0.12.1 diff --git a/onetl/_metrics/executor.py b/onetl/_metrics/executor.py index bbb6d732..819f220d 100644 --- a/onetl/_metrics/executor.py +++ b/onetl/_metrics/executor.py @@ -7,12 +7,17 @@ from humanize import naturalsize, precisedelta +try: + from pydantic.v1 import Field +except (ImportError, AttributeError): + from pydantic import Field # type: ignore[no-redef, assignment] + from onetl.impl import BaseModel class SparkExecutorMetrics(BaseModel): - total_run_time: timedelta = timedelta() - total_cpu_time: timedelta = timedelta() + total_run_time: timedelta = Field(default_factory=timedelta) + total_cpu_time: timedelta = Field(default_factory=timedelta) peak_memory_bytes: int = 0 memory_spilled_bytes: int = 0 disk_spilled_bytes: int = 0 diff --git a/onetl/_metrics/extract.py b/onetl/_metrics/extract.py index 4b058092..bd293779 100644 --- a/onetl/_metrics/extract.py +++ b/onetl/_metrics/extract.py @@ -70,7 +70,7 @@ def extract_metrics_from_execution(execution: SparkListenerExecution) -> SparkCo disk_spilled_bytes += stage.metrics.disk_spilled_bytes result_size_bytes += stage.metrics.result_size_bytes - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L467-L473 input_file_count = ( _get_int(execution.metrics, SparkSQLMetricNames.NUMBER_OF_FILES_READ) or _get_int(execution.metrics, SparkSQLMetricNames.STATIC_NUMBER_OF_FILES_READ) diff --git a/onetl/_metrics/listener/base.py b/onetl/_metrics/listener/base.py index bbc6431c..d1b9555d 100644 --- a/onetl/_metrics/listener/base.py +++ b/onetl/_metrics/listener/base.py @@ -16,7 +16,7 @@ class BaseSparkListener: """Base no-op SparkListener implementation. - See `SparkListener `_ interface. + See `SparkListener `_ interface. """ spark: SparkSession diff --git a/onetl/_metrics/listener/execution.py b/onetl/_metrics/listener/execution.py index a0d2a522..ef2e7ffd 100644 --- a/onetl/_metrics/listener/execution.py +++ b/onetl/_metrics/listener/execution.py @@ -22,18 +22,18 @@ class SparkSQLMetricNames(str, Enum): # noqa: WPS338 # Metric names passed to SQLMetrics.createMetric(...) # But only those we're interested in. 
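Aside: the ``SparkExecutorMetrics`` change above swaps plain ``timedelta()`` defaults for ``Field(default_factory=timedelta)`` behind a pydantic v1/v2 import fallback. A minimal standalone sketch of that pattern, assuming either pydantic 1.x or 2.x is installed (the class and field names below are illustrative, not onETL code):

    from datetime import timedelta

    try:
        # pydantic 2.x ships the old v1 API under pydantic.v1
        from pydantic.v1 import BaseModel, Field
    except (ImportError, AttributeError):
        # plain pydantic 1.x
        from pydantic import BaseModel, Field  # type: ignore[no-redef, assignment]


    class ExampleExecutorMetrics(BaseModel):
        # default_factory builds a fresh timedelta for every model instance,
        # instead of evaluating one shared default at class definition time
        total_run_time: timedelta = Field(default_factory=timedelta)
        peak_memory_bytes: int = 0
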
- # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L231 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L231 NUMBER_OF_PARTITIONS_READ = "number of partitions read" - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 NUMBER_OF_FILES_READ = "number of files read" SIZE_OF_FILES_READ = "size of files read" - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L225-L227 STATIC_NUMBER_OF_FILES_READ = "static number of files read" STATIC_SIZE_OF_FILES_READ = "static size of files read" - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L241-L246 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala#L241-L246 NUMBER_OF_DYNAMIC_PART = "number of dynamic part" NUMBER_OF_WRITTEN_FILES = "number of written files" @@ -66,11 +66,11 @@ def jobs(self) -> list[SparkListenerJob]: return result def on_execution_start(self, event): - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44-L58 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44-L58 self.status = SparkListenerExecutionStatus.STARTED def on_execution_end(self, event): - # https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L61-L83 + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L61-L83 for job in self._jobs.values(): if job.status == SparkListenerJobStatus.FAILED: self.status = SparkListenerExecutionStatus.FAILED diff --git a/onetl/_metrics/listener/job.py b/onetl/_metrics/listener/job.py index 5581d76e..488282ed 100644 --- a/onetl/_metrics/listener/job.py +++ b/onetl/_metrics/listener/job.py @@ -38,8 +38,8 @@ def stages(self) -> list[SparkListenerStage]: @classmethod def create(cls, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerJobSubmitted.html - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerJobCompleted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerJobSubmitted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerJobCompleted.html result = cls( id=event.jobId(), description=event.properties().get("spark.job.description"), diff --git a/onetl/_metrics/listener/listener.py b/onetl/_metrics/listener/listener.py index 04fe53c2..22522432 100644 --- a/onetl/_metrics/listener/listener.py +++ b/onetl/_metrics/listener/listener.py @@ -75,7 +75,7 @@ def onExecutionEnd(self, event): # Get execution metrics from SQLAppStatusStore, # as SparkListenerSQLExecutionEnd event does not provide them: - # 
https://github.com/apache/spark/blob/v3.5.2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala + # https://github.com/apache/spark/blob/v3.5.3/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala session_status_store = self.spark._jsparkSession.sharedState().statusStore() # noqa: WPS437 raw_execution = session_status_store.execution(execution.id).get() metrics = raw_execution.metrics() diff --git a/onetl/_metrics/listener/stage.py b/onetl/_metrics/listener/stage.py index b858e151..3e233570 100644 --- a/onetl/_metrics/listener/stage.py +++ b/onetl/_metrics/listener/stage.py @@ -21,7 +21,7 @@ def __str__(self): @dataclass class SparkListenerStage: - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/StageInfo.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/StageInfo.html id: int status: SparkListenerStageStatus = SparkListenerStageStatus.PENDING metrics: SparkListenerTaskMetrics = field(default_factory=SparkListenerTaskMetrics, repr=False, init=False) @@ -39,11 +39,11 @@ def create(cls, stage_info): return cls(id=stage_info.stageId()) def on_stage_start(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerStageSubmitted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerStageSubmitted.html self.status = SparkListenerStageStatus.ACTIVE def on_stage_end(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerStageCompleted.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerStageCompleted.html stage_info = event.stageInfo() if stage_info.failureReason().isDefined(): self.status = SparkListenerStageStatus.FAILED diff --git a/onetl/_metrics/listener/task.py b/onetl/_metrics/listener/task.py index 5a17ffc5..f979dbaa 100644 --- a/onetl/_metrics/listener/task.py +++ b/onetl/_metrics/listener/task.py @@ -61,6 +61,8 @@ class SparkListenerTaskMetrics: @classmethod def create(cls, task_metrics): + if not task_metrics: + return cls() return cls( executor_run_time_milliseconds=task_metrics.executorRunTime(), executor_cpu_time_nanoseconds=task_metrics.executorCpuTime(), @@ -81,14 +83,14 @@ class SparkListenerTask: @classmethod def create(cls, task_info): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/TaskInfo.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/TaskInfo.html return cls(id=task_info.taskId()) def on_task_start(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerTaskStart.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerTaskStart.html self.status = SparkListenerTaskStatus(event.taskInfo().status()) def on_task_end(self, event): - # https://spark.apache.org/docs/3.5.2/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html + # https://spark.apache.org/docs/3.5.3/api/java/org/apache/spark/scheduler/SparkListenerTaskEnd.html self.status = SparkListenerTaskStatus(event.taskInfo().status()) self.metrics = SparkListenerTaskMetrics.create(event.taskMetrics()) diff --git a/onetl/_util/spark.py b/onetl/_util/spark.py index ab2090b0..3265e2c4 100644 --- a/onetl/_util/spark.py +++ b/onetl/_util/spark.py @@ -139,14 +139,16 @@ def get_spark_version(spark_session: SparkSession) -> Version: return Version(spark_session.version) -def 
estimate_dataframe_size(spark_session: SparkSession, df: DataFrame) -> int: +def estimate_dataframe_size(df: DataFrame) -> int: """ Estimate in-memory DataFrame size in bytes. If cannot be estimated, return 0. - Using Spark's `SizeEstimator `_. + Using Spark's `SizeEstimator `_. """ + try: - size_estimator = spark_session._jvm.org.apache.spark.util.SizeEstimator # type: ignore[union-attr] + spark_context = df._sc + size_estimator = spark_context._jvm.org.apache.spark.util.SizeEstimator # type: ignore[union-attr] return size_estimator.estimate(df._jdf) except Exception: # SizeEstimator uses Java reflection which may behave differently in different Java versions, diff --git a/onetl/connection/__init__.py b/onetl/connection/__init__.py index 608beb41..03402fd0 100644 --- a/onetl/connection/__init__.py +++ b/onetl/connection/__init__.py @@ -76,4 +76,4 @@ def __getattr__(name: str): submodule = file_df_connections_modules[name] return getattr(import_module(f"onetl.connection.file_df_connection.{submodule}"), name) - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) diff --git a/onetl/connection/db_connection/jdbc_connection/connection.py b/onetl/connection/db_connection/jdbc_connection/connection.py index 0ea3078c..5bb5811c 100644 --- a/onetl/connection/db_connection/jdbc_connection/connection.py +++ b/onetl/connection/db_connection/jdbc_connection/connection.py @@ -187,6 +187,7 @@ def write_df_to_target( else write_options.if_exists.value ) log.info("|%s| Saving data to a table %r", self.__class__.__name__, target) + log.info("|%s| Detected dialect: '%s'", self.__class__.__name__, self._get_spark_dialect_name()) df.write.format("jdbc").mode(mode).options(dbtable=target, **jdbc_properties).save() log.info("|%s| Table %r successfully written", self.__class__.__name__, target) diff --git a/onetl/connection/db_connection/jdbc_mixin/connection.py b/onetl/connection/db_connection/jdbc_mixin/connection.py index a6830ae4..a2f1d192 100644 --- a/onetl/connection/db_connection/jdbc_mixin/connection.py +++ b/onetl/connection/db_connection/jdbc_mixin/connection.py @@ -228,7 +228,7 @@ def fetch( # Just create metrics by hand, and fill them up using information based on dataframe content. metrics = SparkCommandMetrics() metrics.input.read_rows = df.count() - metrics.driver.in_memory_bytes = estimate_dataframe_size(self.spark, df) + metrics.driver.in_memory_bytes = estimate_dataframe_size(df) log.info("|%s| Recorded metrics:", self.__class__.__name__) log_lines(log, str(metrics)) return df @@ -304,7 +304,7 @@ def execute( # Just create metrics by hand, and fill them up using information based on dataframe content. metrics = SparkCommandMetrics() metrics.input.read_rows = df.count() - metrics.driver.in_memory_bytes = estimate_dataframe_size(self.spark, df) + metrics.driver.in_memory_bytes = estimate_dataframe_size(df) log.info("|%s| Recorded metrics:", self.__class__.__name__) log_lines(log, str(metrics)) diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 93f9d821..71fa82bd 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -332,7 +332,7 @@ def write_df_to_target( write_options.update(options.dict(by_alias=True, exclude_none=True, exclude={"if_exists"})) write_options["topic"] = target - # As of Apache Spark version 3.5.2, the mode 'error' is not functioning as expected. 
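Aside: the module-level ``__getattr__`` hooks above now raise ``AttributeError`` instead of ``ImportError``, which is what PEP 562 prescribes for missing module attributes and keeps ``hasattr()``, ``getattr(..., default)`` and ``dir()``-based tooling working. A minimal sketch of the lazy-import pattern, with a hypothetical package layout and name map:

    # hypothetical my_package/__init__.py
    from importlib import import_module

    # map public names to the submodules that actually define them
    _lazy_imports = {
        "Postgres": "db_connection.postgres",
        "Clickhouse": "db_connection.clickhouse",
    }


    def __getattr__(name: str):
        if name in _lazy_imports:
            submodule = _lazy_imports[name]
            return getattr(import_module(f"{__name__}.{submodule}"), name)
        # missing attributes must raise AttributeError, not ImportError
        raise AttributeError(name)
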
+ # As of Apache Spark version 3.5.3, the mode 'error' is not functioning as expected. # This issue has been reported and can be tracked at: # https://issues.apache.org/jira/browse/SPARK-44774 mode = options.if_exists diff --git a/onetl/connection/file_connection/file_connection.py b/onetl/connection/file_connection/file_connection.py index de5916c4..c85eead4 100644 --- a/onetl/connection/file_connection/file_connection.py +++ b/onetl/connection/file_connection/file_connection.py @@ -11,6 +11,11 @@ from humanize import naturalsize +try: + from pydantic.v1 import PrivateAttr +except (ImportError, AttributeError): + from pydantic import PrivateAttr # type: ignore[no-redef, assignment] + from onetl.base import ( BaseFileConnection, BaseFileFilter, @@ -41,7 +46,7 @@ @support_hooks class FileConnection(BaseFileConnection, FrozenModel): - _clients_cache: Any = None + _clients_cache: Any = PrivateAttr(default=None) @property def client(self): diff --git a/onetl/connection/file_connection/hdfs/connection.py b/onetl/connection/file_connection/hdfs/connection.py index 8cb6d1b5..74eae72d 100644 --- a/onetl/connection/file_connection/hdfs/connection.py +++ b/onetl/connection/file_connection/hdfs/connection.py @@ -5,15 +5,23 @@ import os import stat import textwrap +from contextlib import suppress from logging import getLogger from typing import TYPE_CHECKING, Optional, Tuple from etl_entities.instance import Cluster, Host try: - from pydantic.v1 import Field, FilePath, SecretStr, root_validator, validator + from pydantic.v1 import ( + Field, + FilePath, + PrivateAttr, + SecretStr, + root_validator, + validator, + ) except (ImportError, AttributeError): - from pydantic import Field, FilePath, SecretStr, root_validator, validator # type: ignore[no-redef, assignment] + from pydantic import Field, FilePath, SecretStr, PrivateAttr, root_validator, validator # type: ignore[no-redef, assignment] from onetl.base import PathStatProtocol from onetl.connection.file_connection.file_connection import FileConnection @@ -212,6 +220,8 @@ class HDFS(FileConnection, RenameDirMixin): # TODO: remove in v1.0.0 slots = Slots + _active_host: Optional[Host] = PrivateAttr(default=None) + @slot @classmethod def get_current(cls, **kwargs): @@ -273,6 +283,14 @@ def __str__(self): def path_exists(self, path: os.PathLike | str) -> bool: return self.client.status(os.fspath(path), strict=False) + @slot + def close(self): + super().close() + + with suppress(Exception): + self._active_host = None + return self + @validator("user", pre=True) def _validate_packages(cls, user): if user: @@ -378,7 +396,7 @@ def _get_active_namenode(self) -> str: raise RuntimeError(f"Cannot get list of namenodes for a cluster {self.cluster!r}") nodes_len = len(namenodes) - for i, namenode in enumerate(namenodes): + for i, namenode in enumerate(namenodes, start=1): log.debug("|%s| Trying namenode %r (%d of %d) ...", class_name, namenode, i, nodes_len) if self.Slots.is_namenode_active(namenode, self.cluster): log.info("|%s| Node %r is active!", class_name, namenode) @@ -412,10 +430,13 @@ def _get_host(self) -> str: raise RuntimeError(f"Host {self.host!r} is not an active namenode") - def _get_client(self) -> Client: - host = self._get_host() - conn_str = f"http://{host}:{self.webhdfs_port}" # NOSONAR + def _get_conn_str(self) -> str: + # cache active host to reduce number of requests. 
+ if not self._active_host: + self._active_host = self._get_host() + return f"http://{self._active_host}:{self.webhdfs_port}" + def _get_client(self) -> Client: if self.user and (self.keytab or self.password): from hdfs.ext.kerberos import KerberosClient # noqa: F811 @@ -424,10 +445,13 @@ def _get_client(self) -> Client: keytab=self.keytab, password=self.password.get_secret_value() if self.password else None, ) + # checking if namenode is active requires a Kerberos ticket + conn_str = self._get_conn_str() client = KerberosClient(conn_str, timeout=self.timeout) else: from hdfs import InsecureClient # noqa: F401, WPS442, F811 + conn_str = self._get_conn_str() client = InsecureClient(conn_str, user=self.user) return client diff --git a/onetl/connection/file_df_connection/spark_hdfs/connection.py b/onetl/connection/file_df_connection/spark_hdfs/connection.py index 36d20d4b..12f17504 100644 --- a/onetl/connection/file_df_connection/spark_hdfs/connection.py +++ b/onetl/connection/file_df_connection/spark_hdfs/connection.py @@ -313,7 +313,7 @@ def _get_active_namenode(self) -> str: raise RuntimeError(f"Cannot get list of namenodes for a cluster {self.cluster!r}") nodes_len = len(namenodes) - for i, namenode in enumerate(namenodes): + for i, namenode in enumerate(namenodes, start=1): log.debug("|%s| Trying namenode %r (%d of %d) ...", class_name, namenode, i, nodes_len) if self.Slots.is_namenode_active(namenode, self.cluster): log.info("|%s| Node %r is active!", class_name, namenode) @@ -341,15 +341,15 @@ def _get_host(self) -> str: raise RuntimeError(f"Host {self.host!r} is not an active namenode of cluster {self.cluster!r}") + def _get_conn_str(self) -> str: + # cache active host to reduce number of requests. + if not self._active_host: + self._active_host = self._get_host() + return f"hdfs://{self._active_host}:{self.ipc_port}" + def _convert_to_url(self, path: PurePathProtocol) -> str: - # "hdfs://namenode:8020/absolute/path" if host is set - if self._active_host: - host = self._active_host - else: - host = self._get_host() - # cache value to avoid getting active namenode for every path - self._active_host = host - return f"hdfs://{host}:{self.ipc_port}" + path.as_posix() + # example "hdfs://namenode:8020/absolute/path" + return self._get_conn_str() + path.as_posix() def _get_default_path(self): return RemotePath("/user") / getpass.getuser() diff --git a/onetl/connection/file_df_connection/spark_s3/connection.py b/onetl/connection/file_df_connection/spark_s3/connection.py index 182955cd..2044ebb6 100644 --- a/onetl/connection/file_df_connection/spark_s3/connection.py +++ b/onetl/connection/file_df_connection/spark_s3/connection.py @@ -133,7 +133,7 @@ class SparkS3(SparkFileDFConnection): from pyspark.sql import SparkSession # Create Spark session with Hadoop AWS libraries loaded - maven_packages = SparkS3.get_packages(spark_version="3.5.2") + maven_packages = SparkS3.get_packages(spark_version="3.5.3") # Some dependencies are not used, but downloading takes a lot of time. Skipping them. 
excluded_packages = [ "com.google.cloud.bigdataoss:gcs-connector", @@ -236,8 +236,8 @@ def get_packages( from onetl.connection import SparkS3 - SparkS3.get_packages(spark_version="3.5.2") - SparkS3.get_packages(spark_version="3.5.2", scala_version="2.12") + SparkS3.get_packages(spark_version="3.5.3") + SparkS3.get_packages(spark_version="3.5.3", scala_version="2.12") """ diff --git a/onetl/connection/kerberos_helpers.py b/onetl/connection/kerberos_helpers.py index b1dd5019..d0b3c0aa 100644 --- a/onetl/connection/kerberos_helpers.py +++ b/onetl/connection/kerberos_helpers.py @@ -5,35 +5,39 @@ import os import subprocess from logging import getLogger +from threading import Lock from onetl._util.file import is_file_readable log = getLogger(__name__) +_kinit_lock = Lock() def kinit_keytab(user: str, keytab: str | os.PathLike) -> None: path = is_file_readable(keytab) - cmd = ["kinit", user, "-k", "-t", os.fspath(path)] - log.info("|onETL| Executing kerberos auth command: %s", " ".join(cmd)) - subprocess.check_call(cmd) + with _kinit_lock: + cmd = ["kinit", user, "-k", "-t", os.fspath(path)] + log.info("|onETL| Executing kerberos auth command: %s", " ".join(cmd)) + subprocess.check_call(cmd) def kinit_password(user: str, password: str) -> None: cmd = ["kinit", user] log.info("|onETL| Executing kerberos auth command: %s", " ".join(cmd)) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - ) - - _stdout, stderr = proc.communicate(password.encode("utf-8")) - exit_code = proc.poll() - if exit_code: - raise subprocess.CalledProcessError(exit_code, cmd, output=stderr) + with _kinit_lock: + with subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + # do not show user 'Please enter password' banner + stdout=subprocess.PIPE, + # do not capture stderr, immediately show all errors to user + ) as proc: + proc.communicate(password.encode("utf-8")) + exit_code = proc.poll() + if exit_code: + raise subprocess.CalledProcessError(exit_code, cmd) def kinit(user: str, keytab: os.PathLike | None = None, password: str | None = None) -> None: diff --git a/onetl/core/__init__.py b/onetl/core/__init__.py index 9796b173..abbfd0e3 100644 --- a/onetl/core/__init__.py +++ b/onetl/core/__init__.py @@ -40,4 +40,4 @@ def __getattr__(name: str): ) return getattr(import_module(f"onetl.{submodule}"), name) - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) diff --git a/onetl/file/format/avro.py b/onetl/file/format/avro.py index 1f6e2e0e..426eb30f 100644 --- a/onetl/file/format/avro.py +++ b/onetl/file/format/avro.py @@ -88,7 +88,7 @@ class Avro(ReadWriteFileFormat): from pyspark.sql import SparkSession # Create Spark session with Avro package loaded - maven_packages = Avro.get_packages(spark_version="3.5.2") + maven_packages = Avro.get_packages(spark_version="3.5.3") spark = ( SparkSession.builder.appName("spark-app-name") .config("spark.jars.packages", ",".join(maven_packages)) diff --git a/onetl/file/format/xml.py b/onetl/file/format/xml.py index 2e1ad003..508ec829 100644 --- a/onetl/file/format/xml.py +++ b/onetl/file/format/xml.py @@ -119,7 +119,7 @@ class XML(ReadWriteFileFormat): from pyspark.sql import SparkSession # Create Spark session with XML package loaded - maven_packages = XML.get_packages(spark_version="3.5.2") + maven_packages = XML.get_packages(spark_version="3.5.3") spark = ( SparkSession.builder.appName("spark-app-name") .config("spark.jars.packages", ",".join(maven_packages)) @@ -184,10 +184,10 @@ def 
get_packages( # noqa: WPS231 from onetl.file.format import XML - XML.get_packages(spark_version="3.5.2") - XML.get_packages(spark_version="3.5.2", scala_version="2.12") + XML.get_packages(spark_version="3.5.3") + XML.get_packages(spark_version="3.5.3", scala_version="2.12") XML.get_packages( - spark_version="3.5.2", + spark_version="3.5.3", scala_version="2.12", package_version="0.18.0", ) diff --git a/onetl/hwm/store/__init__.py b/onetl/hwm/store/__init__.py index 4fb2d991..929b1d40 100644 --- a/onetl/hwm/store/__init__.py +++ b/onetl/hwm/store/__init__.py @@ -45,4 +45,4 @@ def __getattr__(name: str): return getattr(import_module("etl_entities.hwm_store"), name) - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) diff --git a/onetl/strategy/hwm_store/__init__.py b/onetl/strategy/hwm_store/__init__.py index de994c20..2f89a595 100644 --- a/onetl/strategy/hwm_store/__init__.py +++ b/onetl/strategy/hwm_store/__init__.py @@ -39,7 +39,7 @@ def __getattr__(name: str): if name not in __all__: - raise ImportError(f"cannot import name {name!r} from {__name__!r}") + raise AttributeError(name) message = f""" Imports from module {__name__!r} are deprecated since v0.6.0 and will be removed in v1.0.0. diff --git a/requirements/tests/spark-3.5.2.txt b/requirements/tests/spark-3.5.3.txt similarity index 76% rename from requirements/tests/spark-3.5.2.txt rename to requirements/tests/spark-3.5.3.txt index 214f0d63..c5c1408a 100644 --- a/requirements/tests/spark-3.5.2.txt +++ b/requirements/tests/spark-3.5.3.txt @@ -1,5 +1,5 @@ numpy>=1.16 pandas>=1.0 pyarrow>=1.0 -pyspark==3.5.2 +pyspark==3.5.3 sqlalchemy diff --git a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py index f59acf89..14d88250 100644 --- a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py +++ b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_file_df.py @@ -169,3 +169,62 @@ def test_spark_metrics_recorder_file_df_writer_empty_input( metrics = recorder.metrics() assert not metrics.output.written_rows assert not metrics.output.written_bytes + + +def test_spark_metrics_recorder_file_df_writer_driver_failed( + spark, + local_fs_file_df_connection_with_path, + file_df_dataframe, +): + local_fs, target_path = local_fs_file_df_connection_with_path + + df = file_df_dataframe + + writer = FileDFWriter( + connection=local_fs, + format=CSV(), + target_path=target_path, + options=FileDFWriter.Options(if_exists="error"), + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + assert not metrics.output.written_bytes + + +def test_spark_metrics_recorder_file_df_writer_executor_failed( + spark, + local_fs_file_df_connection_with_path, + file_df_dataframe, +): + from pyspark.sql.functions import udf + from pyspark.sql.types import IntegerType + + @udf(returnType=IntegerType()) + def raise_exception(): + raise ValueError("Force task failure") + + local_fs, target_path = local_fs_file_df_connection_with_path + + failing_df = file_df_dataframe.select(raise_exception().alias("some")) + + writer = FileDFWriter( + connection=local_fs, + format=CSV(), + target_path=target_path, + options=FileDFWriter.Options(if_exists="append"), + ) + + with SparkMetricsRecorder(spark) as recorder: + with 
suppress(Exception): + writer.run(failing_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + assert not metrics.output.written_bytes diff --git a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py index 7e8dc218..79116a3e 100644 --- a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py +++ b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_hive.py @@ -1,4 +1,5 @@ import time +from contextlib import suppress import pytest @@ -137,6 +138,53 @@ def test_spark_metrics_recorder_hive_write_empty(spark, processing, get_schema_t assert not metrics.output.written_rows +def test_spark_metrics_recorder_hive_write_driver_failed(spark, processing, prepare_schema_table): + df = processing.create_spark_df(spark).limit(0) + + mismatch_df = df.withColumn("mismatch", df.id_int) + + hive = Hive(cluster="rnd-dwh", spark=spark) + writer = DBWriter( + connection=hive, + target=prepare_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(mismatch_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + + +def test_spark_metrics_recorder_hive_write_executor_failed(spark, processing, get_schema_table): + from pyspark.sql.functions import udf + from pyspark.sql.types import IntegerType + + df = processing.create_spark_df(spark).limit(0) + + @udf(returnType=IntegerType()) + def raise_exception(): + raise ValueError("Force task failure") + + failing_df = df.select(raise_exception().alias("some")) + + hive = Hive(cluster="rnd-dwh", spark=spark) + writer = DBWriter( + connection=hive, + target=get_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(failing_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + + def test_spark_metrics_recorder_hive_execute(request, spark, processing, get_schema_table): df = processing.create_spark_df(spark) view_name = rand_str() diff --git a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py index 67e31591..aa391eb8 100644 --- a/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py +++ b/tests/tests_integration/test_metrics/test_spark_metrics_recorder_postgres.py @@ -1,4 +1,5 @@ import time +from contextlib import suppress import pytest @@ -167,6 +168,67 @@ def test_spark_metrics_recorder_postgres_write_empty(spark, processing, get_sche assert not metrics.output.written_rows +def test_spark_metrics_recorder_postgres_write_driver_failed(spark, processing, prepare_schema_table): + postgres = Postgres( + host=processing.host, + port=processing.port, + user=processing.user, + password=processing.password, + database=processing.database, + spark=spark, + ) + df = processing.create_spark_df(spark).limit(0) + + mismatch_df = df.withColumn("mismatch", df.id_int) + + writer = DBWriter( + connection=postgres, + target=prepare_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(mismatch_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = 
recorder.metrics() + assert not metrics.output.written_rows + + +def test_spark_metrics_recorder_postgres_write_executor_failed(spark, processing, get_schema_table): + from pyspark.sql.functions import udf + from pyspark.sql.types import IntegerType + + postgres = Postgres( + host=processing.host, + port=processing.port, + user=processing.user, + password=processing.password, + database=processing.database, + spark=spark, + ) + + @udf(returnType=IntegerType()) + def raise_exception(): + raise ValueError("Force task failure") + + df = processing.create_spark_df(spark).limit(0) + failing_df = df.select(raise_exception().alias("some")) + + writer = DBWriter( + connection=postgres, + target=get_schema_table.full_name, + ) + + with SparkMetricsRecorder(spark) as recorder: + with suppress(Exception): + writer.run(failing_df) + + time.sleep(0.1) # sleep to fetch late metrics from SparkListener + metrics = recorder.metrics() + assert not metrics.output.written_rows + + def test_spark_metrics_recorder_postgres_fetch(spark, processing, load_table_data): postgres = Postgres( host=processing.host, diff --git a/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py b/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py index 53c7a67a..37862cf7 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_avro_unit.py @@ -29,14 +29,14 @@ def test_avro_get_packages_scala_version_not_supported(): [ # Detect Scala version by Spark version ("2.4.0", None, "org.apache.spark:spark-avro_2.11:2.4.0"), - ("3.5.2", None, "org.apache.spark:spark-avro_2.12:3.5.2"), + ("3.5.3", None, "org.apache.spark:spark-avro_2.12:3.5.3"), # Override Scala version ("2.4.0", "2.11", "org.apache.spark:spark-avro_2.11:2.4.0"), ("2.4.0", "2.12", "org.apache.spark:spark-avro_2.12:2.4.0"), - ("3.5.2", "2.12", "org.apache.spark:spark-avro_2.12:3.5.2"), - ("3.5.2", "2.13", "org.apache.spark:spark-avro_2.13:3.5.2"), + ("3.5.3", "2.12", "org.apache.spark:spark-avro_2.12:3.5.3"), + ("3.5.3", "2.13", "org.apache.spark:spark-avro_2.13:3.5.3"), # Scala version contain three digits when only two needed - ("3.5.2", "2.12.1", "org.apache.spark:spark-avro_2.12:3.5.2"), + ("3.5.3", "2.12.1", "org.apache.spark:spark-avro_2.12:3.5.3"), ], ) def test_avro_get_packages(spark_version, scala_version, package): diff --git a/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py b/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py index ecacb2ca..9f949651 100644 --- a/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py +++ b/tests/tests_unit/test_file/test_format_unit/test_excel_unit.py @@ -35,17 +35,17 @@ def test_excel_get_packages_package_version_not_supported(): [ # Detect Scala version by Spark version ("3.2.4", None, None, ["com.crealytics:spark-excel_2.12:3.2.4_0.20.4"]), - ("3.5.2", None, None, ["com.crealytics:spark-excel_2.12:3.5.2_0.20.4"]), + ("3.5.3", None, None, ["com.crealytics:spark-excel_2.12:3.5.3_0.20.4"]), # Override Scala version ("3.2.4", "2.12", None, ["com.crealytics:spark-excel_2.12:3.2.4_0.20.4"]), ("3.2.4", "2.13", None, ["com.crealytics:spark-excel_2.13:3.2.4_0.20.4"]), - ("3.5.2", "2.12", None, ["com.crealytics:spark-excel_2.12:3.5.2_0.20.4"]), - ("3.5.2", "2.13", None, ["com.crealytics:spark-excel_2.13:3.5.2_0.20.4"]), + ("3.5.3", "2.12", None, ["com.crealytics:spark-excel_2.12:3.5.3_0.20.4"]), + ("3.5.3", "2.13", None, ["com.crealytics:spark-excel_2.13:3.5.3_0.20.4"]), # Override package version ("3.2.0", None, 
"0.16.0", ["com.crealytics:spark-excel_2.12:3.2.0_0.16.0"]), - ("3.5.2", None, "0.18.0", ["com.crealytics:spark-excel_2.12:3.5.2_0.18.0"]), + ("3.5.3", None, "0.18.0", ["com.crealytics:spark-excel_2.12:3.5.3_0.18.0"]), # Scala version contain three digits when only two needed - ("3.5.2", "2.12.1", None, ["com.crealytics:spark-excel_2.12:3.5.2_0.20.4"]), + ("3.5.3", "2.12.1", None, ["com.crealytics:spark-excel_2.12:3.5.3_0.20.4"]), ], ) def test_excel_get_packages(caplog, spark_version, scala_version, package_version, packages): diff --git a/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py b/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py index 891406ba..f0a25216 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py @@ -65,7 +65,7 @@ def normalize_cluster_name(cluster: str) -> str: assert Hive(cluster="RND-DWH", spark=spark_mock).cluster == "rnd-dwh" -def test_hive_known_get_current_cluster_hook(request, spark_mock, mocker): +def test_hive_known_get_current_cluster_hook(request, spark_mock): # no exception Hive(cluster="rnd-prod", spark=spark_mock).check() Hive(cluster="rnd-dwh", spark=spark_mock).check() diff --git a/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py b/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py index 9a5e6fac..59254361 100644 --- a/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py +++ b/tests/tests_unit/tests_file_df_connection_unit/test_spark_s3_unit.py @@ -10,9 +10,9 @@ @pytest.mark.parametrize( "spark_version, scala_version, package", [ - ("3.5.2", None, "org.apache.spark:spark-hadoop-cloud_2.12:3.5.2"), - ("3.5.2", "2.12", "org.apache.spark:spark-hadoop-cloud_2.12:3.5.2"), - ("3.5.2", "2.13", "org.apache.spark:spark-hadoop-cloud_2.13:3.5.2"), + ("3.5.3", None, "org.apache.spark:spark-hadoop-cloud_2.12:3.5.3"), + ("3.5.3", "2.12", "org.apache.spark:spark-hadoop-cloud_2.12:3.5.3"), + ("3.5.3", "2.13", "org.apache.spark:spark-hadoop-cloud_2.13:3.5.3"), ], ) def test_spark_s3_get_packages(spark_version, scala_version, package):