diff --git a/deprecated/crc32sum_threaded.py b/deprecated/crc32sum_threaded.py deleted file mode 100755 index 17a9ab4..0000000 --- a/deprecated/crc32sum_threaded.py +++ /dev/null @@ -1,192 +0,0 @@ -#!/usr/bin/env python3 - -# This software is licensed under the MIT License: -# Copyright (c) 2019-2020 Kevin Baensch - -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - - -# DISCLAIMER -# This file will not receive any updates. -# This is my initial implementation of a crc32 checker. -# The code quality is kind of mediocre (as it was only intended for personal use) -# Still works perfectly fine and has the advantage of being compact (and not as bloated as the OO implementation) - - -# Cool things to come... eventually... maybe (when I'm motivated): -# - add support for sfv files -# - write checksums to files -# - better error handling (symlinks etc) -# - should under no circumstances crash (check for available memory) -# - lots and lots of cleanup -# - documentation -# - verify sfv files: re.fullmatch(r"^[0-9a-fA-F]$", s or "") is not None -# - contextmanagers are a cool way to handle exceptions https://www.python.org/dev/peps/pep-0343/ -# - if possible move file handling into separate function (or if really necessary a class) -# - unit tests -# - maybe use a logging library -# - turn this into a proper library -> move stuff into separate files - -""" -+---------------------+ -| CRC32 sum generator | -+---------------------+ -""" - -import argparse -from multiprocessing import Pool -from os import listdir, path -from typing import Generator, List, Tuple, Optional -from zlib import crc32 - - -def file_search(pathlist: List[str]) -> Generator[Tuple[(Optional[str], str)], None, None]: - """ - Generate file paths from given list of Paths. - - +------------+ - | Parameters | - +------------+ - | pathlist: List[str] - | List of files and directories. - - +--------+ - | Yields | - ---------+ - | file: Tuple[(None, str)] - | file is a Tuple containing: - | - a crc32 sum (None if unknown) - | - a files path string - """ - for fpath in pathlist: - if path.isfile(fpath): - if len(fpath) > 4 and fpath[-4:] == ".sfv": - yield from sfv_read(fpath) - else: - yield (None, fpath) - continue - if path.isdir(fpath): - yield from file_search([path.join(fpath, x) for x in listdir(fpath)]) - continue - print(f"[WARN]: No such file or directory: {fpath}") - - -def sfv_read(filename: str) -> Generator[Tuple[(Optional[str], str)], None, None]: - """ - Read sfv file. - """ - try: - with open(filename, 'r') as file: - yield from ((x.split()[-1], ' '.join(x.split()[:-1])) - for x in file.read().split('\n') - if len(x) != 0 and x[0] != ';') - except UnicodeDecodeError: - print(f"[ERR]: {filename} is not a text file.") - except FileNotFoundError: - print(f"[WARN]: No such file or directory: {filename}") - - -# This was never fully implemented before the OO rewrite - so it won't work. -def sfv_write(checked_files: List[Tuple[str, str, bool]], filename: str) -> None: - """ - Write sfv file. - """ - try: - with open(filename, 'w') as file: - checked_files.sort() - if any([not x[2] for x in checked_files]): - print(f"[WARN]: {filename} will contain unverified checksums.") -# [file.write(f"{str(x[0])}\t{str(x[1])}") for x in checked_files] - # Placeholder - file.write('\n'.join([x for x in ["hello" "world"]])) - except FileExistsError: - pass - - -def checkmark(check: Optional[bool] = None) -> str: - """ - Takes optional bool and returns colored string. - """ - return { - True: '\033[92m✓\033[0m', - False: '\033[91m❌\033[0m', - None: '\033[90?\033[0m' - }[check] - - -def file_check(filename: Tuple[Optional[str], str], - largefile: bool = False) -> Tuple[Tuple[str, str], bool]: - """ - Check given file and return checked file. - """ - cksum = "" - try: - with open(filename[1], 'rb') as file: - if not largefile: - cksum = str(hex(crc32(file.read())))[2:].upper() - else: - lastcksum = 0 - for line in file: - lastcksum = crc32(line, lastcksum) - cksum = str(hex(lastcksum))[2:].upper() - except FileNotFoundError: - print(f"[WARN]: No such file or directory: {filename}") - if len(cksum) < 8: - cksum = ''.join([((8 - len(cksum)) * "0"), cksum]) - check = ((cksum in filename[1].upper() and filename[0] is None) ^ - (filename[0] is not None and cksum == filename[0].upper())) - print(f"{checkmark(check)} \t {cksum} \t {filename[1]}") - return ((cksum, filename[1]), check) - - -def main() -> None: - """ - Main function, should only run when programm gets directly invoked. - """ - parser = argparse.ArgumentParser(description="Calculate CRC32 of files") - parser.add_argument("-b", "--bigfiles", - help="parse files that exceed your memory limit", - action="store_true") - parser.add_argument("-p", "--processes", - help="", - default=2, - type=int) - parser.add_argument("-c", "--checksfv", - help="", - default="", - type=str) - parser.add_argument("files", - help="files and folders to process", - nargs='*', - default=[]) - args = parser.parse_args() - - file_list = file_search(args.files) - ppool = Pool(processes=args.processes) - file_list_checked = ppool.starmap(file_check, [(x, args.bigfiles) - for x in list(file_list)]) - ppool.close() - print(f"[{len([x for x in file_list_checked if x[1]])}/{len(file_list_checked)}] Files passed") - - -if __name__ == '__main__': - try: - main() - except KeyboardInterrupt: - print("Exception: KeyboardInterrupt")