Skip to content

Commit

Permalink
Add a test for /dev/stdin
Browse files Browse the repository at this point in the history
  • Loading branch information
rhpvorderman committed Mar 28, 2024
1 parent 7060bf3 commit aa922e8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 37 deletions.
47 changes: 10 additions & 37 deletions src/xopen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import dataclasses
import gzip
import stat
import sys
import io
import os
Expand Down Expand Up @@ -509,26 +508,15 @@ def _open_zst(
assert compresslevel != 0
if compresslevel is None:
compresslevel = XOPEN_DEFAULT_ZST_COMPRESSION
if zstandard:
max_window_bits = zstandard.WINDOWLOG_MAX
else:
max_window_bits = 31
if threads != 0:
try:
# zstd can compress using multiple cores
program_args: Tuple[str, ...] = ("zstd",)
if "r" in mode:
# Only use --long=31 for decompression. Using it for
# compression overrides level settings for window size and
# forces other zstd users to use `--long=31` to decompress any
# archive that has been compressed by xopen.
program_args += (f"--long={max_window_bits}",)
return _PipedCompressionProgram(
filename,
mode,
compresslevel,
threads,
_ProgramSettings(program_args, tuple(range(1, 20)), "-T"),
_PROGRAM_SETTINGS["zstd"],
)
except OSError:
if zstandard is None:
Expand All @@ -537,9 +525,11 @@ def _open_zst(

if zstandard is None:
raise ImportError("zstandard module (python-zstandard) not available")
dctx = zstandard.ZstdDecompressor(max_window_size=2**max_window_bits)
cctx = zstandard.ZstdCompressor(level=compresslevel)
f = zstandard.open(filename, mode, cctx=cctx, dctx=dctx) # type: ignore
if compresslevel is not None and "r" not in mode:
cctx = zstandard.ZstdCompressor(level=compresslevel)
else:
cctx = None
f = zstandard.open(filename, mode, cctx=cctx) # type: ignore
if mode == "rb":
return io.BufferedReader(f)
return io.BufferedWriter(f) # mode "ab" and "wb"
Expand Down Expand Up @@ -702,6 +692,8 @@ def _file_or_path_to_binary_stream(
file_or_path: FileOrPath, binary_mode: str
) -> Tuple[BinaryIO, bool]:
assert binary_mode in ("rb", "wb", "ab")
if file_or_path == "-":
return _open_stdin_or_out(binary_mode), False
if isinstance(file_or_path, (str, bytes)) or hasattr(file_or_path, "__fspath__"):
return open(os.fspath(file_or_path), binary_mode), True # type: ignore
if isinstance(file_or_path, io.TextIOWrapper):
Expand All @@ -721,22 +713,10 @@ def _filepath_from_path_or_filelike(fileorpath: FileOrPath) -> str:
except TypeError:
pass
if hasattr(fileorpath, "name"):
name = fileorpath.name
if isinstance(name, str):
return name
elif isinstance(name, bytes):
return name.decode()
return fileorpath.name
return ""


def _file_is_a_socket_or_pipe(filepath):
try:
mode = os.stat(filepath).st_mode
return stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISPORT(mode)
except (OSError, TypeError): # Type error for unexpected types in stat.
return False


@overload
def xopen(
filename: FileOrPath,
Expand Down Expand Up @@ -767,7 +747,7 @@ def xopen(
...


def xopen( # noqa: C901
def xopen(
filename: FileOrPath,
mode: Literal["r", "w", "a", "rt", "rb", "wt", "wb", "at", "ab"] = "r",
compresslevel: Optional[int] = None,
Expand Down Expand Up @@ -830,13 +810,6 @@ def xopen( # noqa: C901
binary_mode = mode[0] + "b"
filepath = _filepath_from_path_or_filelike(filename)

# Open non-regular files such as pipes and sockets here to force opening
# them once.
if filename == "-":
filename = _open_stdin_or_out(binary_mode)
elif _file_is_a_socket_or_pipe(filename):
filename = open(filename, binary_mode) # type: ignore

if format not in (None, "gz", "xz", "bz2", "zst"):
raise ValueError(
f"Format not supported: {format}. "
Expand Down
18 changes: 18 additions & 0 deletions tests/test_xopen.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,3 +690,21 @@ def test_xopen_write_to_pipe(threads, ext):
process.wait()
process.stdout.close()
assert data == CONTENT


@pytest.mark.skipif(
not os.path.exists("/dev/stdin"), reason="/dev/stdin does not exist"
)
@pytest.mark.parametrize("threads", (0, 1))
def test_xopen_dev_stdin_read(threads, ext):
if ext == ".zst" and zstandard is None:
return
file = str(Path(__file__).parent / f"file.txt{ext}")
result = subprocess.run(
f"cat {file} | python -c 'import xopen; "
f'f=xopen.xopen("/dev/stdin", "rt", threads={threads});print(f.read())\'',
shell=True,
stdout=subprocess.PIPE,
encoding="ascii",
)
assert result.stdout == CONTENT + "\n"

0 comments on commit aa922e8

Please sign in to comment.