fck/deprecated/crc32sum_threaded.py

193 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python3
# This software is licensed under the MIT License:
# Copyright (c) 2019-2020 Kevin Baensch
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# DISCLAIMER
# This file will not receive any updates.
# This is my initial implementation of a crc32 checker.
# The code quality is kind of mediocre (as it was only intended for personal use)
# Still works perfectly fine and has the advantage of being compact (and not as bloated as the OO implementation)
# Cool things to come... eventually... maybe (when I'm motivated):
# - add support for sfv files
# - write checksums to files
# - better error handling (symlinks etc)
# - should under no circumstances crash (check for available memory)
# - lots and lots of cleanup
# - documentation
# - verify sfv files: re.fullmatch(r"[0-9a-fA-F]{8}", s or "") is not None
# - contextmanagers are a cool way to handle exceptions https://www.python.org/dev/peps/pep-0343/
# - if possible move file handling into separate function (or if really necessary a class)
# - unit tests
# - maybe use a logging library
# - turn this into a proper library -> move stuff into separate files
"""
+---------------------+
| CRC32 sum generator |
+---------------------+
"""
import argparse
from multiprocessing import Pool
from os import listdir, path
from typing import Generator, List, Tuple, Optional
from zlib import crc32
def file_search(pathlist: List[str]) -> Generator[Tuple[(Optional[str], str)], None, None]:
    """
    Walk the given paths and generate one entry per file that is found.

    Regular files are passed through, directories are descended into
    recursively and files ending in ".sfv" are replaced by the entries
    that sfv_read() produces for them. A path that is neither a file nor
    a directory only triggers a warning on stdout.
    +------------+
    | Parameters |
    +------------+
    | pathlist: List[str]
    |     Paths of files and directories.
    +--------+
    | Yields |
    +--------+
    | file: Tuple[(Optional[str], str)]
    |     file is a Tuple containing:
    |     - a crc32 sum (None for plain files, for sfv files whatever
    |       sfv_read() yields)
    |     - a files path string
    """
    for entry in pathlist:
        if path.isfile(entry):
            # The length guard means a path that is nothing but ".sfv"
            # is handled as an ordinary file.
            is_sfv = len(entry) > 4 and entry.endswith(".sfv")
            if is_sfv:
                yield from sfv_read(entry)
            else:
                yield (None, entry)
        elif path.isdir(entry):
            # Recurse with the directory's children, in listdir() order.
            children = [path.join(entry, name) for name in listdir(entry)]
            yield from file_search(children)
        else:
            print(f"[WARN]: No such file or directory: {entry}")
def sfv_read(filename: str) -> Generator[Tuple[(Optional[str], str)], None, None]:
    """
    Read sfv file.

    Every line that is neither blank nor a comment (first non-blank
    character is ';') is split at its last run of whitespace into a file
    name and a checksum.
    +------------+
    | Parameters |
    +------------+
    | filename: str
    |     Path of the sfv file to parse.
    +--------+
    | Yields |
    +--------+
    | file: Tuple[(Optional[str], str)]
    |     - the last whitespace separated token of the line (the checksum)
    |     - everything in front of it (the file name); an empty string if
    |       the line consists of a single token
    If the file is missing or cannot be decoded as text, a message is
    printed and nothing is yielded.
    """
    try:
        # Read everything up front so the file is closed before the first
        # entry is handed to the consumer.
        with open(filename, 'r') as file:
            lines = file.read().split('\n')
    except UnicodeDecodeError:
        print(f"[ERR]: {filename} is not a text file.")
        return
    except FileNotFoundError:
        print(f"[WARN]: No such file or directory: {filename}")
        return
    for raw_line in lines:
        entry = raw_line.strip()
        # Whitespace-only lines used to raise IndexError; skip them along
        # with empty lines and comments.
        if not entry or entry.startswith(';'):
            continue
        # Split only once from the right so that whitespace inside the
        # file name is kept exactly as written.
        fields = entry.rsplit(None, 1)
        cksum = fields[-1]
        name = fields[0] if len(fields) == 2 else ''
        yield (cksum, name)
# This was never fully implemented before the OO rewrite - so it won't work.
def sfv_write(checked_files: List[Tuple[str, str, bool]], filename: str) -> None:
    """
    Write sfv file (unfinished stub).

    The target file is opened for writing (and thereby truncated), the
    given list is sorted IN PLACE and a warning is printed if any entry
    has a falsy third element. The entries themselves are not serialized
    yet: only a fixed placeholder string ends up in the file.
    +------------+
    | Parameters |
    +------------+
    | checked_files: List[Tuple[str, str, bool]]
    |     Entries to write; the third element flags whether the entry
    |     was verified. The list is reordered by this call.
    | filename: str
    |     Path of the sfv file to create or overwrite.
    """
    try:
        with open(filename, 'w') as sfv_file:
            checked_files.sort()
            verified_flags = [entry[2] for entry in checked_files]
            if not all(verified_flags):
                print(f"[WARN]: {filename} will contain unverified checksums.")
            # Placeholder payload. The adjacent literals are concatenated
            # by the parser, so this is a single element "helloworld".
            placeholder_lines = ["hello" "world"]
            sfv_file.write('\n'.join(placeholder_lines))
    except FileExistsError:
        pass
