diff options
author | ダカマ <dakama@kakeranoumi.xyz> | 2025-06-02 13:08:46 +0200 |
---|---|---|
committer | ダカマ <dakama@kakeranoumi.xyz> | 2025-06-02 13:08:46 +0200 |
commit | 14a892e627a375334a02381351f139d94ad7ecf3 (patch) | |
tree | e7c1ee5040f3076f57401b3b9e9eb567e2ecfb1a /pyscripts |
Diffstat (limited to 'pyscripts')
-rwxr-xr-x | pyscripts/anilist_crawl.py | 45 | ||||
-rwxr-xr-x | pyscripts/crx_getter.py | 141 | ||||
-rwxr-xr-x | pyscripts/nyu_wide_rsync.py | 596 | ||||
-rwxr-xr-x | pyscripts/pixiv_auth.py | 115 | ||||
-rwxr-xr-x | pyscripts/wide_rsync.py | 563 |
5 files changed, 1460 insertions, 0 deletions
#!/usr/bin/python3
# === pyscripts/anilist_crawl.py (from diff: new file, mode 100755, index 0000000..c529343) ===
"""Dump a user's AniList anime/manga lists to per-type text files via the AniList GraphQL API."""

import json
import os
import re
import sys

url = 'https://graphql.anilist.co'
home = os.path.expanduser("~")
subdir = "notes/misc"  # output directory relative to $HOME — assumed to exist; TODO confirm

# GraphQL query: all media lists (grouped by status) for one user and one media type.
query = '''
query ($name: String, $type: MediaType) {
MediaListCollection (userName: $name, type: $type) {
    lists {
      status
      entries {
        media {
          title {
            romaji
          }
        }
      }
    }
  }
}
'''


def anilist_main():
    """Prompt for a username and write AniList_<name>_<ANIME|MANGA>.txt files."""
    # Lazy third-party import so this module stays importable without requests.
    import requests

    variables = {'name': input('Enter username: ')}

    for media_type in ['ANIME', 'MANGA']:
        variables['type'] = media_type
        response = requests.post(url, json={'query': query, 'variables': variables})
        # Robustness fix: fail loudly on HTTP errors instead of a confusing KeyError below.
        response.raise_for_status()
        data = response.json()

        out_path = os.path.join(home, subdir,
                                "AniList_{}_{}.txt".format(variables['name'], variables['type']))
        with open(out_path, "w") as f:
            for sublist in data["data"]["MediaListCollection"]["lists"]:
                print('----- {} -----'.format(sublist["status"]), end="\n" * 2, file=f)
                for entry in sublist["entries"]:
                    print(entry["media"]["title"]["romaji"], file=f)
                print("\n", file=f)


if __name__ == "__main__":
    # Guard added: the original script ran input() and network I/O at import time.
    anilist_main()


# === pyscripts/crx_getter.py (from diff: new file, mode 100755, index 0000000..412d2e2) ===
# Download a Chrome Web Store extension as .crx and convert it to a .zip.


def treat_url(url):
    """Yield (extension_name, extension_id) parsed from a Chrome Web Store detail URL.

    NOTE(review): on an invalid URL this calls sys.exit() from inside a
    generator, killing the whole run even in batch (file) mode — kept as-is
    to preserve behavior.
    """
    check_url = re.search("^https:\\/\\/chromewebstore\\.google\\.com\\/detail\\/.*\\/[a-zA-Z0-9]{32}$", url)

    if check_url:
        print("URL")
        ext_name = url.split('/')[4]
        ext_id = url.split('/')[5]
        yield ext_name, ext_id
    else:
        print("ERROR - INVALID INPUT")
        sys.exit()


def treat_input(arg):
    """Yield (name, id) pairs from either a .txt file of URLs or a single URL."""
    check_file = re.search("^.*\\.txt$", arg)

    if check_file and os.path.isfile(arg):
        print("Text file")
        with open(arg) as f:
            for line in f:
                print(line.rstrip())
                yield from treat_url(line.rstrip())
    else:
        print("Not text file")
        yield from treat_url(arg)


def get_crx(ext_name, ext_id):
    """Download the .crx for ext_id via Google's update service; return the filename."""
    import requests  # third-party; imported lazily so the module stays importable

    crx_filename = ext_name + '.crx'
    crx_x = 'id=' + ext_id + '&installsource=ondemand&uc'
    # Fragile: shells out to a locally installed chromium — TODO confirm availability.
    chromium_version = os.popen("chromium --version | awk '{print $2}'").read().rstrip('\n')

    cookies = {
        'NID': '511=kbP9Akj2jywBWVd78MXD_ul3aupm6fd1zqmUJ-7H_eeY9FsyWAKsaoL4lvzkk7sot6jL6uabwxoLyFJPC5ZRWolSB8M980A3CP1DzmCu7oxIk6jP7yKThJIUhKmUJFrEHArN_Q8GFgqEwLTIHoK4vLHJo84n8JPoOxioxBq0r0I',
        'AEC': 'AakniGNT4RJzcCee9888Rkbuc7DjnzbyPxhy42p3TxZAqz957PuAUVPuzwQ',
        '1P_JAR': '2023-01-06-17',
    }

    headers = {
        'authority': 'clients2.google.com',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'accept-language': 'en-US,en;q=0.9',
        'referer': 'https://crxextractor.com/',
        'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Linux"',
        'sec-fetch-dest': 'document',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-site': 'cross-site',
        'sec-fetch-user': '?1',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
        'x-client-data': 'CJv6ygE=',
    }

    params = {
        'response': 'redirect',
        'prodversion': chromium_version,
        'acceptformat': 'crx3',
        'x': crx_x,
    }

    response = requests.get('https://clients2.google.com/service/update2/crx',
                            params=params, cookies=cookies, headers=headers)
    crx_url = response.url
    response = requests.get(crx_url)

    # Fix: use a context manager instead of open()/close() so the handle is
    # released even if write() fails.
    with open(crx_filename, 'wb') as crx_file:
        crx_file.write(response.content)

    return crx_filename


def crx_to_zip(crx_filename):
    """Strip the CRX header (everything before the first 'PK' zip magic) into a .zip."""
    i = 0
    zip_filename = crx_filename[:-4] + '.zip'

    # Scan byte-by-byte for the first "PK" pair; i ends up as its offset.
    with open(crx_filename, "rb") as f:
        byte1 = f.read(1)
        byte2 = f.read(1)
        while byte2 and not (byte1 == b'P' and byte2 == b'K'):
            byte1 = byte2
            byte2 = f.read(1)
            i += 1

    # Fix: seek to the zip payload instead of reading the whole file into
    # memory and slicing it (read()[i:] copied the file twice).
    with open(crx_filename, "rb") as in_file:
        with open(zip_filename, "wb") as out_file:
            in_file.seek(i)
            out_file.write(in_file.read())

    return zip_filename


def main():
    """CLI entry point: -i <file.txt|URL> [-k to keep the intermediate .crx]."""
    args = sys.argv

    if 3 <= len(args) <= 4 and args.count('-i') == 1 and args.index('-i') < len(args) - 1:
        pass
    elif len(args) == 2 and args.count('-h') == 1:
        print("Usage: python3 crxgetter.py -i file.txt/URL [-k to keep .crx file]")
        sys.exit()
    else:
        print("ERROR - INVALID USAGE")
        print("Usage: python3 crxgetter.py -i file.txt/URL [-k to keep .crx file]")
        sys.exit()

    crx_keep = args.count('-k') == 1

    extensions = dict(treat_input(args[args.index('-i') + 1]))
    for key in extensions:
        print(key, '->', extensions[key])
        crx_filename = get_crx(key, extensions[key])
        zip_filename = crx_to_zip(crx_filename)
        print("Created " + zip_filename)
        if crx_keep:
            print("Preserved " + crx_filename)
        else:
            os.remove(crx_filename)


if __name__ == "__main__":
    main()

