From aa922e8e0d8f754c0e7cb971e9cbfd85c034699b Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Thu, 28 Mar 2024 06:36:39 +0100 Subject: [PATCH] Add a test for /dev/stdin --- src/xopen/__init__.py | 47 +++++++++---------------------------------- tests/test_xopen.py | 18 +++++++++++++++++ 2 files changed, 28 insertions(+), 37 deletions(-) diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 873ae40..7c81b26 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -10,7 +10,6 @@ import dataclasses import gzip -import stat import sys import io import os @@ -509,26 +508,15 @@ def _open_zst( assert compresslevel != 0 if compresslevel is None: compresslevel = XOPEN_DEFAULT_ZST_COMPRESSION - if zstandard: - max_window_bits = zstandard.WINDOWLOG_MAX - else: - max_window_bits = 31 if threads != 0: try: # zstd can compress using multiple cores - program_args: Tuple[str, ...] = ("zstd",) - if "r" in mode: - # Only use --long=31 for decompression. Using it for - # compression overrides level settings for window size and - # forces other zstd users to use `--long=31` to decompress any - # archive that has been compressed by xopen. - program_args += (f"--long={max_window_bits}",) return _PipedCompressionProgram( filename, mode, compresslevel, threads, - _ProgramSettings(program_args, tuple(range(1, 20)), "-T"), + _PROGRAM_SETTINGS["zstd"], ) except OSError: if zstandard is None: @@ -537,9 +525,11 @@ def _open_zst( if zstandard is None: raise ImportError("zstandard module (python-zstandard) not available") - dctx = zstandard.ZstdDecompressor(max_window_size=2**max_window_bits) - cctx = zstandard.ZstdCompressor(level=compresslevel) - f = zstandard.open(filename, mode, cctx=cctx, dctx=dctx) # type: ignore + if compresslevel is not None and "r" not in mode: + cctx = zstandard.ZstdCompressor(level=compresslevel) + else: + cctx = None + f = zstandard.open(filename, mode, cctx=cctx) # type: ignore if mode == "rb": return io.BufferedReader(f) return io.BufferedWriter(f) # mode "ab" and "wb" @@ -702,6 +692,8 @@ def _file_or_path_to_binary_stream( file_or_path: FileOrPath, binary_mode: str ) -> Tuple[BinaryIO, bool]: assert binary_mode in ("rb", "wb", "ab") + if file_or_path == "-": + return _open_stdin_or_out(binary_mode), False if isinstance(file_or_path, (str, bytes)) or hasattr(file_or_path, "__fspath__"): return open(os.fspath(file_or_path), binary_mode), True # type: ignore if isinstance(file_or_path, io.TextIOWrapper): @@ -721,22 +713,10 @@ def _filepath_from_path_or_filelike(fileorpath: FileOrPath) -> str: except TypeError: pass if hasattr(fileorpath, "name"): - name = fileorpath.name - if isinstance(name, str): - return name - elif isinstance(name, bytes): - return name.decode() + return fileorpath.name return "" -def _file_is_a_socket_or_pipe(filepath): - try: - mode = os.stat(filepath).st_mode - return stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISPORT(mode) - except (OSError, TypeError): # Type error for unexpected types in stat. - return False - - @overload def xopen( filename: FileOrPath, @@ -767,7 +747,7 @@ def xopen( ... -def xopen( # noqa: C901 +def xopen( filename: FileOrPath, mode: Literal["r", "w", "a", "rt", "rb", "wt", "wb", "at", "ab"] = "r", compresslevel: Optional[int] = None, @@ -830,13 +810,6 @@ def xopen( # noqa: C901 binary_mode = mode[0] + "b" filepath = _filepath_from_path_or_filelike(filename) - # Open non-regular files such as pipes and sockets here to force opening - # them once. - if filename == "-": - filename = _open_stdin_or_out(binary_mode) - elif _file_is_a_socket_or_pipe(filename): - filename = open(filename, binary_mode) # type: ignore - if format not in (None, "gz", "xz", "bz2", "zst"): raise ValueError( f"Format not supported: {format}. " diff --git a/tests/test_xopen.py b/tests/test_xopen.py index 47974a1..86234bc 100644 --- a/tests/test_xopen.py +++ b/tests/test_xopen.py @@ -690,3 +690,21 @@ def test_xopen_write_to_pipe(threads, ext): process.wait() process.stdout.close() assert data == CONTENT + + +@pytest.mark.skipif( + not os.path.exists("/dev/stdin"), reason="/dev/stdin does not exist" +) +@pytest.mark.parametrize("threads", (0, 1)) +def test_xopen_dev_stdin_read(threads, ext): + if ext == ".zst" and zstandard is None: + return + file = str(Path(__file__).parent / f"file.txt{ext}") + result = subprocess.run( + f"cat {file} | python -c 'import xopen; " + f'f=xopen.xopen("/dev/stdin", "rt", threads={threads});print(f.read())\'', + shell=True, + stdout=subprocess.PIPE, + encoding="ascii", + ) + assert result.stdout == CONTENT + "\n"