Source code for fast_package_file

"""fast_package_file
https://github.com/Kataiser/fast-package-file
https://fast-package-file.readthedocs.io/en/latest/
https://pypi.org/project/fast-package-file/
"""

__author__ = "Kataiser (https://github.com/Kataiser)"
__version__ = "1.1"
__copyright__ = "Copyright (c) 2019 Kataiser (https://github.com/Kataiser)"
__license__ = "MIT"

import gzip
import hashlib
import io
import json
import os
import sys
import time
import zlib
from typing import Union, Callable, Dict

try:
    import tqdm  # a neat progress bar
except ImportError:
    tqdm = None

FORMAT_VERSION = 1


class PackagedDataFile:
    def __init__(self, data_file_path: str, prepare: bool = True, decomp_func: Callable[[bytes], bytes] = None):
        """
        Prepare a packaged data file.

        :param data_file_path: Location of the package.
        :param prepare: Whether to load, decompress (if necessary), and parse the file location header now, or wait until the first file is loaded. Regardless, the entire package is not
            loaded.
        :param decomp_func: A supplied decompression function.
        """
        self.__data_file_path = data_file_path
        self.__prepared = prepare
        self.__decomp_func = decomp_func

        if prepare:
            if not os.path.exists(self.__data_file_path):  # check if file exists
                raise PackageDataError("'{}' doesn't seem to exist".format(self.__data_file_path))

            if not os.access(self.__data_file_path, os.R_OK):  # check file permissions
                raise PackageDataError("Don't have read permissions for '{}'".format(self.__data_file_path))

            with open(self.__data_file_path, 'rb') as data_file_init:
                format_version = int.from_bytes(data_file_init.read(2), byteorder='little')
                if format_version != FORMAT_VERSION:
                    raise PackageDataError("{} is corrupted or malformed (loaded file format version is {}, should be {})".format(self.__data_file_path, format_version, FORMAT_VERSION))

                self.__loc_data_length = int.from_bytes(data_file_init.read(8), byteorder='little')  # read header length
                if self.__loc_data_length < 0 or self.__loc_data_length > 1000000000000:  # 1 TB seems to be a reasonable limit
                    raise PackageDataError("{} is corrupted or malformed (header length is {})".format(self.__data_file_path, self.__loc_data_length))

                loc_data_compressed = bool.from_bytes(data_file_init.read(1), byteorder='little')  # read header's compressed state
                self.__loc_data_crc32 = bool.from_bytes(data_file_init.read(1), byteorder='little')  # read header's path format
                loc_data_raw = data_file_init.read(self.__loc_data_length)  # read the compressed header

            if loc_data_compressed:
                decomp_func_loc_data = decomp_func if decomp_func else gzip.decompress

                try:
                    loc_data_bin = decomp_func_loc_data(loc_data_raw)  # decompress
                except (OSError, zlib.error) as gzip_error:
                    raise PackageDataError("{} is corrupted or malformed ({})".format(self.__data_file_path, gzip_error))
            else:
                loc_data_bin = loc_data_raw

            try:
                self.file_data = json.loads(loc_data_bin.decode('utf-8'))  # and parse
            except json.decoder.JSONDecodeError:
                raise PackageDataError("{} is corrupted or malformed (couldn't decode JSON)".format(self.__data_file_path))
            except UnicodeDecodeError as utf8_error:
                raise PackageDataError("{} is corrupted or malformed ({})".format(self.__data_file_path, utf8_error))

    def load_file(self, file: str) -> bytes:
        """
        Load a single file from the package. Also loads and parses the location data header for the packge, if it hasn't been already.

        :param file: The path to the file, relative to the input directory (e.g. a file at surface level would be ``file.txt``, and one folder in would be ``folder\\file.txt``).
        :returns: The file as a bytes object, uncompressed.

        .. note::
            File paths stored within a package file are modified to always use backslashes as path seperators, regardless of what OS is used to build or load the package. Be sure to either
            escape the backslashes or use raw strings.
        """
        if not self.__prepared:
            self.__init__(self.__data_file_path, prepare=True, decomp_func=self.__decomp_func)  # elegant

        try:
            # get the file's stats: (offset, length, compressed (1 or 0), first byte, last byte)
            file_loc_data = self.file_data[str(zlib.crc32(file.encode('utf-8'))) if self.__loc_data_crc32 else file]
        except KeyError:
            raise PackageDataError("{} is corrupted or malformed (file '{}' doesn't exist in location header)".format(self.__data_file_path, file))

        with open(self.__data_file_path, 'rb') as data_file:
            data_file.seek(file_loc_data[0] + self.__loc_data_length + 12)  # account for header and other data
            data_file_raw = data_file.read(file_loc_data[1])

        if file_loc_data[2]:  # is compressed
            try:
                if self.__decomp_func:
                    data_file_out = self.__decomp_func(data_file_raw)
                else:
                    data_file_out = gzip.decompress(data_file_raw)
            except Exception as error:
                raise PackageDataError("{} is corrupted or malformed (decompression error for {}: '{}')".format(self.__data_file_path, file, error))
        else:
            data_file_out = data_file_raw

        if len(file_loc_data) == 6:  # hash info exists
            if file_loc_data[5].startswith('md5   '):
                hasher = hashlib.md5()
                hasher.update(data_file_out)
                hash_ = hasher.hexdigest()
            elif file_loc_data[5].startswith('sha256'):
                hasher = hashlib.sha256()
                hasher.update(data_file_out)
                hash_ = hasher.hexdigest()
            elif file_loc_data[5].startswith('crc32 '):
                hash_ = zlib.crc32(data_file_out)
            else:
                raise PackageDataError("{} is corrupted or malformed (hash method seems to be '{}')".format(self.__data_file_path, file_loc_data[5][:6]))

            if hash_ != file_loc_data[5][6:]:  # confirm hash
                raise PackageDataError("{} is corrupted or malformed ('{}' hash mismatch: {} != {})".format(self.__data_file_path, file, hash_, file_loc_data[5][6:]))
        else:  # basically a cheap hash
            if data_file_raw[0] != file_loc_data[3]:  # check if first byte matches
                raise PackageDataError("{} is corrupted or malformed (first byte of file '{}' should be {}, but was loaded as {})".format(
                                       self.__data_file_path, file, data_file_raw[0], file_loc_data[3]))
            elif data_file_raw[-1] != file_loc_data[4]:  # check if last byte matches
                raise PackageDataError("{} is corrupted or malformed (last byte of file '{}' should be {}, but was loaded as {})".format(
                                       self.__data_file_path, file, data_file_raw[-1], file_loc_data[4]))

        return data_file_out

    def load_bulk(self, prefix: str = '', postfix: str = '') -> Dict[str, bytes]:
        """
        Load multiple files at once, based on a prefix and/or a postfix for the file path (uses ``.startswith`` and ``.endswith``).

        :param prefix: File path prefix (e.g. a subdirectory).
        :param postfix: File path postfix (e.g. a file extension).
        :returns: A :py:class:`dict`, formatted as ``{'path': bytes}``.

        .. note::
            Doesn't support packages using crc32 file paths.
        """
        out_data = {}

        for file_in_package in self.file_data.keys():
            if file_in_package.startswith(prefix) and file_in_package.endswith(postfix):
                out_data[file_in_package] = (self.load_file(file_in_package))

        if out_data:
            return out_data
        else:
            raise PackageDataError("{} is corrupted or malformed (no file paths start with '{}' and end with '{}')".format(self.__data_file_path, prefix, postfix))

    def __repr__(self):
        """
        Includes path, number of files, and total file size.

        :returns: :py:class:`str`
        """
        return "<fast_package_file.PackagedDataFile object for {} ({} files, {} bytes)>".format(self.__data_file_path, len(self.file_data), os.stat(self.__data_file_path).st_size)