# TO-DO
# SIMPLE GETOPTS ARG PARSER


# === pyscripts/nyu_wide_rsync.py (from diff: new file, mode 100755, index 0000000..54b6c39) ===
# Head of the file; the rest of it continues in the following text.
import subprocess
import shlex
import xml.etree.ElementTree as ET
import argparse
import unicodedata

SITES_DEFAULT = "~/.config/wide_rsync/transfer_sites.xml"
SITE_TYPES = ["transfer", "snapshot", "custom"]
LOCATION_TYPES = ["local", "external_drive", "ssh_server", "android_device"]
QUIET_LVL = 0  # runtime verbosity; reassigned from -q in main()
WARNING_COLOR = "\033[1;33m"
# (scrape cut mid-assignment: ERROR_COLOR's value continues in the following text)
= "\033[1;31m" +RESET_COLOR="\033[0m" + +class Site: + def __init__(self, name, source, destination, short_flags, long_flags, filters): + self.name = name + self.source = source + self.destination = destination + self.short_flags = short_flags + self.long_flags = long_flags + self.filters = filters + + def __str__(self): + pass + +class SnapshotSite(Site): + def __init__(self, name, source, destination, short_flags, long_flags, filters, snap_base, snap_scheme, snap_rotation): + super().__init__(self, name, source, destination, short_flags, long_flags, filters) + self.snap_base = snap_base + self.snap_scheme = snap_scheme + self.snap_rotation = snap_rotation + +class Location: + def __init__(self, path): + self.path = path + + def __str__(self): + pass + +class LocalLocation(Location): + def __init__(self, path): + super().__init__(self, path) + +class ExternalLocation(Location): + def __init__(self, path): + super().__init__(self, path) + +class SSHLocation(Location): + # ssh_username, ssh_port, ssh_privkey + def __init__(self, path): + super().__init__(self, path, username, port, privkey) + self.username = username + self.port = port + self.privkey = privkey + +class AndroidLocation(Location): + def __init__(self, path): + super().__init__(self, path) + + +def query_yes_no(question, default="yes"): + + valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} + if default is None: + prompt = " [y/n] " + elif default == "yes": + prompt = " [Y/n] " + elif default == "no": + prompt = " [y/N] " + else: + raise ValueError("invalid default answer: '%s'" % default) + + while True: + sys.stdout.write(question + prompt) + choice = input().lower() + if default is not None and choice == "": + return valid[default] + elif choice in valid: + return valid[choice] + else: + sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n") + + +def check_children(node, expected_children): + + count = 0 + expected_count = len(node.findall("./*")) + for child 
in expected_children: + count = count + len(node.findall(f"./{child}")) + + return ( count == expected_count ) + + +def validate_sites(sites): + + global QUIET_LVL + + errors = 0 + + # GO BACK TO THIS ONE + if sites is None: + print(f"{ERROR_COLOR}ERROR - Sites file is empty.{RESET_COLOR}\n") + errors = errors + 1 + + if len(sites.findall("./notification[@type='success']")) > 1 or len(sites.findall("./notification[@type='failure']")) > 1 or len(sites.findall("./notification")) > 2: + print(f"{ERROR_COLOR}ERROR - Too many notification scripts in sites file.{RESET_COLOR}\n") + errors = errors + 1 + + if len(sites.findall("./site")) < 1: + print(f"{ERROR_COLOR}ERROR - No sites configured in sites file.{RESET_COLOR}\n") + errors = errors + 1 + + if not check_children(sites, ['notification', 'site']): + print(f"{ERROR_COLOR}ERROR - Unknown elements found [NODE: sites].{RESET_COLOR}\n") + errors = errors + 1 + + ids = [] + for site in sites.findall('site'): + + if not site.get('id') or not site.get('name'): + print(f"{ERROR_COLOR}ERROR - Site {site} has no name or ID.{RESET_COLOR}\n") + errors = errors + 1 + + if site.get('id') in ids: + print(f"{ERROR_COLOR}ERROR - One or more sites with identical IDs exist.{RESET_COLOR}\n") + errors = errors + 1 + else: + ids.append(site.get('id')) + + if not check_children(site, ['source', 'destination', 'params', 'flags', 'filters']): + print(f"{ERROR_COLOR}ERROR - Unknown elements found [NODE: site {site.get('id')}].{RESET_COLOR}\n") + errors = errors + 1 + + if len(site.findall("./source")) != 1 or len(site.findall("./destination")) != 1 : + print(f"{ERROR_COLOR}ERROR - Site {site.get('id')} contains bad number of sources/destinations (must be exactly 1 of each){RESET_COLOR}.\n") + errors = errors + 1 + elif not site.find("./source").text or not site.find("./destination").text: + print(f"{ERROR_COLOR}ERROR - Site {site.get('id')} contains incomplete source/destination.{RESET_COLOR}\n") + errors = errors + 1 + elif 
site.find("./source").get('type') is None or site.find("./destination").get('type') is None: + print(f"{ERROR_COLOR}ERROR - Site {site.get('id')}'s source/destination does not have a type (see: [-t] for list of available types).{RESET_COLOR}\n") + errors = errors + 1 + elif site.find("./source").get('type') not in SITE_TYPES or site.find("./destination").get('type') not in SITE_TYPES: + print(f"{ERROR_COLOR}ERROR - Site {site.get('id')} contains unknown type of source/destination (see: [-t] for list of available types).{RESET_COLOR}\n") + errors = errors + 1 + + for entry in list(sites.iter()): + if entry.text is None: + if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - Empty entries exist in site file. May cause problems with program execution.{RESET_COLOR}\n") + break + + return errors + + +def get_sites_root(sites_location): + tree = ET.parse(sites_location) + sites = tree.getroot() + return sites + + +def get_site_params(site): + + site_params = {} + for param in site.find('params').findall('param'): + site_params[param.get('type')] = param.text + + return site_params + + +def get_site_flags(site): + + global QUIET_LVL + + arg = "" + site_flags = [] + + if len(list(site.iter('flags')) + list(site.iter('flag'))) == 0: + if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - No flags found for site {site.get('id')}. Skipping...{RESET_COLOR}\n") + return site_flags + + for flag in site.find('flags').findall('flag'): + arg = "" + if not flag.text: continue + if flag.get('is_long') == "true": + arg = "--" + flag.text + else: + arg = "-" + flag.text + site_flags.append(arg) + arg = "" + + return site_flags + + +def get_site_filters(site): + + global QUIET_LVL + + arg = "" + site_filters = [] + + if len(list(site.iter('filters')) + list(site.iter('filter'))) == 0: + if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - No filters found for site {site.get('id')}. 
Skipping...{RESET_COLOR}\n") + return site_filters + + for site_filter in site.find('filters').findall('filter'): + flag = "" + if (site_filter.get('type') == "include" or site_filter.get('type') == "exclude") and site_filter.text: + flag = "--" + site_filter.get('type') + site_filters.append(flag) + site_filters.append(site_filter.text) + + return site_filters + + +def site_exists(sites, site_id): + for site in sites.findall('site'): + if site.get('id') == site_id: + return True + return False + + +def site_is_available(sites, site_id): + + global QUIET_LVL + + available = True + exists = False + + for site in sites.findall('site'): + if site.get('id') == site_id: + exists = True + for location in [site.find('source'), site.find('destination')]: + match location.get('type'): + + case "local": + available = available and (True if os.path.exists(location.text) and os.path.isdir(location.text) else False) + + case "external_drive": + for i in range(len(location.text) + 1): + if os.path.ismount(location.text[:i]): + available = True + break + available = False + available = available and (True if os.path.exists(location.text) and os.path.isdir(location.text) else False) + + case "ssh_server": + + location_user = re.search(r"^.*?\@", location.text) + location_user = location_user.group(0)[:-1] if location_user != None else "" + + location_domain = re.search(r"^.*\:", location.text) + if location_domain != None: + location_domain = re.sub(r"^.*\@", "" , location_domain.group(0)[:-1]) + else: + if not QUIET_LVL > 1: print(f"ERROR: IP Address or Domain Name not detected for site {site.get('name')}.\n") + return False + + location_directory = re.search(r"\:(\/|\~\/)?([a-zA-Z0-9_\- ]+\/?)+|\:\/|\:\~\/?" 
, location.text) + if location_directory != None: + location_directory = location_directory.group(0)[1:] + else: + if not QUIET_LVL > 1: print(f"ERROR: Remote location not detected for site {site.get('name')}.\n") + return False + + # location_directory_full = (location_user + "@" if location_user != "" else "") + location_domain + ":" + location_directory + location_domain_full = (location_user + "@" if location_user != "" else "") + location_domain + + params = get_site_params(site) + + ssh_port = params['ssh_port'] + ssh_key_location = params['ssh_key_location'] + + writeable = 1 + if subprocess.run(["ping", "-c", "1", "-w", "5", f"{location_domain}"], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT).returncode == 0: + writeable = subprocess.run(["ssh", "-p", f"{ssh_port}", "-i", f"{ssh_key_location}" , f"{location_domain_full}", f"[[ -d {location_directory} ]]"], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) + available = available and (True if (writeable != 1 and writeable.returncode == 0) else False) + + case "android_device": + + # everything here except os.path.exists() horrible and redundant, but I had to do it + + uid = os.getuid() + mtp_location = re.sub(r"\/run\/user\/{}\/gvfs\/mtp\:host\=".format(uid), "mtp://", location.text) + + try: + gio1 = subprocess.check_output(["gio", "info", f"{location.text}"], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + gio1 = str(e.output) + try: + gio2 = subprocess.check_output(["gio", "info", f"{mtp_location}"], stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + gio2 = str(e.output) + + available = available and (True if os.path.exists(location.text) and gio1 == gio2 else False) + + case "snapshot": + available = False + + case _: + if not QUIET_LVL > 1: print("ERROR: Location type doesn't exist\n") + return False + break + + return ( available and exists ) + + +def get_sites(sites, source_type=[], destination_type=[], all_flag=False): + + site_infos = [] + + 
for site in sites.findall('site'): + site_info = [site.get('id'), site.get('name'), site.find('source').get('type'), site.find('destination').get('type'), site.find('source').text, site.find('destination').text] + if (source_type is None or site.find('source').get('type') in source_type) and (destination_type is None or site.find('destination').get('type') in destination_type): + if all_flag: + site_infos.append(site_info) + elif site_is_available(sites, site.get('id')): + site_infos.append(site_info) + + return site_infos + + +def compile_rsync_command(sites, site_id, DRY_RUN=False): + + global QUIET_LVL + + command_subproc = [["rsync"]] + command_postproc = [[]] + + try: + subprocess.run(["rsync", "-h"], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) + except OSError as e: + print("ERROR - Rsync not found.\n", e, "\n") + sys.exit(0) + + + if not site_is_available(sites, site_id): + if not QUIET_LVL > 1: print(f"{ERROR_COLOR}ERROR - Site {site_id} not available.{RESET_COLOR}\n") + return 1 + + # FIND SITE + for site in sites.findall('site'): + if site.get('id') == site_id: + + site_params = get_site_params(site) + site_source = site.find('source').text + site_destination = site.find('destination').text + + # CONSTRUCT SSH-SPECIFIC FLAG + if site.find('source').get('type') == "ssh_server" or site.find('destination').get('type') == "ssh_server": + if site_params["ssh_port"] != None and site_params["ssh_key_location"] != None: + e = "ssh -p " + site_params["ssh_port"] + " -i " + site_params["ssh_key_location"] + command_subproc[0].extend(["-e", e ]) + else: + if not QUIET_LVL > 1: print(f"{ERROR_COLOR}ERROR - Unable to compile SSH command for site {site_id} - Missing arguments.{RESET_COLOR}\n") + return 1 + + # CONSTRUCT SNAPSHOT-SPECIFIC FLAG AND POST-PROCESSING + if site.find('destination').get('snapshot') == "true": + if "/" not in site_params['snap_base']: + + snap_base = site_params['snap_base'] if site_params['snap_base'] else "default." 
+ snap_extension = "" + + if site_params['snap_extension'].lower() == "date": + snap_extension = subprocess.check_output(['date', '+%Y-%m-%d@%T'], encoding='UTF-8') + else: + temp_proc = subprocess.Popen(['find', site_destination, '-mindepth', '1', '-maxdepth', '1', '!', '-type', 'l', '-iname', f"{site_params['snap_base']}*"], stdout=subprocess.PIPE) + snap_extension = subprocess.check_output(['wc', '-l'], stdin=temp_proc.stdout, encoding='UTF-8') + temp_proc.wait() + + snap_name = snap_base + snap_extension + snap_name = snap_name[:-1] + site_destination_last = site_destination + ( "/last" if site_destination[-1] != "/" else "last" ) + command_subproc[0].extend(["--link-dest", site_destination_last ]) + site_destination = site_destination + ( f"/{snap_name}" if site_destination[-1] != "/" else f"{snap_name}" ) + + command_postproc = [['rm', '-f', site_destination_last], ['ln', '-s', site_destination, site_destination_last]] + + else: + if not QUIET_LVL > 1: print(f"{ERROR_COLOR}ERROR - Unable to compile snapshot command for site {site_id} - Trailing slash in base name.{RESET_COLOR}\n") + return 1 + + + # HANDLE FLAGS + command_subproc[0].extend(get_site_flags(site)) + if DRY_RUN == True: command_subproc[0].append("--dry-run") + + # HANDLE FILTERS + command_subproc[0].extend(get_site_filters(site)) + + + # HANDLE DESTINATION DIRECTORY + command_subproc[0].append(site_destination) + + # HANDLE POST-PROCESSING + if DRY_RUN == False and command_postproc != [[]]: command_subproc.extend(command_postproc) + + break + + if command_subproc != ["rsync"]: + return command_subproc + else: + return 1 + +def run_notification_script(sites, site_id_list, return_code): + + global QUIET_LVL + + script_type = "failure" if return_code > 0 else "success" + script_command = "" + + for script in sites.findall('notification'): + if script.get('type') == script_type: + script_command = re.sub("%ID", f"{site_id_list}" , script.text) + + if script_command: + 
subprocess.run(shlex.split(script_command)) + else: + if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - No valid notification script found. Skipping...{RESET_COLOR}\n") + + +def print_site_list(site_list, list_all): + + univ_l= 5 + ls = [0] * 6 + kanjicount = [] + title = ["SITE ID" , "SITE NAME", "TRANSFER TYPE", "TRANSFER SOURCE", "TRANSFER DESTINATION"] + + for site in site_list: + for i in range(len(ls)): + ls[i] = ls[i] if ls[i] > len(site[i]) else len(site[i]) + kanjicount.append([0] * 6) + + for site in site_list: + for i in range(len(ls)): + for k in site[i]: + if unicodedata.east_asian_width(k) == "W": + kanjicount[site_list.index(site)][i] = kanjicount[site_list.index(site)][i] - 1 + + + ls_title = [0] * 6 + bar = "" + formatstr= "" + for i in range(len(ls_title)): + ls_title[i] = ls[i] + univ_l + ls_title = [ls_title[0], ls_title[1], ls_title[2]+ls_title[3], ls_title[4], ls_title[5]] + for i in range(len(ls_title)): + formatstr = formatstr + "{:<" + str(ls_title[i]) + "}" + bar = bar + ("-" * (ls_title[i] - univ_l)) + (" " * univ_l) + + print("LISTING ALL", "CONFIGURED" if list_all else "AVAILABLE", "TRANSFER SITES\n") + print(formatstr.format(*title)) + print(bar) + # BAR SLIGHTLY OFFSET, FIGURE OUT WHY + + for site in site_list: + formatstr="" + ls_site = [0] * 6 + for i in range(len(ls_site)): + ls_site[i] = ls[i] + kanjicount[site_list.index(site)][i] + univ_l + ls_site = [ls_site[0], ls_site[1], ls_site[2]+ls_site[3], ls_site[4], ls_site[5]] + for i in range(len(ls_site)): + ls_site[i] = ls_site[i] if ls_site[i] > len(title[i]) else ( len(title[i]) + univ_l ) + for i in range(len(ls_site)): + formatstr = formatstr + "{:<" + str(ls_site[i]) + "}" + print(formatstr.format(site[0], site[1], f"{site[2]}->{site[3]}", site[4], site[5])) + + print() + +def main(): + + # IT'S PARSIN' TIME + parser = argparse.ArgumentParser(description="Perform rsync operations using a pre-configured list of transfer sites") + group = 
parser.add_mutually_exclusive_group() + group.add_argument("-l", "--list", help="list all available transfer sites", action="store_true") + group.add_argument("-L", "--list-all", help="list all configured transfer sites (including unavailable ones)", action="store_true") + group.add_argument("-t", "--list-types", help="list all known transfer site types (e.g. 'local', 'ssh_server'...)", action="store_true") + parser.add_argument("--source", help="filter list of transfer sites by source type (used with [-l|-L], view available types with [-t])", action="extend", nargs="+", type=str) + parser.add_argument("--destination", help="filter list of transfer sites by destination type (used with [-l|-L], view available types with [-t])", action="extend", nargs="+", type=str) + group.add_argument("-s", "--sites", help="select transfer sites to process", action="extend", nargs="+", type=str) + parser.add_argument("-n", "--dry-run", help="turn all queued site transfers into dry runs (see: rsync(1) [-n|--dry-run])", action="store_true") + parser.add_argument("-q", "--quiet-level", help="set quietness level of site transfers ([] - print errors and warnings, [-q] - print errors only, [-qq...] - print critical errors only, default: [])", action="count", default=0) + parser.add_argument("-p", "--prompt-frequency", help="set prompt frequency of site transfers ([] - don't prompt, [-p] - prompt only once, [-pp...] 
- prompt once for each rsync command, default: [])", action="count", default=0) + parser.add_argument("--notify-each", help="run configured notification script once after each site transfer (default: runs once after all site transfers are done)", action="store_true") + parser.add_argument("-i", "--input-file", help=f"set custom file location to process transfer sites from (default: {SITES_DEFAULT})", action="store", nargs=1, type=str) + + print() + + if len(sys.argv) == 1: + parser.print_usage() + print() + sys.exit(1) + + args = parser.parse_args() + + # QUIET LEVEL AND PROMPT FREQUENCY + global QUIET_LVL + QUIET_LVL = args.quiet_level + prompt_frequency = args.prompt_frequency + + # LIST TYPES + if args.list_types == True: + response = "Currently implemented types of transfer sites are: " + for site_type in SITE_TYPES: + response = response + site_type + ", " + print(response[:-2], "\n") + sys.exit(0) + + # FIND INPUT FILE + if args.input_file != None and os.path.isfile(os.path.abspath(os.path.expanduser(args.input_file))): + if not QUIET_LVL > 1: print ("Using custom site location:", os.path.abspath(os.path.expanduser(args.input_file)), "\n") + sites = get_sites_root(os.path.abspath(os.path.expanduser(args.input_file))) + elif os.path.isfile(os.path.abspath(os.path.expanduser(SITES_DEFAULT))): + if not QUIET_LVL > 1: print ("Using default site location:", os.path.abspath(os.path.expanduser(SITES_DEFAULT)), "\n") + sites = get_sites_root(os.path.abspath(os.path.expanduser(SITES_DEFAULT))) + else: + print ("ERROR: No site locations found. 
Exiting...\n") + sys.exit(1) + + # VALIDATE INPUT FILE + errors = validate_sites(sites) + if errors > 0: + print("Number of errors:", errors, "\nExiting...\n") + sys.exit(1) + + # CONFIGURE FILTER + if (args.source != None or args.destination != None) and args.list != True and args.list_all != True: + print("ERROR: --source and --destination can only be used on a list (see: [-l|-L])\n") + sys.exit(1) + + # LIST SITES + if args.list == True or args.list_all == True: + site_list = get_sites(sites, args.source, args.destination, args.list_all) + print_site_list(site_list, args.list_all) + sys.exit(0) + + # WHERE THE MAGIC HAPPENS + elif args.sites != None: + site_id_list = "" + for site_id in args.sites: + site_id_list = site_id_list + site_id + ", " + if not site_exists(sites, site_id): + print("ERROR: One or more supplied sites do not exist. Exiting...\n") + sys.exit(1) + site_id_list = site_id_list[:-2] + + commands_list = [] + all_skipped = True + return_code = 0 + final_return_code = 0 + + for site_id in args.sites: + commands_list.append(compile_rsync_command(sites, site_id, args.dry_run)) + + query_1 = "Do you wish to run the rsync commands for sites " + site_id_list + (" (WARNING: ERROR IN ONE OR MORE SITES DETECTED)?" if 1 in commands_list else "?") + if prompt_frequency != 1 or (prompt_frequency == 1 and query_yes_no(query_1)): + for commands in commands_list: + final_return_code = 0 + curr_site_id = site_id_list.split(", ")[commands_list.index(commands)] + query_2 = "Do you wish to run the rsync command for site " + curr_site_id + (" (WARNING: ERROR IN SITE DETECTED)?" 
if commands == 1 else "?") + if prompt_frequency != 2 or (prompt_frequency == 2 and query_yes_no(query_2)): + all_skipped = False + for command in commands: + return_code = subprocess.run(command).returncode if command != 1 else command + final_return_code = final_return_code + return_code + if return_code != 0: break + if args.notify_each: run_notification_script(sites, curr_site_id, final_return_code) + print() + else: + if not QUIET_LVL > 1: print("Skipping command...\n") + else: print() + if not (args.notify_each or all_skipped): run_notification_script(sites, site_id_list, final_return_code) + else: + if not QUIET_LVL > 1: print("Skipping all commands...\n") + else: print() + + # END OF MAIN + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print("Keyboard interrupted received. Exiting...\n") + try: + sys.exit(0) + except SystemExit: + os._exit(0) + +# TO-DO +# REVERSE FLAG +# FLAG FOR SHOWING COMMANDS TO RUN DURING PROMPT, ALSO FLAG FOR NOT RUNNING NOTIFICATION SCRIPT +# FINISH IMPLEMENTING snap_count +# FIX FORMATTING, DISPLAY x->y(snapshot) TYPE FOR SNAPSHOTS +# THINK OF OTHER WAYS TO STAMP SNAPS +# ONE DAY, CLEAN UP COMPILE_RSYNC FUNCTION INTO SUBROUTINES AND PROPERLY USE AN ARRAY FROM START TO FINISH diff --git a/pyscripts/pixiv_auth.py b/pyscripts/pixiv_auth.py new file mode 100755 index 0000000..f7dc5d1 --- /dev/null +++ b/pyscripts/pixiv_auth.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python + +from argparse import ArgumentParser +from base64 import urlsafe_b64encode +from hashlib import sha256 +from pprint import pprint +from secrets import token_urlsafe +from sys import exit +from urllib.parse import urlencode +from webbrowser import open as open_url + +import requests + +# Latest app version can be found using GET /v1/application-info/android +USER_AGENT = "PixivAndroidApp/5.0.234 (Android 11; Pixel 5)" +REDIRECT_URI = "https://app-api.pixiv.net/web/v1/users/auth/pixiv/callback" +LOGIN_URL = 
"https://app-api.pixiv.net/web/v1/login" +AUTH_TOKEN_URL = "https://oauth.secure.pixiv.net/auth/token" +CLIENT_ID = "MOBrBDS8blbauoSck0ZfDbtuzpyT" +CLIENT_SECRET = "lsACyCD94FhDUtGTXi3QzcFE2uU1hqtDaKeqrdwj" + + +def s256(data): + """S256 transformation method.""" + + return urlsafe_b64encode(sha256(data).digest()).rstrip(b"=").decode("ascii") + + +def oauth_pkce(transform): + """Proof Key for Code Exchange by OAuth Public Clients (RFC7636).""" + + code_verifier = token_urlsafe(32) + code_challenge = transform(code_verifier.encode("ascii")) + + return code_verifier, code_challenge + + +def print_auth_token_response(response): + data = response.json() + + try: + access_token = data["access_token"] + refresh_token = data["refresh_token"] + except KeyError: + print("error:") + pprint(data) + exit(1) + + print("access_token:", access_token) + print("refresh_token:", refresh_token) + print("expires_in:", data.get("expires_in", 0)) + + +def login(): + code_verifier, code_challenge = oauth_pkce(s256) + login_params = { + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "client": "pixiv-android", + } + + open_url(f"{LOGIN_URL}?{urlencode(login_params)}") + + try: + code = input("code: ").strip() + except (EOFError, KeyboardInterrupt): + return + + response = requests.post( + AUTH_TOKEN_URL, + data={ + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "code": code, + "code_verifier": code_verifier, + "grant_type": "authorization_code", + "include_policy": "true", + "redirect_uri": REDIRECT_URI, + }, + headers={"User-Agent": USER_AGENT}, + ) + + print_auth_token_response(response) + + +def refresh(refresh_token): + response = requests.post( + AUTH_TOKEN_URL, + data={ + "client_id": CLIENT_ID, + "client_secret": CLIENT_SECRET, + "grant_type": "refresh_token", + "include_policy": "true", + "refresh_token": refresh_token, + }, + headers={"User-Agent": USER_AGENT}, + ) + print_auth_token_response(response) + + +def main(): + parser = 
ArgumentParser()
    subparsers = parser.add_subparsers()
    # Default action (no subcommand): just show usage.
    parser.set_defaults(func=lambda _: parser.print_usage())
    login_parser = subparsers.add_parser("login")
    login_parser.set_defaults(func=lambda _: login())
    refresh_parser = subparsers.add_parser("refresh")
    refresh_parser.add_argument("refresh_token")
    refresh_parser.set_defaults(func=lambda ns: refresh(ns.refresh_token))
    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()
