# --- T2-COPYRIGHT-BEGIN --- # t2/package/*/xar/0001-Update-tests-for-Python-3-and-Nix-sandbox.patch # Copyright (C) 2025 The T2 SDE Project # SPDX-License-Identifier: GPL-2.0 or patched project license # --- T2-COPYRIGHT-END --- From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Ivan Trubach Date: Sat, 27 Jul 2024 12:53:54 +0300 Subject: [PATCH 01/19] Update tests for Python 3 and Nix sandbox This change updates integration tests for Python 3 and fixes some assumptions to work under Nix sandbox (in particular, extended attributes are not allowed). Also updates xar/test/validate.c for modern OpenSSL versions. --- xar/test/attr.py | 54 +++++++++++++++++++------- xar/test/buffer.c | 3 +- xar/test/checksums.py | 75 +++++++++++++++++++----------------- xar/test/compression.py | 27 ++++++++----- xar/test/data.py | 19 +++++---- xar/test/hardlink.py | 12 ++++-- xar/test/heap.py | 27 +++++++------ xar/test/integrity.py | 45 ++++++++++++---------- xar/test/run-all.py | 25 ++++++++++++ xar/test/util.py | 85 ++++++++++++++++++++++++++++++++++++----- xar/test/validate.c | 32 +++++++++------- 11 files changed, 282 insertions(+), 122 deletions(-) create mode 100755 xar/test/run-all.py diff --git a/xar/test/attr.py b/xar/test/attr.py index adc2c56..c28a4e6 100755 --- a/xar/test/attr.py +++ b/xar/test/attr.py @@ -6,6 +6,7 @@ import os import os.path import shutil import subprocess +import sys import xattr import util @@ -26,20 +27,27 @@ import util class MissingExtendedAttributeError(AssertionError): pass -def _random_big_data(bytes=65536, path="/dev/random"): +def _random_big_data(bytes=65536): """ Returns a random string with the number of bytes requested. Due to xar implementation details, this should be greater than 4096 (32768 for compressed heap testing). """ - with open(path, "r") as f: - return f.read(bytes) + return os.urandom(bytes) + +def _to_bytes(s): + if isinstance(s, str): + return s.encode("utf-8") + return s def _test_xattr_on_file_with_contents(filename, file_contents, xattrs=[], xar_create_flags=[], xar_extract_flags=[]): + file_contents = _to_bytes(file_contents) + xattr_prefix = "user." if sys.platform != "darwin" else "" + xattrs = [(xattr_prefix + k, _to_bytes(v)) for k, v in xattrs] try: # Write file out - with open(filename, "w") as f: + with open(filename, "wb") as f: f.write(file_contents) for (key, value) in xattrs: xattr.setxattr(f, key, value) @@ -51,14 +59,16 @@ def _test_xattr_on_file_with_contents(filename, file_contents, xattrs=[], xar_cr with util.directory_created("extracted") as directory: # Validate resulting xattrs subprocess.check_call(["xar", "-x", "-C", directory, "-f", path] + xar_extract_flags) + extracted_filename = os.path.join(directory, filename) + expected_set = {key for key, _ in xattrs} + actual_set = set(xattr.listxattr(os.path.join(directory, filename))) + for key in expected_set - actual_set: + raise MissingExtendedAttributeError("extended attribute \"{n}\" missing after extraction".format(n=key)) for (key, value) in xattrs: - try: - assert xattr.getxattr(os.path.join(directory, filename), key) == value, "extended attribute \"{n}\" has incorrect contents after extraction".format(n=key) - except KeyError: - raise MissingExtendedAttributeError("extended attribute \"{n}\" missing after extraction".format(n=key)) + assert xattr.getxattr(extracted_filename, key) == value, "extended attribute \"{n}\" has incorrect contents after extraction".format(n=key) # Validate file contents - with open(os.path.join(directory, filename), "r") as f: + with open(os.path.join(directory, filename), "rb") as f: if f.read() != file_contents: raise MissingExtendedAttributeError("archived file \"{f}\" has has incorrect contents after extraction".format(f=filename)) finally: @@ -73,36 +83,47 @@ def _test_xattr_on_file_with_contents(filename, file_contents, xattrs=[], xar_cr # tests are commented out awaiting a day when this might be different. # def empty_xattr_empty_file(filename): +# util.skip_if_no_xattrs_support() # _test_xattr_on_file_with_contents(filename, "", xattrs=[("foo", "")]) def small_xattr_empty_file(filename): + util.skip_if_no_xattrs_support() _test_xattr_on_file_with_contents(filename, "", xattrs=[("foo", "1234")]) def large_xattr_empty_file(filename): + util.skip_if_no_xattrs_support() _test_xattr_on_file_with_contents(filename, "", xattrs=[("foo", _random_big_data(5000))]) # def empty_xattr_small_file(filename): +# util.skip_if_no_xattrs_support() # _test_xattr_on_file_with_contents(filename, "small.file.contents", xattrs=[("foo", "")]) def small_xattr_small_file(filename): + util.skip_if_no_xattrs_support() _test_xattr_on_file_with_contents(filename, "small.file.contents", xattrs=[("foo", "1234")]) def large_xattr_small_file(filename): + util.skip_if_no_xattrs_support() _test_xattr_on_file_with_contents(filename, "small.file.contents", xattrs=[("foo", _random_big_data(4567))]) # def empty_xattr_large_file(filename): +# util.skip_if_no_xattrs_support() # _test_xattr_on_file_with_contents(filename, _random_big_data(10000000), xattrs=[("foo", "")]) def small_xattr_large_file(filename): + util.skip_if_no_xattrs_support() _test_xattr_on_file_with_contents(filename, _random_big_data(5000000), xattrs=[("foo", "1234")]) def large_xattr_large_file(filename): + util.skip_if_no_xattrs_support() _test_xattr_on_file_with_contents(filename, _random_big_data(9876543), xattrs=[("foo", _random_big_data(6543))]) def multiple_xattrs(filename): + util.skip_if_no_xattrs_support() _test_xattr_on_file_with_contents(filename, "", xattrs=[("foo", "bar"), ("baz", "1234"), ("quux", "more")]) # ("empty", "") def distribution_create(filename): + util.skip_if_no_xattrs_support() try: _test_xattr_on_file_with_contents(filename, "dummy", xattrs=[("foo", "bar")], xar_create_flags=["--distribution"]) except MissingExtendedAttributeError: @@ -114,6 +135,7 @@ def distribution_create(filename): # when it can. # def distribution_extract(filename): +# util.skip_if_no_xattrs_support() # try: # _test_xattr_on_file_with_contents(filename, "dummy", xattrs=[("foo", "bar")], xar_extract_flags=["--distribution"]) # except MissingExtendedAttributeError: @@ -128,12 +150,18 @@ TEST_CASES = (small_xattr_empty_file, large_xattr_empty_file, multiple_xattrs, distribution_create) if __name__ == "__main__": + failed = False for case in TEST_CASES: + func_name = case.__name__ try: - case(case.func_name) - print("PASSED: {f}".format(f=case.func_name)) + case(func_name) + print("PASSED: {f}".format(f=func_name)) except (AssertionError, IOError, subprocess.CalledProcessError): - import sys, os - print("FAILED: {f}".format(f=case.func_name)) + failed = True + print("FAILED: {f}".format(f=func_name)) sys.excepthook(*sys.exc_info()) print("") + except util.TestCaseSkipError as e: + print("SKIPPED: {f}: {m}".format(f=func_name, m=e)) + if failed: + sys.exit(1) diff --git a/xar/test/buffer.c b/xar/test/buffer.c index a353cef..e4c5639 100644 --- a/xar/test/buffer.c +++ b/xar/test/buffer.c @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -50,7 +51,7 @@ int main(int argc, char *argv[]) if( red < sb.st_size ) fprintf(stderr, "Incomplete read\n"); - x = xar_open("/tmp/test.xar", WRITE); + x = xar_open("test.xar", WRITE); if( x == NULL ) { fprintf(stderr, "Error creating xarchive\n"); exit(6); diff --git a/xar/test/checksums.py b/xar/test/checksums.py index 7080d7c..0f39e63 100755 --- a/xar/test/checksums.py +++ b/xar/test/checksums.py @@ -2,6 +2,7 @@ from __future__ import print_function +import contextlib import hashlib import os import os.path @@ -9,6 +10,7 @@ import re import shutil import struct import subprocess +import sys import util @@ -17,15 +19,21 @@ import util # Utility Functions # +@contextlib.contextmanager +def _test_archive_created(filename, directory, *args, **kwargs): + with util.test_directory_created(directory) as test_directory: + with util.archive_created(filename, test_directory, *args, **kwargs) as path: + yield path + def _get_numeric_value_from_header(archive_name, key): """ Dumps the header of the specified xar archive and extracts the header size from the output, in bytes. """ - header = subprocess.check_output(["xar", "--dump-header", "-f", archive_name]) + header = subprocess.check_output(["xar", "--dump-header", "-f", archive_name], text=True) for line in header.splitlines(): - matchdata = re.match("^(.+):\s+(.+)$", line) # magic: 0x78617221 (OK) + matchdata = re.match(r"^(.+):\s+(.+)$", line) # magic: 0x78617221 (OK) assert matchdata, "unexpected output from `xar --dump-header`:\n{h}".format(h=header) if matchdata.groups()[0] == key: return int(matchdata.groups()[1]) @@ -38,17 +46,14 @@ def _get_toc_size(archive_name): return _get_numeric_value_from_header(archive_name, "Compressed TOC length") def _clobber_bytes_at(clobber_range, path): - with open(path, "r+") as f: + with open(path, "rb+") as f: f.seek(clobber_range[0]) - with open("/dev/random", "r") as r: - random_bytes = r.read(len(clobber_range)) - f.write(random_bytes) + f.write(os.urandom(len(clobber_range))) def _verify_extraction_failed(filename): with util.directory_created("extracted") as directory: try: - with open("/dev/null", "w") as n: - returncode = subprocess.call(["xar", "-x", "-C", directory, "-f", filename], stdout=n, stderr=n) + returncode = subprocess.call(["xar", "-x", "-C", directory, "-f", filename], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) assert returncode != 0, "xar reported success extracting an archive with a broken TOC" finally: if os.path.exists(directory): @@ -63,7 +68,7 @@ def _verify_header_checksum(filename, algorithm): header_size = _get_header_size(filename) toc_length = _get_toc_size(filename) - with open(filename, "r") as f: + with open(filename, "rb") as f: f.seek(header_size) h = hashlib.new(algorithm, f.read(toc_length)) computed_digest = h.digest() @@ -76,23 +81,23 @@ def _verify_header_checksum(filename, algorithm): # def default_toc_checksum_validity(filename): - with util.archive_created(filename, "/bin") as path: + with _test_archive_created(filename, "testdir") as path: _verify_header_checksum(path, "sha1") def sha1_toc_checksum_validity(filename): - with util.archive_created(filename, "/bin", "--toc-cksum", "sha1") as path: + with _test_archive_created(filename, "testdir", "--toc-cksum", "sha1") as path: _verify_header_checksum(path, "sha1") def sha256_toc_checksum_validity(filename): - with util.archive_created(filename, "/bin", "--toc-cksum", "sha256") as path: + with _test_archive_created(filename, "testdir", "--toc-cksum", "sha256") as path: _verify_header_checksum(path, "sha256") def sha512_toc_checksum_validity(filename): - with util.archive_created(filename, "/bin", "--toc-cksum", "sha512") as path: + with _test_archive_created(filename, "testdir", "--toc-cksum", "sha512") as path: _verify_header_checksum(path, "sha512") def broken_toc_default_checksum(filename): - with util.archive_created(filename, "/bin") as path: + with _test_archive_created(filename, "testdir") as path: # Mess up the archive toc_start = _get_header_size(path) _clobber_bytes_at(range(toc_start + 4, toc_start + 4 + 100), path) # Why did the original test specify 4? No idea. @@ -101,7 +106,7 @@ def broken_toc_default_checksum(filename): _verify_extraction_failed(filename) def broken_toc_sha1_checksum(filename): - with util.archive_created(filename, "/bin", "--toc-cksum", "sha1") as path: + with _test_archive_created(filename, "testdir", "--toc-cksum", "sha1") as path: # Mess up the archive toc_start = _get_header_size(path) _clobber_bytes_at(range(toc_start + 4, toc_start + 4 + 100), path) # Why did the original test specify 4? No idea. @@ -110,7 +115,7 @@ def broken_toc_sha1_checksum(filename): _verify_extraction_failed(filename) def broken_toc_sha256_checksum(filename): - with util.archive_created(filename, "/bin", "--toc-cksum", "sha256") as path: + with _test_archive_created(filename, "testdir", "--toc-cksum", "sha256") as path: # Mess up the archive toc_start = _get_header_size(path) _clobber_bytes_at(range(toc_start + 4, toc_start + 4 + 100), path) # Why did the original test specify 4? No idea. @@ -119,7 +124,7 @@ def broken_toc_sha256_checksum(filename): _verify_extraction_failed(filename) def broken_toc_sha512_checksum(filename): - with util.archive_created(filename, "/bin", "--toc-cksum", "sha512") as path: + with _test_archive_created(filename, "testdir", "--toc-cksum", "sha512") as path: # Mess up the archive toc_start = _get_header_size(path) _clobber_bytes_at(range(toc_start + 4, toc_start + 4 + 100), path) # Why did the original test specify 4? No idea. @@ -128,7 +133,7 @@ def broken_toc_sha512_checksum(filename): _verify_extraction_failed(filename) def broken_heap_default_checksum(filename): - with util.archive_created(filename, "/bin") as path: + with _test_archive_created(filename, "testdir") as path: # Mess up the archive toc_start = _get_header_size(path) toc_size = _get_toc_size(path) @@ -139,11 +144,11 @@ def broken_heap_default_checksum(filename): _verify_extraction_failed(filename) def default_checksum_algorithm(filename): - with util.archive_created(filename, "/bin") as path: - header = subprocess.check_output(["xar", "--dump-header", "-f", path]) + with _test_archive_created(filename, "testdir") as path: + header = subprocess.check_output(["xar", "--dump-header", "-f", path], text=True) found = False for line in header.splitlines(): - matchdata = re.match("^Checksum algorithm:\s+(\d+)\s+\\((\w+)\\)$", line) + matchdata = re.match(r"^Checksum algorithm:\s+(\d+)\s+\((\w+)\)$", line) if not matchdata: continue found = True @@ -156,7 +161,7 @@ def default_checksum_algorithm(filename): # # def invalid_checksum_algorithm(filename): # try: -# with util.archive_created(filename, "/bin", "--toc-cksum", "invalid-algorithm") as path: +# with _test_archive_created(filename, "testdir", "--toc-cksum", "invalid-algorithm") as path: # raise AssertionError("xar succeeded when it should have failed") # except subprocess.CalledProcessError: # pass @@ -164,17 +169,15 @@ def default_checksum_algorithm(filename): # It does fail for md5 explicitly, however def md5_toc_checksum_failure(filename): try: - with open("/dev/null", "a") as devnull: - with util.archive_created(filename, "/bin", "--toc-cksum", "md5", stderr=devnull) as path: - raise AssertionError("xar succeeded when it should have failed") + with _test_archive_created(filename, "testdir", "--toc-cksum", "md5", stderr=subprocess.DEVNULL) as path: + raise AssertionError("xar succeeded when it should have failed") except subprocess.CalledProcessError: pass def md5_file_checksum_failure(filename): try: - with open("/dev/null", "a") as devnull: - with util.archive_created(filename, "/bin", "--file-cksum", "md5", stderr=devnull) as path: - raise AssertionError("xar succeeded when it should have failed") + with _test_archive_created(filename, "testdir", "--file-cksum", "md5", stderr=subprocess.DEVNULL) as path: + raise AssertionError("xar succeeded when it should have failed") except subprocess.CalledProcessError: pass @@ -185,8 +188,8 @@ def _verify_checksum_algorithm(filename, algorithm): else: algorithm = "sha1" - with util.archive_created(filename, "/bin", *additional_args) as path: - toc = subprocess.check_output(["xar", "--dump-toc=-", "-f", path]) + with _test_archive_created(filename, "testdir", *additional_args) as path: + toc = subprocess.check_output(["xar", "--dump-toc=-", "-f", path], text=True) found = False for line in toc.splitlines(): if ''.format(a=algorithm) in line or ''.format(a=algorithm) in line: @@ -214,12 +217,16 @@ TEST_CASES = (default_toc_checksum_validity, sha1_toc_checksum_validity, sha256_ md5_toc_checksum_failure, md5_file_checksum_failure,) if __name__ == "__main__": + failed = False for case in TEST_CASES: + func_name = case.__name__ try: - case("{f}.xar".format(f=case.func_name)) - print("PASSED: {f}".format(f=case.func_name)) + case("{f}.xar".format(f=func_name)) + print("PASSED: {f}".format(f=func_name)) except (AssertionError, IOError, subprocess.CalledProcessError): - import sys, os - print("FAILED: {f}".format(f=case.func_name)) + failed = True + print("FAILED: {f}".format(f=func_name)) sys.excepthook(*sys.exc_info()) print("") + if failed: + sys.exit(1) diff --git a/xar/test/compression.py b/xar/test/compression.py index 2b3b2ec..7ed30ca 100755 --- a/xar/test/compression.py +++ b/xar/test/compression.py @@ -2,10 +2,10 @@ from __future__ import print_function -import cStringIO import os import os.path import subprocess +import sys import tempfile import util @@ -16,10 +16,15 @@ import util # def _check_compression(filename, *args, **kwargs): - with util.archive_created(filename, "/bin", *args, **kwargs) as path: + with ( + util.directory_created("temp") as temp_directory, + util.chdir(temp_directory), + util.test_directory_created("testdir") as test_directory, + util.archive_created(filename, "testdir", *args, **kwargs) as path, + ): with util.directory_created("extracted") as directory: subprocess.check_call(["xar", "-x", "-f", path, "-C", directory]) - util.assert_identical_directories("/bin", os.path.join(directory, "bin")) + util.assert_identical_directories(test_directory, os.path.join(directory, "testdir")) # @@ -61,14 +66,18 @@ TEST_CASES = (no_compression, default_compression, gzip_compression_short, bzip2_compression_short, lzma_compression_short) if __name__ == "__main__": + failed = False for case in TEST_CASES: + func_name = case.__name__ try: - case("{f}.xar".format(f=case.func_name)) - print("PASSED: {f}".format(f=case.func_name)) + case("{f}.xar".format(f=func_name)) + print("PASSED: {f}".format(f=func_name)) except (AssertionError, IOError, subprocess.CalledProcessError): - import sys, os - print("FAILED: {f}".format(f=case.func_name)) + failed = True + print("FAILED: {f}".format(f=func_name)) sys.excepthook(*sys.exc_info()) print("") - except util.TestCaseSkipError, e: - print("SKIPPED: {f}: {m}".format(f=case.func_name, m=e.message)) + except util.TestCaseSkipError as e: + print("SKIPPED: {f}: {m}".format(f=func_name, m=e)) + if failed: + sys.exit(1) diff --git a/xar/test/data.py b/xar/test/data.py index a9793f0..f902b78 100755 --- a/xar/test/data.py +++ b/xar/test/data.py @@ -6,6 +6,7 @@ import contextlib import os import os.path import subprocess +import sys import util @@ -28,7 +29,7 @@ def _process_toc(archive_path): subprocess.check_call(["xar", "-f", archive_path, "--dump-toc=data_toc.xml"]) try: result = subprocess.check_output(["xsltproc", "-o", "-", os.path.realpath(os.path.join(__file__, "..", "data.xsl")), "data_toc.xml"]) - assert result == "", "expected no data offset, but instead found:{o}".format(o=result) + assert result == b"", "expected no data offset, but instead found:{o}".format(o=result) finally: os.unlink("data_toc.xml") @@ -90,14 +91,18 @@ TEST_CASES = (zero_length_default_compression, zero_length_no_compression, mixed_length_gzip_compression, mixed_length_bzip2_compression, mixed_length_lzma_compression) if __name__ == "__main__": + failed = False for case in TEST_CASES: + func_name = case.__name__ try: - case("{f}.xar".format(f=case.func_name)) - print("PASSED: {f}".format(f=case.func_name)) + case("{f}.xar".format(f=func_name)) + print("PASSED: {f}".format(f=func_name)) except (AssertionError, IOError, subprocess.CalledProcessError): - import sys, os - print("FAILED: {f}".format(f=case.func_name)) + failed = True + print("FAILED: {f}".format(f=func_name)) sys.excepthook(*sys.exc_info()) print("") - except util.TestCaseSkipError, e: - print("SKIPPED: {f}: {m}".format(f=case.func_name, m=e.message)) + except util.TestCaseSkipError as e: + print("SKIPPED: {f}: {m}".format(f=func_name, m=e)) + if failed: + sys.exit(1) diff --git a/xar/test/hardlink.py b/xar/test/hardlink.py index 5145216..da409d6 100755 --- a/xar/test/hardlink.py +++ b/xar/test/hardlink.py @@ -5,6 +5,7 @@ from __future__ import print_function import os import os.path import subprocess +import sys import util @@ -58,12 +59,17 @@ def hard_link_identical_files(filename): TEST_CASES = (hard_link_in_directory, hard_link_in_cwd, hard_link_identical_files) if __name__ == "__main__": + failed = False for case in TEST_CASES: + func_name = case.__name__ try: - case("{f}.xar".format(f=case.func_name)) - print("PASSED: {f}".format(f=case.func_name)) + case("{f}.xar".format(f=func_name)) + print("PASSED: {f}".format(f=func_name)) except (AssertionError, IOError, subprocess.CalledProcessError): + failed = True import sys, os - print("FAILED: {f}".format(f=case.func_name)) + print("FAILED: {f}".format(f=func_name)) sys.excepthook(*sys.exc_info()) print("") + if failed: + sys.exit(1) diff --git a/xar/test/heap.py b/xar/test/heap.py index f431c77..727412a 100755 --- a/xar/test/heap.py +++ b/xar/test/heap.py @@ -8,6 +8,7 @@ import os.path import re import shutil import subprocess +import sys import util @@ -19,8 +20,8 @@ import util def _file_offsets_for_archive(path, xsl_path): subprocess.check_call(["xar", "--dump-toc=heap_toc.xml", "-f", path]) try: - offsets = subprocess.check_output(["xsltproc", xsl_path, "heap_toc.xml"]) - matches = [re.match("^(.+)\s([^\s]+)$", offset) for offset in offsets.splitlines()] + offsets = subprocess.check_output(["xsltproc", xsl_path, "heap_toc.xml"], text=True) + matches = [re.match(r"^(.+)\s([^\s]+)$", offset) for offset in offsets.splitlines()] offsets = [(match.groups()[0], int(match.groups()[1])) for match in matches] return offsets finally: @@ -33,9 +34,8 @@ def _file_offsets_for_archive(path, xsl_path): XSL_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "heap1.xsl") def normal_heap(filename): - with util.directory_created("scratch") as directory: - shutil.copy("/bin/ls", os.path.join(directory, "ls")) - shutil.copy(os.path.join(directory, "ls"), os.path.join(directory, "foo")) + with util.test_directory_created("scratch") as directory: + shutil.copy(os.path.join(directory, "script"), os.path.join(directory, "foo")) with util.chdir(directory): with util.archive_created(os.path.join("..", "heap.xar"), ".") as path: # Verify file offsets are as we expect @@ -50,9 +50,8 @@ def normal_heap(filename): subprocess.check_call(["xar", "-x", "-f", path, "-C", extracted]) def coalesce_heap(filename): - with util.directory_created("scratch") as directory: - shutil.copy("/bin/ls", os.path.join(directory, "ls")) - shutil.copy(os.path.join(directory, "ls"), os.path.join(directory, "foo")) + with util.test_directory_created("scratch") as directory: + shutil.copy(os.path.join(directory, "script"), os.path.join(directory, "foo")) with util.chdir(directory): with util.archive_created(os.path.join("..", "heap.xar"), ".", "--coalesce-heap") as path: # Verify file offsets are as we expect @@ -67,12 +66,16 @@ def coalesce_heap(filename): TEST_CASES = (normal_heap, coalesce_heap) if __name__ == "__main__": + failed = False for case in TEST_CASES: + func_name = case.__name__ try: - case("{f}.xar".format(f=case.func_name)) - print("PASSED: {f}".format(f=case.func_name)) + case("{f}.xar".format(f=func_name)) + print("PASSED: {f}".format(f=func_name)) except (AssertionError, IOError, subprocess.CalledProcessError): - import sys, os - print("FAILED: {f}".format(f=case.func_name)) + failed = True + print("FAILED: {f}".format(f=func_name)) sys.excepthook(*sys.exc_info()) print("") + if failed: + sys.exit(1) diff --git a/xar/test/integrity.py b/xar/test/integrity.py index c47ac6a..f4d2af7 100755 --- a/xar/test/integrity.py +++ b/xar/test/integrity.py @@ -5,6 +5,7 @@ from __future__ import print_function import os import os.path import subprocess +import sys import util @@ -12,9 +13,9 @@ import util # Utility Functions # -def _test_truncation(filename, path_to_be_archived, bytes_to_chop, *args): - with util.archive_created(filename, path_to_be_archived) as path: - with open("/dev/null", "w") as bitbucket: +def _test_truncation(filename, bytes_to_chop, *args): + with util.test_directory_created("testdir") as test_directory: + with util.archive_created(filename, test_directory) as path: size = os.stat(path).st_size while size > 0: last_size = size @@ -23,7 +24,7 @@ def _test_truncation(filename, path_to_be_archived, bytes_to_chop, *args): f.truncate(size) with util.directory_created("scratch") as directory: - returncode = subprocess.call(["xar", "-x", "-f", path, "-C", directory], stderr=bitbucket) + returncode = subprocess.call(["xar", "-x", "-f", path, "-C", directory], stderr=subprocess.DEVNULL) assert returncode != 0, "xar claimed to succeed when extracting a truncated archive" # @@ -31,42 +32,42 @@ def _test_truncation(filename, path_to_be_archived, bytes_to_chop, *args): # def large_uncompressed(filename): - _test_truncation(filename, "/usr/share/man/man1", 1024 * 1024, "--compression=none") + _test_truncation(filename, 1024 * 1024, "--compression=none") def large_default_compression(filename): - _test_truncation(filename, "/usr/share/man/man1", 1024 * 1024) + _test_truncation(filename, 1024 * 1024) def large_gzip_compressed(filename): util.skip_if_no_compression_support("gzip") - _test_truncation(filename, "/usr/share/man/man1", 1024 * 1024, "--compression=gzip") + _test_truncation(filename, 1024 * 1024, "--compression=gzip") def large_bzip2_compressed(filename): util.skip_if_no_compression_support("bzip2") - _test_truncation(filename, "/usr/share/man/man1", 1024 * 1024, "--compression=bzip2") + _test_truncation(filename, 1024 * 1024, "--compression=bzip2") def large_lzma_compressed(filename): util.skip_if_no_compression_support("lzma") - _test_truncation(filename, "/usr/share/man/man1", 1024 * 1024, "--compression=lzma") + _test_truncation(filename, 1024 * 1024, "--compression=lzma") # "small" variants use a non-base-2 size to try to catch issues that occur on uneven boundaries def small_uncompressed(filename): - _test_truncation(filename, "/bin", 43651, "--compression=none") + _test_truncation(filename, 43651, "--compression=none") def small_default_compression(filename): - _test_truncation(filename, "/bin", 43651) + _test_truncation(filename, 43651) def small_gzip_compressed(filename): util.skip_if_no_compression_support("gzip") - _test_truncation(filename, "/bin", 43651, "--compression=gzip") + _test_truncation(filename, 43651, "--compression=gzip") def small_bzip2_compressed(filename): util.skip_if_no_compression_support("bzip2") - _test_truncation(filename, "/bin", 43651, "--compression=bzip2") + _test_truncation(filename, 43651, "--compression=bzip2") def small_lzma_compressed(filename): util.skip_if_no_compression_support("lzma") - _test_truncation(filename, "/bin", 43651, "--compression=lzma") + _test_truncation(filename, 43651, "--compression=lzma") TEST_CASES = (large_uncompressed, large_default_compression, @@ -75,14 +76,18 @@ TEST_CASES = (large_uncompressed, large_default_compression, small_gzip_compressed, small_bzip2_compressed, small_lzma_compressed) if __name__ == "__main__": + failed = False for case in TEST_CASES: + func_name = case.__name__ try: - case("{f}.xar".format(f=case.func_name)) - print("PASSED: {f}".format(f=case.func_name)) + case("{f}.xar".format(f=func_name)) + print("PASSED: {f}".format(f=func_name)) except (AssertionError, IOError, subprocess.CalledProcessError): - import sys, os - print("FAILED: {f}".format(f=case.func_name)) + failed = True + print("FAILED: {f}".format(f=func_name)) sys.excepthook(*sys.exc_info()) print("") - except util.TestCaseSkipError, e: - print("SKIPPED: {f}: {m}".format(f=case.func_name, m=e.message)) + except util.TestCaseSkipError as e: + print("SKIPPED: {f}: {m}".format(f=func_name, m=e)) + if failed: + sys.exit(1) diff --git a/xar/test/run-all.py b/xar/test/run-all.py new file mode 100755 index 0000000..05e3054 --- /dev/null +++ b/xar/test/run-all.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 + +import os.path +import subprocess +import sys + +test_suites = [ + "attr.py", + "checksums.py", + "compression.py", + "data.py", + "hardlink.py", + "heap.py", + "integrity.py", +] + +test_path = os.path.dirname(__file__) + +failed = False +for suite in test_suites: + p = subprocess.run([sys.executable, "--", os.path.join(test_path, suite)]) + if p.returncode: + failed = True +if failed: + sys.exit(1) diff --git a/xar/test/util.py b/xar/test/util.py index da79925..423dd3c 100644 --- a/xar/test/util.py +++ b/xar/test/util.py @@ -1,6 +1,8 @@ #!/usr/bin/env python import contextlib +import errno +import functools import hashlib import os import os.path @@ -13,16 +15,65 @@ import xattr class TestCaseSkipError(Exception): pass +@functools.cache +def _check_xattrs_supported(): + """ + Returns True if the filesystem supports extended attributes. + """ + with directory_created("empty") as directory: + try: + xattr.setxattr(directory, "user.xattrcheck", b"supported") + return True + except OSError as e: + if e.errno != errno.ENOTSUP: + raise + return False + +def skip_if_no_xattrs_support(): + """ + Raises TestCaseSkipError if the the filesystem does not support extended + attributes. + """ + if not _check_xattrs_supported(): + raise TestCaseSkipError("filesystem does not support extended attributes") + +@functools.cache +def _check_compression_supported(type): + """ + Returns True if xar has support for the given compression type compiled + in. This function performs a runtime check that tries to compress data + with the given compression type and looks for a known error string. It + ignores all other errors. + """ + supported = True + with directory_created("empty") as directory: + archive_path = f"{type}_compression_check.xar" + try: + return f"{type} support not compiled in." not in subprocess.run( + [ + "xar", + "-c", + "-f", + archive_path, + "--compression=" + type, + directory, + ], + stdout=subprocess.PIPE, + text=True, + ).stdout + except: + # Assume that this compression type is supported. + pass + finally: + if os.path.exists(archive_path): + os.unlink(archive_path) + return supported + def skip_if_no_compression_support(type): """ - Raises TestCaseSkipError if the type is "lzma" and the test is running on - darwin (OS X). In the future, we should add a hidden debugging flag to xar - to determine valid compression types. This will skip incorrectly if a - custom xar is used on OS X, or if a custom xar on another platform is - built without bzip2 or lzma. - + Raises TestCaseSkipError if the compression type is not compiled in. """ - if sys.platform == "darwin" and type == "lzma": + if not _check_compression_supported(type): raise TestCaseSkipError("{t} support not compiled in".format(t=type)) @contextlib.contextmanager @@ -43,6 +94,22 @@ def directory_created(directory_path): if os.path.exists(directory_path): shutil.rmtree(directory_path) +@contextlib.contextmanager +def test_directory_created(directory_path): + """ + Like directory_created, but populates the directory with test files. + """ + with directory_created(directory_path) as directory: + with open(os.path.join(directory, "script"), "w+", opener=lambda path, flags: os.open(path, flags, 0o750)) as f: + f.write("#!/bin/sh\necho hello world") + with open(os.path.join(directory, "random_1kb"), "wb+") as f: + f.write(os.urandom(1000)) + with open(os.path.join(directory, "random_4kib"), "wb+") as f: + f.write(os.urandom(4096)) + with open(os.path.join(directory, "random_1mb"), "wb+") as f: + f.write(os.urandom(9999999)) + yield directory + @contextlib.contextmanager def archive_created(archive_path, content_path, *extra_args, **extra_kwargs): """ @@ -68,7 +135,7 @@ def archive_created(archive_path, content_path, *extra_args, **extra_kwargs): HASH_CHUNK_SIZE = 32768 def _md5_path(path): - with open(path, "r") as f: + with open(path, "rb") as f: h = hashlib.md5() while True: last = f.read(HASH_CHUNK_SIZE) @@ -122,7 +189,7 @@ def assert_identical_directories(path1, path2): # Sizes and the like assert stat1.st_size == stat2.st_size, "size mismatch for \"{e1}\" ({s1}) and \"{e2}\" ({s2})".format(e1=entry1, s1=stat1.st_size, e2=entry2, s2=stat2.st_size) - assert stat1.st_mtime == stat2.st_mtime, "mtime mismatch for \"{e1}\" and \"{e2}\"".format(e1=entry1, e2=entry2) + assert int(stat1.st_mtime) == int(stat2.st_mtime), "mtime mismatch for \"{e1}\" and \"{e2}\"".format(e1=entry1, e2=entry2) assert _md5_path(entry1) == _md5_path(entry2), "md5 hash mismatch for \"{e1}\" and \"{e2}\"".format(e1=entry1, e2=entry2) if os.path.isdir(entry1): assert_identical_directories(entry1, entry2) diff --git a/xar/test/validate.c b/xar/test/validate.c index dfe69eb..a5fbe37 100644 --- a/xar/test/validate.c +++ b/xar/test/validate.c @@ -16,37 +16,40 @@ off_t HeapOff = 0; -static char* xar_format_md5(const unsigned char* m) { +static char* xar_format_sha1(const unsigned char* m) { char* result = NULL; asprintf(&result, "%02x%02x%02x%02x" "%02x%02x%02x%02x" "%02x%02x%02x%02x" + "%02x%02x%02x%02x" "%02x%02x%02x%02x", m[0], m[1], m[2], m[3], m[4], m[5], m[6], m[7], m[8], m[9], m[10], m[11], - m[12], m[13], m[14], m[15]); + m[12], m[13], m[14], m[15], + m[16], m[17], m[18], m[19]); return result; } void heap_check(int fd, const char *name, const char *prop, off_t offset, off_t length, const char *csum) { char *buf; - EVP_MD_CTX ctx; + EVP_MD_CTX *ctx; const EVP_MD *md; - unsigned char md5str[EVP_MAX_MD_SIZE]; + unsigned char sha1str[EVP_MAX_MD_SIZE]; unsigned int len; ssize_t r; - char *formattedmd5; + char *formattedsha1; fprintf(stderr, "Heap checking %s %s at offset: %" PRIu64 "\n", name, prop, HeapOff+offset); OpenSSL_add_all_digests(); - md = EVP_get_digestbyname("md5"); + md = EVP_get_digestbyname("sha1"); if( md == NULL ) { - fprintf(stderr, "No md5 digest in openssl\n"); + fprintf(stderr, "No sha1 digest in openssl\n"); exit(1); } - EVP_DigestInit(&ctx, md); + ctx = EVP_MD_CTX_create(); + EVP_DigestInit(ctx, md); buf = malloc(length); if( !buf ) { @@ -65,14 +68,15 @@ void heap_check(int fd, const char *name, const char *prop, off_t offset, off_t fprintf(stderr, "Error reading from the heap\n"); exit(1); } - EVP_DigestUpdate(&ctx, buf, length); - EVP_DigestFinal(&ctx, md5str, &len); + EVP_DigestUpdate(ctx, buf, length); + EVP_DigestFinal(ctx, sha1str, &len); + EVP_MD_CTX_destroy(ctx); - formattedmd5 = xar_format_md5(md5str); - if( strcmp(formattedmd5, csum) != 0 ) { - fprintf(stderr, "%s %s checksum does not match\n", name, prop); + formattedsha1 = xar_format_sha1(sha1str); + if( strcmp(formattedsha1, csum) != 0 ) { + fprintf(stderr, "%s %s checksum does not match (got %s but expected %s)\n", name, prop, formattedsha1, csum); } - free(formattedmd5); + free(formattedsha1); free(buf); } -- 2.44.1