[docs]def oneshot(data_file_path: str, file: str, decomp_func: Callable[[bytes], bytes] = None) -> bytes: """ Load a single file from a package file. :param data_file_path: Location of the package. :param file: The path to the file, relative to the input directory (same as :py:func:`~PackagedDataFile.load_file`). :param decomp_func: A supplied decompression function. :returns: The file as :py:class:`bytes`, uncompressed. .. note:: If you're planning on ever loading another file from the same package, it's recommended to use :py:mod:`~PackagedDataFile` explicitly since it caches the file location data. """ oneshot_package = PackagedDataFile(data_file_path, decomp_func=decomp_func) oneshot_file = oneshot_package.load_file(file) return oneshot_file
[docs]def oneshot_bulk(data_file_path: str, prefix: str = '', postfix: str = '', decomp_func: Callable[[bytes], bytes] = None) -> Dict[str, bytes]: """ Combines :py:func:`oneshot` and :py:func:`~PackagedDataFile.load_bulk`. Same note as :py:func:`oneshot`. :param data_file_path: Location of the package. :param prefix: Same as :py:func:`~PackagedDataFile.load_bulk`. :param postfix: Same as :py:func:`~PackagedDataFile.load_bulk`. :param decomp_func: A supplied decompression function. :returns: A :py:class:`dict`, formatted as ``{'path': bytes}``. .. note:: Doesn't support packages using crc32 file paths. """ oneshot_package = PackagedDataFile(data_file_path, decomp_func=decomp_func) oneshot_list = oneshot_package.load_bulk(prefix, postfix) return oneshot_list
# only used when reading packages
[docs]class PackageDataError(Exception): pass
# build a directory and all subdirectories into a single file (this part isn't fast tbh)
[docs]def build(directory: str, target: str, compress: bool = True, keep_comp_threshold: float = 0.98, hash_mode: Union[str, None] = None, comp_func: Callable[[bytes], bytes] = None, crc32_paths: bool = False, progress_bar: bool = True, silent: bool = False): """ Build a packaged data file from a directory. :param directory: The directory to package. Includes all subdirectories. :param target: The path for the package file. If it already exists, it's overwritten. The file extension can be whatever you like. :param compress: Whether to compress the package, either with ``comp_func`` or Gzip by default. :param keep_comp_threshold: 0 through 1 (default is 0.98). For each input file, if compression doesn't improve file size by this ratio, the file is instead stored uncompressed. Set to 1 to compress every file no matter what. :param hash_mode: The hash method to use to ensure file validity. Can be "md5", "crc32", or "sha256". If :py:class:`None` (the default), only the first and last bytes are compared. :param comp_func: A supplied decompression function that takes :py:class:`bytes` and returns :py:class:`bytes`. Some recommendations: LZMA, LZMA2, Deflate, BZip2, Oodle, or Zstandard. :param crc32_paths: Store file paths as `crc32 <https://en.wikipedia.org/wiki/Cyclic_redundancy_check>`_ numbers. Useful for obfuscating file names and paths. :param progress_bar: Whether to show a progress bar (uses `tqdm <https://github.com/tqdm/tqdm>`_). If tqdm isn't installed, this is irrelevant. :param silent: Disable all prints. """ print(directory) start_time = time.perf_counter() loc_data_save = {} files_to_add = [] current_loc = 0 total_data_in = 0 files_in = [] gztemps_deleted = 0 compressed_formats = ('dl_', '.gif', '.jpg', '.jpeg', '.bmp', '.png', '.wmf', '.mkv', '.mp4', '.wmv', '.avi', '.bik', '.flv', '.ogg', '.mpg', '.m2v', '.m4v', '.vob', '.mp3', '.aac', '.wma', '.flac', '.zip', '.xap', '.rar', '.7z', '.cab', '.lzx', '.docx', '.xlsx', '.pptx', '.vssx', '.vstx', '.onepkg') # list from CompactGUI for root, dirs, files in os.walk(directory): for filename_in in files: filename_in_joined = os.path.join(root, filename_in) # prepare file list in order to have a progress bar if filename_in_joined.endswith('.gztemp'): # in case packaging was cancelled os.remove(filename_in_joined) gztemps_deleted += 1 else: files_in.append(filename_in_joined) if gztemps_deleted != 0 and not silent: print(" Deleted {} .gztemp files".format(gztemps_deleted)) files_in.sort() if tqdm and not silent: input_iterable = tqdm.tqdm(files_in, file=sys.stdout, ncols=40, unit=' files', bar_format=' {l_bar}{bar}|', disable=not progress_bar) else: input_iterable = files_in for file_path in input_iterable: with open(file_path, 'rb') as input_file: input_file_data_raw = input_file.read() total_data_in += len(input_file_data_raw) is_compressed = [c_format for c_format in compressed_formats if file_path.endswith(c_format)] # check file extension if compress and not is_compressed: if comp_func: input_file_data_comp = comp_func(input_file_data_raw) else: input_file_data_comp = _gzip_compress_fix(input_file_data_raw) if len(input_file_data_comp) < len(input_file_data_raw) * keep_comp_threshold: # if compression improves file size input_file_data = input_file_data_comp compressed = True gz_path = '{}.gztemp'.format(file_path) # because storing every file's data takes too much memory files_to_add.append(gz_path) with open(gz_path, 'wb') as temp_gz: temp_gz.write(input_file_data) else: # if compression doesn't improve file size (much) input_file_data = input_file_data_raw compressed = False files_to_add.append(file_path) else: # skipping gzip testing because file is likely already compressed based on extension input_file_data = input_file_data_raw compressed = False files_to_add.append(file_path) file_path_short = file_path[len(directory) + len(os.sep):] # removes some redundancy file_path_out = file_path_short.replace(os.sep, '\\') # makes building uniform across OSs if crc32_paths: file_path_out = zlib.crc32(file_path_out.encode('utf-8')) loc_data_save[file_path_out] = [current_loc, len(input_file_data), compressed, input_file_data[0], input_file_data[-1]] # add file to header dictionary current_loc += len(input_file_data) # keep track of offset # calculate and add hash, if enabled if hash_mode == 'md5': hasher = hashlib.md5() hasher.update(input_file_data_raw) loc_data_save[file_path_out].append('md5 {}'.format(hasher.hexdigest())) elif hash_mode == 'sha256': hasher = hashlib.sha256() hasher.update(input_file_data_raw) loc_data_save[file_path_out].append('sha256{}'.format(hasher.hexdigest())) elif hash_mode == 'crc32': loc_data_save[file_path_out].append('crc32 {}'.format(zlib.crc32(input_file_data_raw))) elif hash_mode: raise KeyError("""'hash_mode' isn't "md5", "sha256", or "crc32" """[:-1]) comp_func_loc_data = comp_func if (comp_func and compress) else _gzip_compress_fix loc_data_save_json = json.dumps(loc_data_save, separators=(',', ':'), sort_keys=True).encode('utf-8') # convert header to binary loc_data_save_out = comp_func_loc_data(loc_data_save_json) if compress else loc_data_save_json # and compress it loc_data_save_length = (len(loc_data_save_out)).to_bytes(8, byteorder='little') # get its length as an 8 bit binary with open(target, 'wb') as out_file: out_file.write(FORMAT_VERSION.to_bytes(2, byteorder='little')) # add the file format version out_file.write(loc_data_save_length) # add the header's length out_file.write(compress.to_bytes(1, byteorder='little')) # add the header's compressed state out_file.write(crc32_paths.to_bytes(1, byteorder='little')) # add the header's path format out_file.write(loc_data_save_out) # add the header for file_to_add_path in files_to_add: with open(file_to_add_path, 'rb') as file_to_add: # add the files from either the original data or its .gztemp out_file.write(file_to_add.read()) if file_to_add_path.endswith('.gztemp'): # and then delete the .gztemp os.remove(file_to_add_path) if not silent: # monitoring output duration = format(time.perf_counter() - start_time, '.2f') input_size = format(total_data_in / 1048576, '.2f') target_size = format(os.stat(target).st_size / 1048576, '.2f') print(" {} ({} MB, {} files) -> {} ({} MB) took {} seconds".format(directory, input_size, len(files_in), target, target_size, duration))
# basically gzip.compress(), but deterministic (mtime=0) def _gzip_compress_fix(data: bytes) -> bytes: buffer = io.BytesIO() with gzip.GzipFile(mode='wb', fileobj=buffer, mtime=0) as temp_file: temp_file.write(data) return buffer.getvalue() if __name__ == '__main__': pass