diff --git a/pyscripts/wide_rsync.py b/pyscripts/wide_rsync.py
new file mode 100755
index 0000000..dc0118f
--- /dev/null
+++ b/pyscripts/wide_rsync.py
@@ -0,0 +1,563 @@
#!/usr/bin/python3
# wide_rsync.py - run rsync transfers between pre-configured "sites" described
# in an XML file (default: ~/.config/wide_rsync/transfer_sites.xml).
# Supports local dirs, external drives, SSH remotes, MTP (Android) devices and
# hard-link snapshot destinations, plus optional notification scripts.

import re, os, sys, subprocess, shlex
import xml.etree.ElementTree as ET
import argparse, unicodedata

# Default sites-file location (tilde expanded at use sites, not here).
SITES_DEFAULT="~/.config/wide_rsync/transfer_sites.xml"
# Known values for the source/destination "type" attribute in the XML.
SITE_TYPES=["local", "external_drive", "remote_server", "android_device", "snapshot", "custom"]
# 0 = warnings+errors, 1 = errors only, 2+ = critical only (set from -q count).
QUIET_LVL=0
# ANSI escape sequences for colored terminal output.
WARNING_COLOR="\033[1;33m"
ERROR_COLOR = "\033[1;31m"
RESET_COLOR="\033[0m"