def checkmark(check: Optional[bool] = None) -> str:
    """
    Takes optional bool and returns colored string.
    +------------+
    | Parameters |
    +------------+
    | check: Optional[bool]
    |     True (passed), False (failed) or None (unknown).
    +---------+
    | Returns |
    +---------+
    | str
    |     A single symbol wrapped in an ANSI color sequence and followed
    |     by the reset sequence: '✓' for True, '❌' for False and '?'
    |     for None.
    """
    return {
        True: '\033[92m✓\033[0m',
        False: '\033[91m❌\033[0m',
        # The color sequence needs its terminating 'm'; without it the
        # '?' is swallowed as part of a malformed escape sequence.
        None: '\033[90m?\033[0m',
    }[check]
def file_check(filename: Tuple[Optional[str], str],
               largefile: bool = False) -> Tuple[Tuple[str, str], bool]:
    """
    Check given file and return checked file.

    The CRC32 of the file is computed and compared against the expected
    checksum. If no expected checksum is known, the file passes when the
    computed checksum appears in its (upper-cased) path. One result line
    is printed per call.
    +------------+
    | Parameters |
    +------------+
    | filename: Tuple[Optional[str], str]
    |     - the expected crc32 sum (None if unknown)
    |     - the path of the file to check
    | largefile: bool
    |     Read the file in fixed-size chunks instead of all at once.
    +---------+
    | Returns |
    +---------+
    | Tuple[Tuple[str, str], bool]
    |     - (checksum as 8 upper-case hex digits, file path)
    |     - True if the file passed the check
    |     A missing file is reported with checksum "00000000" and never
    |     passes.
    """
    expected = filename[0]
    fpath = filename[1]
    chunk_size = 1 << 20
    crc = 0
    found = True
    try:
        with open(fpath, 'rb') as file:
            if not largefile:
                crc = crc32(file.read())
            else:
                # Fixed-size reads bound the memory use; iterating a binary
                # file by "lines" would load a newline-free file in one go.
                for chunk in iter(lambda: file.read(chunk_size), b''):
                    crc = crc32(chunk, crc)
    except FileNotFoundError:
        found = False
        print(f"[WARN]: No such file or directory: {fpath}")
    cksum = f"{crc:08X}"
    if not found:
        # Never let the all-zero dummy checksum of a missing file match.
        check = False
    elif expected is None:
        check = cksum in fpath.upper()
    else:
        check = cksum == expected.upper()
    print(f"{checkmark(check)} \t {cksum} \t {fpath}")
    return ((cksum, fpath), check)
def main() -> None:
    """
    Main function, should only run when the program gets directly invoked.

    Parses the command line, collects the files to check (the entries of
    the sfv file given with -c first, then everything found below the
    positional paths), checks them in a pool of worker processes and
    prints how many of them passed.
    """
    parser = argparse.ArgumentParser(description="Calculate CRC32 of files")
    parser.add_argument("-b", "--bigfiles",
                        help="parse files that exceed your memory limit",
                        action="store_true")
    parser.add_argument("-p", "--processes",
                        help="number of worker processes (default: 2)",
                        default=2,
                        type=int)
    parser.add_argument("-c", "--checksfv",
                        help="sfv file whose entries should be verified",
                        default="",
                        type=str)
    parser.add_argument("files",
                        help="files and folders to process",
                        nargs='*',
                        default=[])
    args = parser.parse_args()

    jobs = []
    # The option used to be parsed but ignored; treat its value as an sfv
    # file regardless of the file extension.
    if args.checksfv:
        jobs.extend((entry, args.bigfiles) for entry in sfv_read(args.checksfv))
    jobs.extend((entry, args.bigfiles) for entry in file_search(args.files))

    # The context manager shuts the workers down even if a check raises.
    with Pool(processes=args.processes) as ppool:
        file_list_checked = ppool.starmap(file_check, jobs)

    passed = sum(1 for result in file_list_checked if result[1])
    print(f"[{passed}/{len(file_list_checked)}] Files passed")
# Script entry point: nothing below runs when the module is imported.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C: print a short notice instead of a traceback.
        print("Exception: KeyboardInterrupt")