ID: 0fd22e0e-c5ce-44c8-8c11-40a62efb85f4
Filename: anon_cli.py
Size: 27.42 KB
Uploaded: 2025-05-21 07:14:46
Downloads: 0
MIME Type: text/x-script.python
Hashes:
- CRC32: c5bf0428
- MD5: 38b72a0ba0127f6fb02e8e42f1b7cc08
- SHA1: a3666b7ab3ab4303d7d11ec62c84453437172832
- SHA256: 77e41c5a1faf10f308b586d4322383f9ef374ff8f4264d753326147cb569df18
#!/usr/bin/env python3
"""
anon_cli.py
A simple Python CLI client for anon.services file hosting.
Usage examples:
# Create a new top-level folder (no parent):
anon_cli.py create-folder [--folder-pw <pw>]
# Upload local files into an existing folder:
anon_cli.py upload --folder-uuid <uuid> --edit-token <token> [--make-private] [--file-password <pw>] file1 file2 ...
# Recursively upload a local folder into a new top-level folder:
anon_cli.py upload-folder /path/to/localfolder --create --folder-pw secret
# Or recursively upload localfolder into an existing remote folder:
anon_cli.py upload-folder /path/to/localfolder --folder-uuid <uuid> --edit-token <token>
# Download a single file (with optional passwords):
anon_cli.py download-file --file-uuid <uuid> [--out localFile] [--file-pw <pw>] [--folder-pw <pw>]
# Recursively download an entire folder:
anon_cli.py download-folder --folder-uuid <uuid> [--target-dir .] [--folder-pw <pw>]
# Delete a file:
anon_cli.py delete-file --file-uuid <uuid> --delete-token <token>
# Delete a folder:
anon_cli.py delete-folder --folder-uuid <uuid> --edit-token <token>
"""
import argparse
import os
import sys
import time
import urllib.parse
from typing import Optional, Tuple
import requests
API_ENDPOINT = "https://anon.services/api"
CHUNK_SIZE = 1024 * 512 # 512 KB per chunk
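# Overview of the upload flow implemented below (inferred from this script,
# not from official API docs):
#   1. POST {API_ENDPOINT}/files/get_upload_url with {"file_size": N}
#      -> returns an ephemeral upload_url and token
#   2. POST the raw file bytes to upload_url (with &filename=<urlencoded> appended)
#   3. POST {API_ENDPOINT}/files/notify_upload with the token to register the file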
def format_time(seconds: float) -> str:
seconds = int(seconds)
days = seconds // 86400
seconds %= 86400
hours = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
parts = []
if days > 0:
parts.append(f"{days} day{'s' if days != 1 else ''}")
if hours > 0:
parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
if minutes > 0:
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
parts.append(f"{seconds} second{'s' if seconds != 1 else ''}")
return " ".join(parts)
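# Example outputs (a quick sanity check, not part of the original script):
#   format_time(45)    -> "45 seconds"
#   format_time(3661)  -> "1 hour 1 minute 1 second"
#   format_time(90061) -> "1 day 1 hour 1 minute 1 second"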
def make_headers() -> dict:
return {}
def _try_request(method: str, url: str, **kwargs) -> Optional[requests.Response]:
    """
    Internal helper that retries a request up to 3 times,
    sleeping 5s and then 10s between attempts.
    Returns the Response on success (2xx); returns None if all attempts fail.
    """
    delays = [5, 10]  # sleeps between the three attempts
    last_error = None
    for attempt in range(len(delays) + 1):
        try:
            resp = requests.request(method, url, **kwargs)
            # A 2xx status counts as success; return immediately.
            if 200 <= resp.status_code < 300:
                return resp
            last_error = f"HTTP error {resp.status_code}: {resp.text}"
        except requests.RequestException as ex:
            last_error = str(ex)
        # Wait before the next attempt, unless this was the last one.
        if attempt < len(delays):
            time.sleep(delays[attempt])
    # All attempts failed; report the last error instead of dropping it silently.
    if last_error:
        print(f"Request to {url} failed: {last_error}", file=sys.stderr)
    return None
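# Example (illustrative; <uuid> is a placeholder): fetch folder metadata with retries.
#   resp = _try_request("GET", f"{API_ENDPOINT}/folders/<uuid>", timeout=60)
#   data = resp.json() if resp is not None else None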
def post_json(url: str, data: dict) -> Tuple[int, dict]:
    """
    POST JSON with retry logic.
    Returns (status_code, dict_data); if all 3 attempts fail, returns (0, {"error": "..."}).
    """
    resp = _try_request("POST", url, json=data, timeout=60, headers=make_headers())
    if resp is None:
        return 0, {"error": f"Failed after 3 attempts posting to {url}"}
    try:
        resp_data = resp.json()
    except ValueError:  # body was not JSON
        resp_data = {"error": resp.text}
    return resp.status_code, resp_data
def delete_request(url: str) -> Tuple[int, dict]:
    """
    DELETE request with retry logic.
    Returns (status_code, dict_data); if all 3 attempts fail, returns (0, {"error": "..."}).
    """
    resp = _try_request("DELETE", url, timeout=60, headers=make_headers())
    if resp is None:
        return 0, {"error": f"Failed after 3 attempts deleting {url}"}
    try:
        resp_data = resp.json()
    except ValueError:
        resp_data = {"error": resp.text}
    return resp.status_code, resp_data
def get_request(url: str) -> Tuple[int, dict]:
    """
    GET request with retry logic.
    Returns (status_code, dict_data); if all 3 attempts fail, returns (0, {"error": "..."}).
    """
    resp = _try_request("GET", url, timeout=60, headers=make_headers())
    if resp is None:
        return 0, {"error": f"Failed after 3 attempts getting {url}"}
    try:
        resp_data = resp.json()
    except ValueError:
        resp_data = {"error": resp.text}
    return resp.status_code, resp_data
def create_folder(folder_pw: str) -> None:
url = f"{API_ENDPOINT}/folders"
payload = {}
if folder_pw:
payload["folder_password"] = folder_pw
st, js = post_json(url, payload)
if st != 200:
print(f"Error creating folder: {js.get('error', js)}", file=sys.stderr)
sys.exit(1)
folder_uuid = js.get("folder_uuid")
edit_token = js.get("edit_token")
base_url = API_ENDPOINT.replace("/api", "")
view_link = f"{base_url}/folder/{folder_uuid}"
edit_link = f"{view_link}?token={edit_token}"
    print("Folder created successfully.")
print(f"Folder UUID: {folder_uuid}")
print(f"Edit Token: {edit_token}")
print(f"Folder link: {view_link}")
print(f"Edit link: {edit_link}")
def get_upload_url(file_size: int) -> Tuple[Optional[str], Optional[str]]:
url = f"{API_ENDPOINT}/files/get_upload_url"
st, js = post_json(url, {"file_size": file_size})
if st != 200 or "error" in js:
print(f"Error from get_upload_url: {js.get('error', js)}", file=sys.stderr)
return None, None
return js.get("upload_url"), js.get("token")
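# Example (illustrative; assumes "notes.txt" exists locally):
#   upload_url, token = get_upload_url(os.path.getsize("notes.txt"))
#   # on success, POST the bytes to upload_url, then call notify_upload(token, ...)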
def upload_file_with_progress_realtime(upload_url: str, local_filepath: str) -> bool:
"""
Upload file with progress bar, retrying the entire upload up to 3 times
if there's an error or a non-200 response.
After 3 failures, returns False to skip this file.
"""
filename = os.path.basename(local_filepath)
file_size = os.path.getsize(local_filepath)
class ProgressFileWrapper:
def __init__(self, path, chunk_size):
self.f = open(path, "rb")
self.chunk_size = chunk_size
self.total_size = file_size
self.bytes_read = 0
self.start_time = time.time()
self.last_print_time = self.start_time
def __iter__(self):
return self
def __next__(self):
chunk = self.f.read(self.chunk_size)
if not chunk:
raise StopIteration
self.bytes_read += len(chunk)
self._print_progress()
return chunk
def _print_progress(self):
now = time.time()
if now - self.last_print_time >= 1.0 or self.bytes_read == self.total_size:
elapsed = now - self.start_time
if elapsed <= 0:
elapsed = 0.000001
speed = self.bytes_read / elapsed
if speed < 1024:
speed_str = f"{speed:.2f} B/s"
elif speed < 1024*1024:
speed_str = f"{(speed / 1024):.2f} KB/s"
else:
speed_str = f"{(speed / (1024*1024)):.2f} MB/s"
pct = (self.bytes_read / self.total_size) * 100 if self.total_size > 0 else 0
bytes_left = self.total_size - self.bytes_read
remaining_time = bytes_left / speed if speed > 0 else 0
time_remaining_str = format_time(remaining_time)
print(
f"[Upload] {filename} {pct:.2f}% | {self.bytes_read}/{self.total_size} bytes | "
f"{speed_str} | ETA: {time_remaining_str}",
end="\r",
flush=True
)
self.last_print_time = now
def close(self):
self.f.close()
headers = {"Content-Type": "application/octet-stream"}
headers.update(make_headers())
def do_single_attempt() -> bool:
wrapper_local = ProgressFileWrapper(local_filepath, CHUNK_SIZE)
try:
r = requests.post(upload_url, data=wrapper_local, headers=headers, stream=True, timeout=600)
print("")
if r.status_code != 200:
print(f"Upload error {r.status_code}: {r.text}", file=sys.stderr)
return False
        try:
            js = r.json()
        except ValueError:  # agent returned a non-JSON body
            print(f"Upload agent response is not JSON: {r.text}", file=sys.stderr)
            return False
if js.get("status") != "ok":
print(f"Agent error: {js}", file=sys.stderr)
return False
return True
except requests.RequestException as e:
print(f"Error uploading {filename}: {str(e)}", file=sys.stderr)
return False
finally:
wrapper_local.close()
    # Retry the whole upload up to 3 times, sleeping 5s then 10s between attempts.
    delays = [5, 10]
    for attempt in range(len(delays) + 1):
        if do_single_attempt():
            return True
        if attempt < len(delays):
            print(f"Retrying upload of {filename} after {delays[attempt]} seconds...")
            time.sleep(delays[attempt])
    print(f"Skipping file {filename} after 3 failed upload attempts.", file=sys.stderr)
    return False
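# Design note: passing an iterator as `data=` makes requests send the body with
# chunked transfer encoding, so the file is streamed from disk rather than loaded
# into memory all at once; the wrapper piggybacks progress reporting onto each
# chunk read.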
def notify_upload(ephemeral_token: str, folder_uuid: Optional[str], edit_token: Optional[str],
make_private: bool, file_password: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
"""
Calls /files/notify_upload to finalize the file in DB. Retries up to 3 times.
Returns (file_uuid, view_link) if success, otherwise (None, None).
"""
url = f"{API_ENDPOINT}/files/notify_upload"
data = {
"token": ephemeral_token,
"make_private": make_private
}
if folder_uuid:
data["folder_uuid"] = folder_uuid
if edit_token:
data["folder_token"] = edit_token
if file_password:
data["file_password"] = file_password
for attempt in range(3):
st, js = post_json(url, data)
if st == 200 and "error" not in js:
return js.get("file_uuid"), js.get("view_link")
if attempt < 2:
wait_time = 5 * (attempt + 1) # 5, then 10
print(f"notify_upload error, retrying after {wait_time} seconds...")
time.sleep(wait_time)
# After 3 attempts
return None, None
def upload_files(folder_uuid: str, edit_token: str, filepaths: list,
make_private=False, file_password=None) -> None:
base_url = API_ENDPOINT.replace("/api", "")
folder_link = f"{base_url}/folder/{folder_uuid}"
edit_link = f"{folder_link}?token={edit_token}"
for local_path in filepaths:
if not os.path.isfile(local_path):
print(f"Not a file: {local_path}", file=sys.stderr)
continue
file_size = os.path.getsize(local_path)
# Get upload URL (retry is inside post_json)
upload_url, token = get_upload_url(file_size)
if not upload_url or not token:
print(f"Failed to get upload URL for {local_path} after 3 attempts. Skipping.", file=sys.stderr)
continue
filename = os.path.basename(local_path)
encoded_name = urllib.parse.quote(filename)
upload_url_with_filename = f"{upload_url}&filename={encoded_name}"
ok = upload_file_with_progress_realtime(upload_url_with_filename, local_path)
if not ok:
# Already skipped after 3 tries
continue
# notify_upload with its own retry
f_uuid, view_link = notify_upload(
ephemeral_token=token,
folder_uuid=folder_uuid,
edit_token=edit_token,
make_private=make_private,
file_password=file_password
)
if f_uuid:
print(f"Uploaded '{local_path}' => file_uuid={f_uuid}")
if view_link:
print(f"View link: {view_link}")
else:
print(f"Failed to finalize upload for {local_path} after 3 attempts. Skipping.")
print("\n--- Upload Finished ---")
print(f"Parent folder link: {folder_link}")
print(f"Edit token: {edit_token}")
print(f"Edit link: {edit_link}")
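# Example (hypothetical placeholder values):
#   upload_files(
#       folder_uuid="FOLDER_UUID",
#       edit_token="EDIT_TOKEN",
#       filepaths=["notes.txt", "photo.jpg"],
#       make_private=False,
#       file_password=None,
#   )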
def download_file(file_uuid: str, out_path: str,
file_pw: Optional[str] = None,
folder_pw: Optional[str] = None) -> bool:
"""
Download a single file with optional file/folder passwords,
retrying the actual download up to 3 times.
"""
# First, get file info (this uses get_request with retry)
params = {}
if file_pw:
params["file_pw"] = file_pw
if folder_pw:
params["folder_pw"] = folder_pw
query_str = "?" + urllib.parse.urlencode(params) if params else ""
info_url = f"{API_ENDPOINT}/files/{file_uuid}{query_str}"
st, info_js = get_request(info_url)
if st != 200 or "error" in info_js:
print(f"Error fetching file info: {info_js.get('error', info_js)}", file=sys.stderr)
return False
    filename = info_js.get("filename") or file_uuid  # fall back to the UUID if no name is returned
    file_size = info_js.get("size", 0)
    if not out_path:
        out_path = filename
base_url = API_ENDPOINT.replace("/api", "")
dl_url = f"{base_url}/download/{file_uuid}{query_str}"
print(f"Downloading file_uuid={file_uuid} => {out_path} (size={file_size} bytes)")
def do_single_download() -> bool:
start_time = time.time()
last_print_time = start_time
bytes_downloaded = 0
headers = make_headers()
try:
with requests.get(dl_url, stream=True, timeout=60, headers=headers) as r:
if r.status_code != 200:
err_text = r.text
print(f"Download error {r.status_code}: {err_text}", file=sys.stderr)
return False
with open(out_path, "wb") as f:
for chunk in r.iter_content(chunk_size=CHUNK_SIZE):
if not chunk:
break
f.write(chunk)
bytes_downloaded += len(chunk)
now = time.time()
if now - last_print_time >= 1.0 or bytes_downloaded == file_size:
elapsed = now - start_time
if elapsed <= 0:
elapsed = 0.000001
speed = bytes_downloaded / elapsed
if speed < 1024:
speed_str = f"{speed:.2f} B/s"
elif speed < 1024*1024:
speed_str = f"{(speed / 1024):.2f} KB/s"
else:
speed_str = f"{(speed / (1024*1024)):.2f} MB/s"
pct = 0.0
bytes_left = 0
if file_size > 0:
pct = (bytes_downloaded / file_size) * 100
bytes_left = file_size - bytes_downloaded
remaining_time = bytes_left / speed if speed > 0 else 0
time_remaining_str = format_time(remaining_time)
print(
f"[Download] {out_path} {pct:.2f}% | "
f"{bytes_downloaded}/{file_size} bytes | {speed_str} | ETA: {time_remaining_str}",
end="\r",
flush=True
)
last_print_time = now
print("")
print(f"Completed download: {out_path} ({bytes_downloaded} bytes).")
return True
except requests.RequestException as e:
print(f"Download request error: {str(e)}", file=sys.stderr)
return False
    # Retry the whole download up to 3 times, sleeping 5s then 10s between attempts.
    delays = [5, 10]
    for attempt in range(len(delays) + 1):
        if do_single_download():
            return True
        if attempt < len(delays):
            print(f"Retrying download of {file_uuid} after {delays[attempt]} seconds...")
            time.sleep(delays[attempt])
    print(f"Skipping file {file_uuid} after 3 failed download attempts.", file=sys.stderr)
    return False
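# Example (placeholder values): download a password-protected file.
#   download_file("FILE_UUID", out_path="local.bin", file_pw="secret", folder_pw=None)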
def gather_all_files(folder_uuid: str, folder_pw: str) -> Tuple[str, list]:
"""
Returns (remote_folder_name, list_of_files), where list_of_files is a list of dict with:
{file_uuid, filename, size, subpath}
Recursively gathers files from subfolders as well.
"""
params = {}
if folder_pw:
params["folder_pw"] = folder_pw
q = "?" + urllib.parse.urlencode(params) if params else ""
url = f"{API_ENDPOINT}/folders/{folder_uuid}{q}"
st, js = get_request(url)
if st != 200 or "error" in js:
err = js.get("error", js)
raise RuntimeError(f"Error fetching folder {folder_uuid}: {err}")
folder_name = js.get("folder_name", "remote_folder")
subfolders = js.get("subfolders", [])
files_ = js.get("files", [])
results = []
for f in files_:
results.append({
"file_uuid": f["file_uuid"],
"filename": f["filename"],
"size": f["size"],
"subpath": f["filename"]
})
for sf in subfolders:
sf_uuid = sf["folder_uuid"]
sf_name = sf["folder_name"]
sub_folder_name, sub_items = gather_all_files(sf_uuid, folder_pw)
for it in sub_items:
it["subpath"] = os.path.join(sf_name, it["subpath"])
results.extend(sub_items)
return (folder_name, results)
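# Example return shape (values are placeholders; shape follows the code above):
#   ("my_folder", [
#       {"file_uuid": "...", "filename": "a.txt", "size": 123, "subpath": "a.txt"},
#       {"file_uuid": "...", "filename": "b.bin", "size": 456, "subpath": "sub/b.bin"},
#   ])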
def download_folder(folder_uuid: str, target_dir: str, folder_pw: Optional[str] = None) -> None:
"""
Recursively download a folder.
Creates a local subfolder = <target_dir>/<remote_folder_name>
and stores everything inside it.
"""
folder_pw_str = folder_pw or ""
try:
root_folder_name, all_files = gather_all_files(folder_uuid, folder_pw_str)
except RuntimeError as e:
print(f"Cannot gather folder structure: {str(e)}", file=sys.stderr)
sys.exit(1)
local_base = os.path.join(target_dir, root_folder_name)
    os.makedirs(local_base, exist_ok=True)
    total_size = sum(f["size"] for f in all_files)
print(f"Found {len(all_files)} files. Total size={total_size} bytes.")
cumulative_downloaded = 0
for finfo in all_files:
file_uuid = finfo["file_uuid"]
fname = finfo["filename"]
fsize = finfo["size"]
subrel = finfo["subpath"]
out_path = os.path.join(local_base, subrel)
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
print(f"\nDownloading {file_uuid} => {out_path} (size={fsize} bytes)...")
success = download_file(
file_uuid,
out_path=out_path,
file_pw=None,
folder_pw=folder_pw_str
)
if not success:
print(f"Failed to download {file_uuid}")
else:
cumulative_downloaded += fsize
print(f"\nAll downloads done. {cumulative_downloaded} of {total_size} bytes downloaded.")
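# Example (placeholder UUID); the equivalent CLI call is shown first:
#   anon_cli.py download-folder --folder-uuid FOLDER_UUID --target-dir downloads
#   download_folder("FOLDER_UUID", target_dir="downloads", folder_pw=None)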
def delete_file(file_uuid: str, delete_token: str):
url = f"{API_ENDPOINT}/files/{file_uuid}?token={delete_token}"
st, js = delete_request(url)
if st != 200 or "error" in js:
print(f"Error deleting file: {js.get('error', js)}", file=sys.stderr)
sys.exit(1)
print("File deleted successfully.")
def delete_folder(folder_uuid: str, edit_token: str):
url = f"{API_ENDPOINT}/folders/{folder_uuid}?token={edit_token}"
st, js = delete_request(url)
if st != 200 or "error" in js:
print(f"Error deleting folder: {js.get('error', js)}", file=sys.stderr)
sys.exit(1)
print("Folder deleted successfully.")
def create_subfolder(parent_uuid: str, parent_token: str, subfolder_name: str, folder_pw: str) -> Tuple[Optional[str], Optional[str]]:
"""
Creates a subfolder under parent_uuid using parent's edit token.
Returns (folder_uuid, edit_token).
Retries are built into post_json as well.
"""
url = f"{API_ENDPOINT}/folders"
payload = {
"parent_folder_uuid": parent_uuid,
"parent_folder_token": parent_token,
"folder_name": subfolder_name
}
if folder_pw:
payload["folder_password"] = folder_pw
st, js = post_json(url, payload)
if st != 200 or "error" in js:
print(f"Error creating subfolder '{subfolder_name}': {js.get('error', js)}", file=sys.stderr)
return None, None
return js["folder_uuid"], js["edit_token"]
def upload_folder_recursive(local_dir: str,
parent_folder_uuid: str,
parent_edit_token: str,
folder_password: str,
make_private: bool) -> None:
"""
Recursively upload local_dir to an existing remote folder.
All subfolders are created on the remote using the parent's edit_token.
"""
if not os.path.isdir(local_dir):
print(f"Not a directory: {local_dir}", file=sys.stderr)
return
entries = os.listdir(local_dir)
files_ = []
folders_ = []
for e in entries:
full_path = os.path.join(local_dir, e)
if os.path.isfile(full_path):
files_.append(full_path)
elif os.path.isdir(full_path):
folders_.append(full_path)
# Upload all files in this directory
upload_files(
folder_uuid=parent_folder_uuid,
edit_token=parent_edit_token,
filepaths=files_,
make_private=make_private,
file_password=folder_password if folder_password else None
)
# Create subfolders and recurse
for sub in folders_:
sub_name = os.path.basename(sub)
new_uuid, new_token = create_subfolder(parent_folder_uuid, parent_edit_token, sub_name, folder_password)
if not new_uuid:
print(f"Skipping local subfolder {sub} due to error.")
continue
upload_folder_recursive(sub, new_uuid, new_token, folder_password, make_private)
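# Design note: each remote subfolder gets its own uuid/edit_token pair from
# create_subfolder, and the recursion descends with those new credentials, so
# the parent's token is only ever used to create its direct children.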
def main():
parser = argparse.ArgumentParser(description="ANON.SERVICES CLI")
sub = parser.add_subparsers(dest="command", required=True)
cf = sub.add_parser("create-folder", help="Create a new top-level folder")
cf.add_argument("--folder-pw", default="", help="Optional folder password")
up = sub.add_parser("upload", help="Upload file(s) to an existing folder")
up.add_argument("--folder-uuid", required=True)
up.add_argument("--edit-token", required=True)
up.add_argument("--make-private", action="store_true", default=False)
up.add_argument("--file-password", default="", help="File-level password (optional)")
up.add_argument("files", nargs="+", help="Paths to local files")
uf = sub.add_parser("upload-folder", help="Recursively upload a local folder")
uf.add_argument("local_folder", help="Path to local folder")
uf.add_argument("--create", action="store_true", default=False,
help="Create a brand-new top-level folder if set")
uf.add_argument("--folder-pw", default="", help="If creating new top folder or to lock all files")
uf.add_argument("--make-private", action="store_true", default=False)
uf.add_argument("--folder-uuid", default="",
help="Existing folder uuid (omit if using --create)")
uf.add_argument("--edit-token", default="",
help="Existing folder edit token (omit if using --create)")
df = sub.add_parser("download-file", help="Download a single file")
df.add_argument("--file-uuid", required=True)
df.add_argument("--out", default="", help="Output filepath")
df.add_argument("--file-pw", default="", help="File-level password")
df.add_argument("--folder-pw", default="", help="Folder password")
dfo = sub.add_parser("download-folder", help="Download entire folder (recursively)")
dfo.add_argument("--folder-uuid", required=True)
dfo.add_argument("--target-dir", default=".", help="Local destination directory")
dfo.add_argument("--folder-pw", default="", help="Folder password if needed")
delf = sub.add_parser("delete-file", help="Delete a file by file_uuid + delete_token")
delf.add_argument("--file-uuid", required=True)
delf.add_argument("--delete-token", required=True)
    delfo = sub.add_parser("delete-folder", help="Delete a folder by folder_uuid + edit_token")
delfo.add_argument("--folder-uuid", required=True)
delfo.add_argument("--edit-token", required=True)
args = parser.parse_args()
if args.command == "create-folder":
create_folder(args.folder_pw)
elif args.command == "upload":
upload_files(
folder_uuid=args.folder_uuid,
edit_token=args.edit_token,
filepaths=args.files,
make_private=args.make_private,
file_password=args.file_password if args.file_password else None
)
elif args.command == "upload-folder":
if args.create:
# Create new top-level folder first
url = f"{API_ENDPOINT}/folders"
payload = {"folder_name": os.path.basename(os.path.normpath(args.local_folder))}
if args.folder_pw:
payload["folder_password"] = args.folder_pw
st, js = post_json(url, payload)
if st != 200 or "error" in js:
print(f"Error creating top-level folder: {js.get('error', js)}", file=sys.stderr)
sys.exit(1)
top_uuid = js["folder_uuid"]
top_edit_token = js["edit_token"]
print(f"New top-level folder created: {top_uuid}")
print(f"Edit token: {top_edit_token}")
upload_folder_recursive(
local_dir=args.local_folder,
parent_folder_uuid=top_uuid,
parent_edit_token=top_edit_token,
folder_password=args.folder_pw,
make_private=args.make_private
)
else:
if not args.folder_uuid or not args.edit_token:
print("Must provide --folder-uuid and --edit-token or use --create.", file=sys.stderr)
sys.exit(1)
upload_folder_recursive(
local_dir=args.local_folder,
parent_folder_uuid=args.folder_uuid,
parent_edit_token=args.edit_token,
folder_password=args.folder_pw,
make_private=args.make_private
)
elif args.command == "download-file":
download_file(
file_uuid=args.file_uuid,
out_path=args.out,
file_pw=args.file_pw or None,
folder_pw=args.folder_pw or None
)
elif args.command == "download-folder":
download_folder(
folder_uuid=args.folder_uuid,
target_dir=args.target_dir,
folder_pw=args.folder_pw or None
)
elif args.command == "delete-file":
delete_file(args.file_uuid, args.delete_token)
elif args.command == "delete-folder":
delete_folder(args.folder_uuid, args.edit_token)
else:
parser.print_help()
if __name__ == "__main__":
main()