def query_yes_no(question, default="yes"):
    """Prompt `question` on stdout and read a yes/no answer from stdin.

    `default` ("yes", "no" or None) is returned when the user just hits
    Enter; None forces an explicit answer. Returns True for yes, False for
    no. Raises ValueError for an unrecognized `default`.
    """

    valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default)

    # Loop until a recognized answer (or empty input with a default) arrives.
    while True:
        sys.stdout.write(question + prompt)
        choice = input().lower()
        if default is not None and choice == "":
            return valid[default]
        elif choice in valid:
            return valid[choice]
        else:
            sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")


def check_children(node, expected_children):
    """Return True if every direct child of `node` has a tag listed in
    `expected_children` (checked by comparing counts, not identities)."""

    count = 0
    expected_count = len(node.findall("./*"))
    for child in expected_children:
        count = count + len(node.findall(f"./{child}"))

    return ( count == expected_count )


def validate_sites(sites):
    """Sanity-check the parsed sites XML root and return the number of
    structural errors found (0 means the file looks usable).

    Prints one colored ERROR line per problem; also prints a WARNING (unless
    quieted) when empty text entries exist anywhere in the tree.
    """

    global QUIET_LVL

    errors = 0

    # GO BACK TO THIS ONE
    # NOTE(review): if `sites` is None this records the error but execution
    # continues into `sites.findall(...)` below, which would raise
    # AttributeError — presumably an early return is intended here; confirm.
    if sites is None:
        print(f"{ERROR_COLOR}ERROR - Sites file is empty.{RESET_COLOR}\n")
        errors = errors + 1

    # At most one success and one failure notification script are allowed.
    if len(sites.findall("./notification[@type='success']")) > 1 or len(sites.findall("./notification[@type='failure']")) > 1 or len(sites.findall("./notification")) > 2:
        print(f"{ERROR_COLOR}ERROR - Too many notification scripts in sites file.{RESET_COLOR}\n")
        errors = errors + 1

    if len(sites.findall("./site")) < 1:
        print(f"{ERROR_COLOR}ERROR - No sites configured in sites file.{RESET_COLOR}\n")
        errors = errors + 1

    # Root may only contain <notification> and <site> children.
    if not check_children(sites, ['notification', 'site']):
        print(f"{ERROR_COLOR}ERROR - Unknown elements found [NODE: sites].{RESET_COLOR}\n")
        errors = errors + 1

    ids = []
    for site in sites.findall('site'):

        if not site.get('id') or not site.get('name'):
            print(f"{ERROR_COLOR}ERROR - Site {site} has no name or ID.{RESET_COLOR}\n")
            errors = errors + 1

        # Site IDs must be unique; `ids` accumulates the ones seen so far.
        if site.get('id') in ids:
            print(f"{ERROR_COLOR}ERROR - One or more sites with identical IDs exist.{RESET_COLOR}\n")
            errors = errors + 1
        else:
            ids.append(site.get('id'))

        if not check_children(site, ['source', 'destination', 'params', 'flags', 'filters']):
            print(f"{ERROR_COLOR}ERROR - Unknown elements found [NODE: site {site.get('id')}].{RESET_COLOR}\n")
            errors = errors + 1

        # Exactly one <source> and one <destination>, both non-empty and with
        # a known "type" attribute.
        if len(site.findall("./source")) != 1 or len(site.findall("./destination")) != 1 :
            print(f"{ERROR_COLOR}ERROR - Site {site.get('id')} contains bad number of sources/destinations (must be exactly 1 of each){RESET_COLOR}.\n")
            errors = errors + 1
        elif not site.find("./source").text or not site.find("./destination").text:
            print(f"{ERROR_COLOR}ERROR - Site {site.get('id')} contains incomplete source/destination.{RESET_COLOR}\n")
            errors = errors + 1
        elif site.find("./source").get('type') is None or site.find("./destination").get('type') is None:
            print(f"{ERROR_COLOR}ERROR - Site {site.get('id')}'s source/destination does not have a type (see: [-t] for list of available types).{RESET_COLOR}\n")
            errors = errors + 1
        elif site.find("./source").get('type') not in SITE_TYPES or site.find("./destination").get('type') not in SITE_TYPES:
            print(f"{ERROR_COLOR}ERROR - Site {site.get('id')} contains unknown type of source/destination (see: [-t] for list of available types).{RESET_COLOR}\n")
            errors = errors + 1

    # Empty text anywhere in the tree is only a warning, reported once.
    for entry in list(sites.iter()):
        if entry.text is None:
            if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - Empty entries exist in site file. May cause problems with program execution.{RESET_COLOR}\n")
            break

    return errors


