Source code for fairly.file.local

"""LocalFile class module.

LocalFile class is used to perform operations on local files.

Usage example:

    >>> file = LocalFile("/path/to/local/file/filename.txt")
    >>> file.type
        application/text
    >>> file.size
        543
    >>> file.is_archive
        False
"""
from . import File
from typing import Callable, List

import os
import os.path
import mimetypes
import hashlib
import zipfile
import tarfile
import logging


[docs] class LocalFile(File): """LocalFile class. Class Attributes: CHUNK_SIZE: Chunk size in bytes to calculate MD5 checksum (default = 65536). NO_EXTRACT: List of file extensions which should not be extracted. Attributes: _fullpath (str): Full path of the local file. """ CHUNK_SIZE = 2**18 NO_EXTRACT = [ ".docx", ".xlsx", ".pptx", ] def __init__(self, fullpath: str, basepath: str = None, md5: str = None): """Initializes LocalFile object. Args: fullpath (str): Full path of the local file. basepath (str): Base path of the local file (optional). md5 (str): MD5 checksum of the local file (optional). Raises: ValueError("Invalid file path"): If fullpath is not a valid file path. """ if not os.path.isfile(fullpath): raise ValueError("Invalid file path") self._fullpath = fullpath self._path = os.path.relpath(fullpath, basepath) if basepath else fullpath self._name = os.path.basename(fullpath) self._size = os.path.getsize(fullpath) self._type = None self._md5 = md5 @property def fullpath(self) -> str: """Full path of the local file.""" return self._fullpath @property def type(self) -> str: """Content type of the local file.""" if self._type is None: logging.info("Guessing content type of %s.", self.fullpath) self._type, _ = mimetypes.guess_type(self.fullpath) logging.info("Guessed content type is %s.", self._type) return self._type @property def md5(self) -> str: """MD5 checksum of the local file. MD5 checksum is only calculated once and cached for subsequent calls. """ if self._md5 is None: logging.info("Calculating MD5 checksum of %s.", self.fullpath) with open(self.fullpath, "rb") as file: md5 = hashlib.md5() while chunk := file.read(self.CHUNK_SIZE): md5.update(chunk) self._md5 = md5.hexdigest() logging.info("Calculated MD5 checksum is %s.", self._md5) return self._md5 @property def is_archive(self) -> bool: """Checks if file is an archive file. Returns: True if file is an archive file, False otherwise. """ if self.NO_EXTRACT and (self.extension in self.NO_EXTRACT): return False if zipfile.is_zipfile(self.fullpath): return True elif tarfile.is_tarfile(self.fullpath): return True else: return False
[docs] def match(self, val: str) -> bool: """Checks if file matches the specified file identifier. File fullpath is compared with the specified identifier in addition to the properties checked by File.match(). Args: val (str): File identifier. Returns: True if file matches the specified file identifier, False otherwise. """ return True if self.fullpath == val else super().match(val)
[docs] def extract(self, path: str = None, notify: Callable = None) -> List: """Extracts archive file contents to a specified directory. Args: path: Path of the directory to extract to. Default is the current working directory. notify: Notification callback function. Three arguments are provided to the callback function: - file (LocalFile): File object of the extracted local file. - current_size (int): Current total uncompressed size of extracted files. - total_size (int): Total uncompressed size of the archive. Raises: ValueError("Invalid path"): If path is not a directory path. ValueError("Invalid archive item {name}"): If archive item path is not valid. ValueError("Invalid archive file"): If file is not an archive file. Returns: List of names of extracted files (str). """ # Raise exception if invalid path if path: if not os.path.isdir(path): raise ValueError("Invalid path") else: path = "" files = [] # Check if ZIP archive if zipfile.is_zipfile(self.fullpath): # Open ZIP archive with zipfile.ZipFile(self.fullpath, "r") as archive: # Get list of items items = archive.infolist() # Calculate total size total_size = sum(item.file_size for item in items) # Extract items current_size = 0 for item in items: # REMARK: Absolute and non-canonical paths are corrected # https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile.extract # TODO: Add error handling archive.extract(item, path) files.append(item.filename) # Call notify callback if required if notify and not item.is_dir(): file = LocalFile(os.path.join( path, item.filename), path) current_size += file.size notify(file, file.size, total_size, current_size) # Check if TAR archive elif tarfile.is_tarfile(self.fullpath): # Open TAR archive with tarfile.open(self.fullpath, "r") as archive: # Get list of items items = archive.getmembers() # Calculate total size total_size = sum(item.size for item in items) # Check validity of the archive content for item in items: if os.path.normpath(item.name) != os.path.relpath(item.name): raise ValueError(f"Invalid archive item {item.name}") # Extract items # REMARK: extractall() cannot be used as it sets owner attributes attrs = [] current_size = 0 for item in items: itempath = os.path.join(path, item.name) attrs.append( {"path": itempath, "mode": item.mode, "time": item.mtime}) if item.isdir(): item.mode = 0o700 # TODO: Add error handling archive.extract(item, path, set_attrs=False) files.append(item.name) # Call notify callback if required if notify and item.isfile(): file = LocalFile(itempath, path) current_size += file.size notify(file, file.size, total_size, current_size) # Set file mode and modification times # REMARK: Reverse sorting is required to handle directories correctly attrs.sort(key=lambda item: item["path"], reverse=True) for item in attrs: try: os.chmod(item["path"], item["mode"]) os.utime(item["path"], (item["time"], item["time"])) except: pass else: raise ValueError("Invalid archive file") return files