Spaces:
Running
Running
from __future__ import annotations | |
import csv | |
import hashlib | |
import os.path | |
import re | |
import stat | |
import time | |
from collections import OrderedDict | |
from io import StringIO, TextIOWrapper | |
from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo | |
from wheel.cli import WheelError | |
from wheel.util import log, urlsafe_b64decode, urlsafe_b64encode | |
# Non-greedy matching of an optional build number may be too clever (more | |
# invalid wheel filenames will match). Separate regex for .dist-info? | |
WHEEL_INFO_RE = re.compile( | |
r"""^(?P<namever>(?P<name>[^\s-]+?)-(?P<ver>[^\s-]+?))(-(?P<build>\d[^\s-]*))? | |
-(?P<pyver>[^\s-]+?)-(?P<abi>[^\s-]+?)-(?P<plat>\S+)\.whl$""", | |
re.VERBOSE, | |
) | |
MINIMUM_TIMESTAMP = 315532800 # 1980-01-01 00:00:00 UTC | |
def get_zipinfo_datetime(timestamp=None): | |
# Some applications need reproducible .whl files, but they can't do this without | |
# forcing the timestamp of the individual ZipInfo objects. See issue #143. | |
timestamp = int(os.environ.get("SOURCE_DATE_EPOCH", timestamp or time.time())) | |
timestamp = max(timestamp, MINIMUM_TIMESTAMP) | |
return time.gmtime(timestamp)[0:6] | |
class WheelFile(ZipFile): | |
"""A ZipFile derivative class that also reads SHA-256 hashes from | |
.dist-info/RECORD and checks any read files against those. | |
""" | |
_default_algorithm = hashlib.sha256 | |
def __init__(self, file, mode="r", compression=ZIP_DEFLATED): | |
basename = os.path.basename(file) | |
self.parsed_filename = WHEEL_INFO_RE.match(basename) | |
if not basename.endswith(".whl") or self.parsed_filename is None: | |
raise WheelError(f"Bad wheel filename {basename!r}") | |
ZipFile.__init__(self, file, mode, compression=compression, allowZip64=True) | |
self.dist_info_path = "{}.dist-info".format( | |
self.parsed_filename.group("namever") | |
) | |
self.record_path = self.dist_info_path + "/RECORD" | |
self._file_hashes = OrderedDict() | |
self._file_sizes = {} | |
if mode == "r": | |
# Ignore RECORD and any embedded wheel signatures | |
self._file_hashes[self.record_path] = None, None | |
self._file_hashes[self.record_path + ".jws"] = None, None | |
self._file_hashes[self.record_path + ".p7s"] = None, None | |
# Fill in the expected hashes by reading them from RECORD | |
try: | |
record = self.open(self.record_path) | |
except KeyError: | |
raise WheelError(f"Missing {self.record_path} file") from None | |
with record: | |
for line in csv.reader( | |
TextIOWrapper(record, newline="", encoding="utf-8") | |
): | |
path, hash_sum, size = line | |
if not hash_sum: | |
continue | |
algorithm, hash_sum = hash_sum.split("=") | |
try: | |
hashlib.new(algorithm) | |
except ValueError: | |
raise WheelError( | |
f"Unsupported hash algorithm: {algorithm}" | |
) from None | |
if algorithm.lower() in {"md5", "sha1"}: | |
raise WheelError( | |
"Weak hash algorithm ({}) is not permitted by PEP " | |
"427".format(algorithm) | |
) | |
self._file_hashes[path] = ( | |
algorithm, | |
urlsafe_b64decode(hash_sum.encode("ascii")), | |
) | |
def open(self, name_or_info, mode="r", pwd=None): | |
def _update_crc(newdata): | |
eof = ef._eof | |
update_crc_orig(newdata) | |
running_hash.update(newdata) | |
if eof and running_hash.digest() != expected_hash: | |
raise WheelError(f"Hash mismatch for file '{ef_name}'") | |
ef_name = ( | |
name_or_info.filename if isinstance(name_or_info, ZipInfo) else name_or_info | |
) | |
if ( | |
mode == "r" | |
and not ef_name.endswith("/") | |
and ef_name not in self._file_hashes | |
): | |
raise WheelError(f"No hash found for file '{ef_name}'") | |
ef = ZipFile.open(self, name_or_info, mode, pwd) | |
if mode == "r" and not ef_name.endswith("/"): | |
algorithm, expected_hash = self._file_hashes[ef_name] | |
if expected_hash is not None: | |
# Monkey patch the _update_crc method to also check for the hash from | |
# RECORD | |
running_hash = hashlib.new(algorithm) | |
update_crc_orig, ef._update_crc = ef._update_crc, _update_crc | |
return ef | |
def write_files(self, base_dir): | |
log.info(f"creating '{self.filename}' and adding '{base_dir}' to it") | |
deferred = [] | |
for root, dirnames, filenames in os.walk(base_dir): | |
# Sort the directory names so that `os.walk` will walk them in a | |
# defined order on the next iteration. | |
dirnames.sort() | |
for name in sorted(filenames): | |
path = os.path.normpath(os.path.join(root, name)) | |
if os.path.isfile(path): | |
arcname = os.path.relpath(path, base_dir).replace(os.path.sep, "/") | |
if arcname == self.record_path: | |
pass | |
elif root.endswith(".dist-info"): | |
deferred.append((path, arcname)) | |
else: | |
self.write(path, arcname) | |
deferred.sort() | |
for path, arcname in deferred: | |
self.write(path, arcname) | |
def write(self, filename, arcname=None, compress_type=None): | |
with open(filename, "rb") as f: | |
st = os.fstat(f.fileno()) | |
data = f.read() | |
zinfo = ZipInfo( | |
arcname or filename, date_time=get_zipinfo_datetime(st.st_mtime) | |
) | |
zinfo.external_attr = (stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode)) << 16 | |
zinfo.compress_type = compress_type or self.compression | |
self.writestr(zinfo, data, compress_type) | |
def writestr(self, zinfo_or_arcname, data, compress_type=None): | |
if isinstance(zinfo_or_arcname, str): | |
zinfo_or_arcname = ZipInfo( | |
zinfo_or_arcname, date_time=get_zipinfo_datetime() | |
) | |
zinfo_or_arcname.compress_type = self.compression | |
zinfo_or_arcname.external_attr = (0o664 | stat.S_IFREG) << 16 | |
if isinstance(data, str): | |
data = data.encode("utf-8") | |
ZipFile.writestr(self, zinfo_or_arcname, data, compress_type) | |
fname = ( | |
zinfo_or_arcname.filename | |
if isinstance(zinfo_or_arcname, ZipInfo) | |
else zinfo_or_arcname | |
) | |
log.info(f"adding '{fname}'") | |
if fname != self.record_path: | |
hash_ = self._default_algorithm(data) | |
self._file_hashes[fname] = ( | |
hash_.name, | |
urlsafe_b64encode(hash_.digest()).decode("ascii"), | |
) | |
self._file_sizes[fname] = len(data) | |
def close(self): | |
# Write RECORD | |
if self.fp is not None and self.mode == "w" and self._file_hashes: | |
data = StringIO() | |
writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n") | |
writer.writerows( | |
( | |
(fname, algorithm + "=" + hash_, self._file_sizes[fname]) | |
for fname, (algorithm, hash_) in self._file_hashes.items() | |
) | |
) | |
writer.writerow((format(self.record_path), "", "")) | |
self.writestr(self.record_path, data.getvalue()) | |
ZipFile.close(self) | |