def get_sites_root(sites_location):
    """Parse the XML file at `sites_location` and return its root element."""
    tree = ET.parse(sites_location)
    sites = tree.getroot()
    return sites


def get_site_params(site):
    """Return {param-type: text} for every <param> under the site's <params>."""

    site_params = {}
    for param in site.find('params').findall('param'):
        site_params[param.get('type')] = param.text

    return site_params


def get_site_flags(site):
    """Build the list of rsync flag strings for `site`.

    Each <flag> becomes "-X" or, when is_long="true", "--X". Returns [] (with
    a warning unless quieted) when the site has no flags at all.
    """

    global QUIET_LVL

    arg = ""
    site_flags = []

    if len(list(site.iter('flags')) + list(site.iter('flag'))) == 0:
        if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - No flags found for site {site.get('id')}. Skipping...{RESET_COLOR}\n")
        return site_flags

    for flag in site.find('flags').findall('flag'):
        arg = ""
        if not flag.text: continue
        if flag.get('is_long') == "true":
            arg = "--" + flag.text
        else:
            arg = "-" + flag.text
        site_flags.append(arg)
        arg = ""

    return site_flags


def get_site_filters(site):
    """Build the rsync filter argument list for `site`.

    Each <filter type="include|exclude"> contributes a pair
    ["--include"/"--exclude", pattern]. Returns [] (with a warning unless
    quieted) when the site has no filters at all.
    """

    global QUIET_LVL

    arg = ""
    site_filters = []

    if len(list(site.iter('filters')) + list(site.iter('filter'))) == 0:
        if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - No filters found for site {site.get('id')}. Skipping...{RESET_COLOR}\n")
        return site_filters

    for site_filter in site.find('filters').findall('filter'):
        flag = ""
        if (site_filter.get('type') == "include" or site_filter.get('type') == "exclude") and site_filter.text:
            flag = "--" + site_filter.get('type')
            site_filters.append(flag)
            site_filters.append(site_filter.text)

    return site_filters


def site_exists(sites, site_id):
    """Return True if a <site> with id == `site_id` exists under `sites`."""
    for site in sites.findall('site'):
        if site.get('id') == site_id:
            return True
    return False


def site_is_available(sites, site_id):
    """Check whether both endpoints of site `site_id` are currently reachable.

    Per location type: "local" checks the directory exists; "external_drive"
    additionally requires some path prefix to be a mountpoint;
    "remote_server" pings the host then probes the remote directory over ssh
    (using the site's ssh_port / ssh_key_location params); "android_device"
    compares `gio info` output for the gvfs path and its mtp:// equivalent;
    "snapshot" is never available as a live endpoint. Returns True only if
    the site exists AND every check passed.
    """

    global QUIET_LVL

    available = True
    exists = False

    for site in sites.findall('site'):
        if site.get('id') == site_id:
            exists = True
            for location in [site.find('source'), site.find('destination')]:
                match location.get('type'):

                    case "local":
                        location_full=os.path.abspath(os.path.expanduser(location.text))
                        available = available and (True if os.path.exists(location_full) and os.path.isdir(location_full) else False)

                    case "external_drive":
                        location_full=os.path.abspath(os.path.expanduser(location.text))
                        # Walk every prefix of the path looking for a mountpoint.
                        # NOTE(review): indentation reconstructed from a mangled
                        # paste — the trailing `available = False` appears to sit
                        # inside the loop, which also discards the accumulated
                        # availability of the other endpoint; confirm against the
                        # original file.
                        for i in range(len(location_full) + 1):
                            if os.path.ismount(location_full[:i]):
                                available = True
                                break
                            available = False
                        available = available and (True if os.path.exists(location_full) and os.path.isdir(location_full) else False)

                    case "remote_server":

                        # Split "user@host:dir" into its parts with regexes.
                        location_user = re.search(r"^.*?\@", location.text)
                        location_user = location_user.group(0)[:-1] if location_user != None else ""

                        location_domain = re.search(r"^.*\:", location.text)
                        if location_domain != None:
                            location_domain = re.sub(r"^.*\@", "" , location_domain.group(0)[:-1])
                        else:
                            if not QUIET_LVL > 1: print(f"ERROR: IP Address or Domain Name not detected for site {site.get('name')}.\n")
                            return False

                        location_directory = re.search(r"\:(\/|\~\/)?([a-zA-Z0-9_\- ]+\/?)+|\:\/|\:\~\/?" , location.text)
                        if location_directory != None:
                            location_directory = location_directory.group(0)[1:]
                        else:
                            if not QUIET_LVL > 1: print(f"ERROR: Remote location not detected for site {site.get('name')}.\n")
                            return False

                        # location_directory_full = (location_user + "@" if location_user != "" else "") + location_domain + ":" + location_directory
                        location_domain_full = (location_user + "@" if location_user != "" else "") + location_domain

                        params = get_site_params(site)

                        ssh_port = params['ssh_port']
                        ssh_key_location = params['ssh_key_location']

                        # `writeable` stays 1 (sentinel) when the ping fails;
                        # otherwise it becomes the CompletedProcess of the ssh
                        # directory test.
                        writeable = 1
                        if subprocess.run(["ping", "-c", "1", "-w", "5", f"{location_domain}"], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT).returncode == 0:
                            writeable = subprocess.run(["ssh", "-p", f"{ssh_port}", "-i", f"{ssh_key_location}" , f"{location_domain_full}", f"[[ -d {location_directory} ]]"], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
                        available = available and (True if (writeable != 1 and writeable.returncode == 0) else False)

                    case "android_device":

                        # everything here except os.path.exists() horrible and redundant, but I had to do it

                        # Map the gvfs mount path back to an mtp:// URI and make
                        # sure `gio info` agrees about both forms.
                        uid = os.getuid()
                        mtp_location = re.sub(r"\/run\/user\/{}\/gvfs\/mtp\:host\=".format(uid), "mtp://", location.text)

                        try:
                            gio1 = subprocess.check_output(["gio", "info", f"{location.text}"], stderr=subprocess.STDOUT)
                        except subprocess.CalledProcessError as e:
                            gio1 = str(e.output)
                        try:
                            gio2 = subprocess.check_output(["gio", "info", f"{mtp_location}"], stderr=subprocess.STDOUT)
                        except subprocess.CalledProcessError as e:
                            gio2 = str(e.output)

                        available = available and (True if os.path.exists(location.text) and gio1 == gio2 else False)

                    case "snapshot":
                        available = False

                    case _:
                        if not QUIET_LVL > 1: print("ERROR: Location type doesn't exist\n")
                        return False
                        # NOTE(review): this `break` is unreachable after the
                        # `return` above.
                        break

    return ( available and exists )


def get_sites(sites, source_type=[], destination_type=[], all_flag=False):
    """Collect [id, name, src-type, dst-type, src, dst] info rows for sites
    matching the optional `source_type`/`destination_type` filters.

    When `all_flag` is False, unavailable sites are skipped (each one probed
    via site_is_available). NOTE(review): mutable [] defaults are shared
    across calls — harmless here since they are never mutated, but None would
    be the safer default.
    """

    site_infos = []

    for site in sites.findall('site'):
        site_info = [site.get('id'), site.get('name'), site.find('source').get('type'), site.find('destination').get('type'), site.find('source').text, site.find('destination').text]
        if (source_type is None or site.find('source').get('type') in source_type) and (destination_type is None or site.find('destination').get('type') in destination_type):
            if all_flag:
                site_infos.append(site_info)
            elif site_is_available(sites, site.get('id')):
                site_infos.append(site_info)

    return site_infos


def compile_rsync_command(sites, site_id, DRY_RUN=False):
    """Assemble the argv lists needed to transfer site `site_id`.

    Returns a list of commands: the rsync invocation first, optionally
    followed by post-processing commands (snapshot `rm`/`ln -s` of the
    "last" symlink). Returns the int 1 on failure (site unavailable, missing
    ssh params, bad snapshot base name). Exits the program if rsync itself
    is not installed.
    """

    global QUIET_LVL

    # command_subproc[0] is the rsync argv; extra argvs may be appended for
    # snapshot post-processing.
    command_subproc = [["rsync"]]
    command_postproc = [[]]

    # Probe that rsync exists at all before doing any work.
    try:
        subprocess.run(["rsync", "-h"], stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    except OSError as e:
        print("ERROR - Rsync not found.\n", e, "\n")
        sys.exit(0)


    if not site_is_available(sites, site_id):
        if not QUIET_LVL > 1: print(f"{ERROR_COLOR}ERROR - Site {site_id} not available.{RESET_COLOR}\n")
        return 1

    # FIND SITE
    for site in sites.findall('site'):
        if site.get('id') == site_id:

            site_params = get_site_params(site)
            site_source = site.find('source').text
            site_destination = site.find('destination').text

            if site.find('source').get('type') == "local":
                site_source = os.path.abspath(os.path.expanduser(site_source))
            if site.find('destination').get('type') == "local":
                site_destination = os.path.abspath(os.path.expanduser(site_destination))

            # CONSTRUCT SSH-SPECIFIC FLAG
            if site.find('source').get('type') == "remote_server" or site.find('destination').get('type') == "remote_server":
                if site_params["ssh_port"] != None and site_params["ssh_key_location"] != None:
                    e = "ssh -p " + site_params["ssh_port"] + " -i " + os.path.abspath(os.path.expanduser(site_params["ssh_key_location"]))
                    command_subproc[0].extend(["-e", e ])
                else:
                    if not QUIET_LVL > 1: print(f"{ERROR_COLOR}ERROR - Unable to compile SSH command for site {site_id} - Missing arguments.{RESET_COLOR}\n")
                    return 1

            # CONSTRUCT SNAPSHOT-SPECIFIC FLAG AND POST-PROCESSING
            if site.find('destination').get('snapshot') == "true":
                if "/" not in site_params['snap_base']:

                    snap_base = site_params['snap_base'] if site_params['snap_base'] else "default."
                    snap_extension = ""

                    # Snapshot dirs are named <base><stamp>: stamp is either a
                    # date or the count of existing non-symlink snapshots.
                    if site_params['snap_extension'].lower() == "date":
                        snap_extension = subprocess.check_output(['date', '+%Y-%m-%d@%T'], encoding='UTF-8')
                    else:
                        temp_proc = subprocess.Popen(['find', site_destination, '-mindepth', '1', '-maxdepth', '1', '!', '-type', 'l', '-iname', f"{site_params['snap_base']}*"], stdout=subprocess.PIPE)
                        snap_extension = subprocess.check_output(['wc', '-l'], stdin=temp_proc.stdout, encoding='UTF-8')
                        temp_proc.wait()

                    snap_name = snap_base + snap_extension
                    # Strip the trailing newline from date/wc output.
                    snap_name = snap_name[:-1]
                    # "last" symlink points at the previous snapshot and feeds
                    # rsync --link-dest for hard-link deduplication.
                    site_destination_last = site_destination + ( "/last" if site_destination[-1] != "/" else "last" )
                    command_subproc[0].extend(["--link-dest", site_destination_last ])
                    site_destination = site_destination + ( f"/{snap_name}" if site_destination[-1] != "/" else f"{snap_name}" )

                    command_postproc = [['rm', '-f', site_destination_last], ['ln', '-s', site_destination, site_destination_last]]

                else:
                    if not QUIET_LVL > 1: print(f"{ERROR_COLOR}ERROR - Unable to compile snapshot command for site {site_id} - Trailing slash in base name.{RESET_COLOR}\n")
                    return 1


            # HANDLE FLAGS
            command_subproc[0].extend(get_site_flags(site))
            if DRY_RUN == True: command_subproc[0].append("--dry-run")

            # HANDLE FILTERS
            command_subproc[0].extend(get_site_filters(site))

            # HANDLE SOURCE DIRECTORY
            # preserve_dir="true" keeps the source dir itself (no trailing
            # slash); otherwise only its contents are synced (trailing slash).
            arg=""
            if site.find('source').get('preserve_dir') == "true":
                arg = (site_source if site_source[-1] != "/" else site_source[:-1])
            else:
                arg = (site_source if site_source[-1] == "/" else site_source + "/")
            command_subproc[0].append(arg)

            # HANDLE DESTINATION DIRECTORY
            command_subproc[0].append(site_destination)

            # HANDLE POST-PROCESSING
            if DRY_RUN == False and command_postproc != [[]]: command_subproc.extend(command_postproc)

            break

    # NOTE(review): command_subproc is a list of lists, so it can never equal
    # ["rsync"]; an unknown site_id therefore returns [["rsync"]] instead of 1.
    # The intended comparison is presumably `!= [["rsync"]]` — confirm.
    if command_subproc != ["rsync"]:
        return command_subproc
    else:
        return 1

def run_notification_script(sites, site_id_list, return_code):
    """Run the configured success/failure <notification> script, substituting
    %ID with `site_id_list` and ~ with $HOME. Warns (unless quieted) when no
    matching script is configured."""

    global QUIET_LVL

    script_type = "failure" if return_code > 0 else "success"
    script_command = ""

    for script in sites.findall('notification'):
        if script.get('type') == script_type:
            script_command = re.sub("%ID", f"{site_id_list}" , script.text)
            # NOTE(review): this replaces EVERY "~" in the command line, not
            # just path-leading ones — confirm that is acceptable.
            script_command = re.sub("~", os.getenv('HOME') , script_command)

    if script_command:
        subprocess.run(shlex.split(script_command))
    else:
        if not QUIET_LVL > 0: print(f"{WARNING_COLOR}WARNING - No valid notification script found. Skipping...{RESET_COLOR}\n")


def print_site_list(site_list, list_all):
    """Pretty-print `site_list` rows (from get_sites) as an aligned table.

    Column widths are computed from the longest cell, then corrected for
    East-Asian full-width characters (which occupy two terminal cells) via
    the per-row `kanjicount` offsets.
    """

    univ_l= 5
    # ls[i] = widest cell seen in column i.
    ls = [0] * 6
    kanjicount = []
    title = ["SITE ID" , "SITE NAME", "TRANSFER TYPE", "TRANSFER SOURCE", "TRANSFER DESTINATION"]

    for site in site_list:
        for i in range(len(ls)):
            ls[i] = ls[i] if ls[i] > len(site[i]) else len(site[i])
        kanjicount.append([0] * 6)

    # Each full-width char subtracts one cell of padding for that row/column.
    for site in site_list:
        for i in range(len(ls)):
            for k in site[i]:
                if unicodedata.east_asian_width(k) == "W":
                    kanjicount[site_list.index(site)][i] = kanjicount[site_list.index(site)][i] - 1


    ls_title = [0] * 6
    bar = ""
    formatstr= ""
    for i in range(len(ls_title)):
        ls_title[i] = ls[i] + univ_l
    # Columns 2 and 3 (src/dst types) merge into one "TRANSFER TYPE" column.
    ls_title = [ls_title[0], ls_title[1], ls_title[2]+ls_title[3], ls_title[4], ls_title[5]]
    for i in range(len(ls_title)):
        formatstr = formatstr + "{:<" + str(ls_title[i]) + "}"
        bar = bar + ("-" * (ls_title[i] - univ_l)) + (" " * univ_l)

    print("LISTING ALL", "CONFIGURED" if list_all else "AVAILABLE", "TRANSFER SITES\n")
    print(formatstr.format(*title))
    print(bar)
    # BAR SLIGHTLY OFFSET, FIGURE OUT WHY

    for site in site_list:
        formatstr=""
        ls_site = [0] * 6
        for i in range(len(ls_site)):
            ls_site[i] = ls[i] + kanjicount[site_list.index(site)][i] + univ_l
        ls_site = [ls_site[0], ls_site[1], ls_site[2]+ls_site[3], ls_site[4], ls_site[5]]
        for i in range(len(ls_site)):
            ls_site[i] = ls_site[i] if ls_site[i] > len(title[i]) else ( len(title[i]) + univ_l )
        for i in range(len(ls_site)):
            formatstr = formatstr + "{:<" + str(ls_site[i]) + "}"
        print(formatstr.format(site[0], site[1], f"{site[2]}->{site[3]}", site[4], site[5]))

    print()

def main():
    """Parse CLI arguments, load and validate the sites file, then either
    list sites or run the compiled rsync commands (with optional prompting
    and notification scripts)."""

    # IT'S PARSIN' TIME
    parser = argparse.ArgumentParser(description="Perform rsync operations using a pre-configured list of transfer sites")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-l", "--list", help="list all available transfer sites", action="store_true")
    group.add_argument("-L", "--list-all", help="list all configured transfer sites (including unavailable ones)", action="store_true")
    group.add_argument("-t", "--list-types", help="list all known transfer site types (e.g. 'local', 'remote_server'...)", action="store_true")
    parser.add_argument("--source", help="filter list of transfer sites by source type (used with [-l|-L], view available types with [-t])", action="extend", nargs="+", type=str)
    parser.add_argument("--destination", help="filter list of transfer sites by destination type (used with [-l|-L], view available types with [-t])", action="extend", nargs="+", type=str)
    group.add_argument("-s", "--sites", help="select transfer sites to process", action="extend", nargs="+", type=str)
    parser.add_argument("-n", "--dry-run", help="turn all queued site transfers into dry runs (see: rsync(1) [-n|--dry-run])", action="store_true")
    parser.add_argument("-q", "--quiet-level", help="set quietness level of site transfers ([] - print errors and warnings, [-q] - print errors only, [-qq...] - print critical errors only, default: [])", action="count", default=0)
    parser.add_argument("-p", "--prompt-frequency", help="set prompt frequency of site transfers ([] - don't prompt, [-p] - prompt only once, [-pp...] - prompt once for each rsync command, default: [])", action="count", default=0)
    parser.add_argument("--notify-each", help="run configured notification script once after each site transfer (default: runs once after all site transfers are done)", action="store_true")
    # NOTE(review): nargs=1 makes args.input_file a one-element LIST, but it
    # is passed directly to os.path.expanduser below, which expects a string
    # — using -i likely raises TypeError. Confirm (drop nargs or index [0]).
    parser.add_argument("-i", "--input-file", help=f"set custom file location to process transfer sites from (default: {SITES_DEFAULT})", action="store", nargs=1, type=str)

    print()

    # No arguments at all: show usage and bail.
    if len(sys.argv) == 1:
        parser.print_usage()
        print()
        sys.exit(1)

    args = parser.parse_args()

    # QUIET LEVEL AND PROMPT FREQUENCY
    global QUIET_LVL
    QUIET_LVL = args.quiet_level
    prompt_frequency = args.prompt_frequency

    # LIST TYPES
    if args.list_types == True:
        response = "Currently implemented types of transfer sites are: "
        for site_type in SITE_TYPES:
            response = response + site_type + ", "
        print(response[:-2], "\n")
        sys.exit(0)

    # FIND INPUT FILE
    if args.input_file != None and os.path.isfile(os.path.abspath(os.path.expanduser(args.input_file))):
        if not QUIET_LVL > 1: print ("Using custom site location:", os.path.abspath(os.path.expanduser(args.input_file)), "\n")
        sites = get_sites_root(os.path.abspath(os.path.expanduser(args.input_file)))
    elif os.path.isfile(os.path.abspath(os.path.expanduser(SITES_DEFAULT))):
        if not QUIET_LVL > 1: print ("Using default site location:", os.path.abspath(os.path.expanduser(SITES_DEFAULT)), "\n")
        sites = get_sites_root(os.path.abspath(os.path.expanduser(SITES_DEFAULT)))
    else:
        print ("ERROR: No site locations found. Exiting...\n")
        sys.exit(1)

    # VALIDATE INPUT FILE
    errors = validate_sites(sites)
    if errors > 0:
        print("Number of errors:", errors, "\nExiting...\n")
        sys.exit(1)

    # CONFIGURE FILTER
    if (args.source != None or args.destination != None) and args.list != True and args.list_all != True:
        print("ERROR: --source and --destination can only be used on a list (see: [-l|-L])\n")
        sys.exit(1)

    # LIST SITES
    if args.list == True or args.list_all == True:
        site_list = get_sites(sites, args.source, args.destination, args.list_all)
        print_site_list(site_list, args.list_all)
        sys.exit(0)

    # WHERE THE MAGIC HAPPENS
    elif args.sites != None:
        # Build the human-readable "id1, id2" list while verifying each site.
        site_id_list = ""
        for site_id in args.sites:
            site_id_list = site_id_list + site_id + ", "
            if not site_exists(sites, site_id):
                print("ERROR: One or more supplied sites do not exist. Exiting...\n")
                sys.exit(1)
        site_id_list = site_id_list[:-2]

        commands_list = []
        all_skipped = True
        return_code = 0
        final_return_code = 0

        # Compile everything first so the prompt can warn about failures.
        for site_id in args.sites:
            commands_list.append(compile_rsync_command(sites, site_id, args.dry_run))

        query_1 = "Do you wish to run the rsync commands for sites " + site_id_list + (" (WARNING: ERROR IN ONE OR MORE SITES DETECTED)?" if 1 in commands_list else "?")
        # -p prompts once up front; -pp prompts per command; anything else
        # runs unprompted. NOTE(review): -ppp (count 3) also skips prompting.
        if prompt_frequency != 1 or (prompt_frequency == 1 and query_yes_no(query_1)):
            for commands in commands_list:
                final_return_code = 0
                curr_site_id = site_id_list.split(", ")[commands_list.index(commands)]
                query_2 = "Do you wish to run the rsync command for site " + curr_site_id + (" (WARNING: ERROR IN SITE DETECTED)?" if commands == 1 else "?")
                if prompt_frequency != 2 or (prompt_frequency == 2 and query_yes_no(query_2)):
                    all_skipped = False
                    # NOTE(review): when compilation failed, `commands` is the
                    # int 1 and `for command in commands` raises TypeError —
                    # the `if command != 1` guard suggests a skip was intended;
                    # confirm.
                    for command in commands:
                        return_code = subprocess.run(command).returncode if command != 1 else command
                        final_return_code = final_return_code + return_code
                        if return_code != 0: break
                    if args.notify_each: run_notification_script(sites, curr_site_id, final_return_code)
                    print()
                else:
                    if not QUIET_LVL > 1: print("Skipping command...\n")
                    else: print()
            if not (args.notify_each or all_skipped): run_notification_script(sites, site_id_list, final_return_code)
        else:
            if not QUIET_LVL > 1: print("Skipping all commands...\n")
            else: print()

    # END OF MAIN

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("Keyboard interrupted received. Exiting...\n")
        try:
            sys.exit(0)
        except SystemExit:
            # Force-exit if SystemExit is swallowed higher up.
            os._exit(0)

# TO-DO
# FLAG FOR SHOWING COMMANDS TO RUN DURING PROMPT, ALSO FLAG FOR NOT RUNNING NOTIFICATION SCRIPT
# FINISH IMPLEMENTING snap_count
# FIX FORMATTING, DISPLAY x->y(snapshot) TYPE FOR SNAPSHOTS
# THINK OF OTHER WAYS TO STAMP SNAPS
# ONE DAY, CLEAN UP COMPILE_RSYNC FUNCTION INTO SUBROUTINES AND PROPERLY USE AN ARRAY FROM START TO FINISH