Skip to content

Commit

Permalink
Improve sync error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
const-cloudinary committed Jul 27, 2021
1 parent bf97439 commit 2083800
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 23 deletions.
3 changes: 2 additions & 1 deletion cloudinary_cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import sys

import click
import click_log
Expand Down Expand Up @@ -63,4 +64,4 @@ def main():


if __name__ == "__main__":
main()
sys.exit(main())
56 changes: 42 additions & 14 deletions cloudinary_cli/modules/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,23 @@ def sync(local_folder, cloudinary_folder, push, pull, include_hidden, concurrent
sync_dir = SyncDir(local_folder, cloudinary_folder, include_hidden, concurrent_workers, force, keep_unique,
deletion_batch_size)

result = 0
if push:
sync_dir.push()
result = sync_dir.push()
elif pull:
sync_dir.pull()
result = sync_dir.pull()

logger.info("Done!")

return result


class SyncDir:
def __init__(self, local_dir, remote_dir, include_hidden, concurrent_workers, force, keep_deleted,
deletion_batch_size):
self.local_dir = local_dir
self.remote_dir = remote_dir.strip('/')
self.user_friendly_remote_dir = self.remote_dir if self.remote_dir else '/'
self.include_hidden = include_hidden
self.concurrent_workers = concurrent_workers
self.force = force
Expand All @@ -67,7 +71,7 @@ def __init__(self, local_dir, remote_dir, include_hidden, concurrent_workers, fo
logger.info(f"Found {len(self.local_files)} items in local folder '{local_dir}'")

self.remote_files = query_cld_folder(self.remote_dir)
logger.info(f"Found {len(self.remote_files)} items in Cloudinary folder '{self.remote_dir}'")
logger.info(f"Found {len(self.remote_files)} items in Cloudinary folder '{self.user_friendly_remote_dir}'")

local_file_names = self.local_files.keys()
remote_file_names = self.remote_files.keys()
Expand Down Expand Up @@ -99,10 +103,10 @@ def __init__(self, local_dir, remote_dir, include_hidden, concurrent_workers, fo
self.out_of_sync_remote_file_names = set(self.diverse_file_names.get(f, f) for f in
self.out_of_sync_local_file_names)

skipping = len(common_file_names) - len(self.out_of_sync_local_file_names)
self.synced_files_count = len(common_file_names) - len(self.out_of_sync_local_file_names)

if skipping:
logger.info(f"Skipping {skipping} items")
if self.synced_files_count:
logger.info(f"Skipping {self.synced_files_count} items")

def _get_out_of_sync_file_names(self, common_file_names):
logger.debug("\nCalculating differences...\n")
Expand Down Expand Up @@ -130,7 +134,7 @@ def push(self):
if not files_to_push:
return True

logger.info(f"Uploading {len(files_to_push)} items to Cloudinary folder '{self.remote_dir}'")
logger.info(f"Uploading {len(files_to_push)} items to Cloudinary folder '{self.user_friendly_remote_dir}'")

options = {
'use_filename': True,
Expand All @@ -139,17 +143,32 @@ def push(self):
'resource_type': 'auto'
}
upload_results = {}
upload_errors = {}
uploads = []
for file in files_to_push:
folder = get_destination_folder(self.remote_dir, file)

uploads.append((self.local_files[file]['path'], {**options, 'folder': folder}, upload_results))
uploads.append(
(self.local_files[file]['path'], {**options, 'folder': folder}, upload_results, upload_errors))

try:
run_tasks_concurrently(upload_file, uploads, self.concurrent_workers)
finally:
self._print_sync_status(upload_results, upload_errors)
self._save_sync_meta_file(upload_results)

run_tasks_concurrently(upload_file, uploads, self.concurrent_workers)
if upload_errors:
raise Exception("Sync did not finish successfully")

self.save_sync_meta_file(upload_results)
def _print_sync_status(self, success, errors):
    """Log a short summary table of the sync outcome at INFO level.

    The table shows how many items were already in sync (taken from
    ``self.synced_files_count``), how many were transferred in this run
    and how many failed.

    :param success: Sized collection of items handled successfully; only its length is used.
    :param errors: Sized collection of items that failed; only its length is used.
    """
    separator = "==============="
    report_lines = (
        "==Sync Status==",
        separator,
        f"In Sync| {self.synced_files_count}",
        f"Synced | {len(success)}",
        f"Failed | {len(errors)}",
        separator,
    )
    # Emit one log record per row so the output matches line-by-line logging.
    for report_line in report_lines:
        logger.info(report_line)

def save_sync_meta_file(self, upload_results):
def _save_sync_meta_file(self, upload_results):
diverse_filenames = {}
for local_path, remote_path in upload_results.items():
local = normalize_file_extension(path.relpath(local_path, self.local_dir))
Expand All @@ -163,6 +182,7 @@ def save_sync_meta_file(self, upload_results):
if diverse_filenames or current_diverse_files != self.diverse_file_names:
current_diverse_files.update(diverse_filenames)
try:
logger.debug(f"Updating '{self.sync_meta_file}' file")
write_json_to_file(current_diverse_files, self.sync_meta_file)
logger.debug(f"Updated '{self.sync_meta_file}' file")
except Exception as e:
Expand All @@ -175,7 +195,7 @@ def _handle_unique_remote_files(self):
return handled

logger.info(f"Deleting {len(self.unique_remote_file_names)} resources "
f"from Cloudinary folder '{self.remote_dir}'")
f"from Cloudinary folder '{self.user_friendly_remote_dir}'")
files_to_delete_from_cloudinary = list(map(lambda x: self.remote_files[x], self.unique_remote_file_names))

for i in product({"upload", "private", "authenticated"}, {"image", "video", "raw"}):
Expand Down Expand Up @@ -204,6 +224,8 @@ def _handle_unique_remote_files(self):
return True

def pull(self):
download_results = {}
download_errors = {}
if not self._handle_unique_local_files():
return False

Expand All @@ -218,9 +240,15 @@ def pull(self):
remote_file = self.remote_files[file]
local_path = path.abspath(path.join(self.local_dir, file))

downloads.append((remote_file, local_path))
downloads.append((remote_file, local_path, download_results, download_errors))

try:
run_tasks_concurrently(download_file, downloads, self.concurrent_workers)
finally:
self._print_sync_status(download_results, download_errors)

run_tasks_concurrently(download_file, downloads, self.concurrent_workers)
if download_errors:
raise Exception("Sync did not finish successfully")

def _handle_unique_local_files(self):
handled = self._handle_files_deletion(len(self.unique_local_file_names), "local")
Expand Down
2 changes: 1 addition & 1 deletion cloudinary_cli/modules/upload_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@option("-w", "--concurrent_workers", type=int, default=30, help="Specify number of concurrent network threads.")
def upload_dir(directory, optional_parameter, optional_parameter_parsed, transformation, folder, preset,
concurrent_workers):
items, skipped = {}, []
items, skipped = {}, {}
dir_to_upload = abspath(path_join(getcwd(), directory))
logger.info("Uploading directory '{}'".format(dir_to_upload))
parent = dirname(dir_to_upload)
Expand Down
19 changes: 12 additions & 7 deletions cloudinary_cli/utils/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ def query_cld_folder(folder):
return files


def upload_file(file_path, options, uploaded=None, skipped=None):
def upload_file(file_path, options, uploaded=None, failed=None):
uploaded = uploaded if uploaded is not None else {}
skipped = skipped if skipped is not None else []
failed = failed if failed is not None else {}
verbose = logger.getEffectiveLevel() < logging.INFO

try:
Expand All @@ -64,11 +64,12 @@ def upload_file(file_path, options, uploaded=None, skipped=None):
uploaded[file_path] = asset_source(result)
except Exception as e:
log_exception(e, f"Failed uploading {file_path}")
skipped.append(file_path)
raise
failed[file_path] = str(e)


def download_file(remote_file, local_path):
def download_file(remote_file, local_path, downloaded=None, failed=None):
downloaded = downloaded if downloaded is not None else {}
failed = failed if failed is not None else {}
makedirs(path.dirname(local_path), exist_ok=True)

sign_url = True if remote_file['type'] in ("private", "authenticated") else False
Expand All @@ -79,14 +80,18 @@ def download_file(remote_file, local_path):
result = requests.get(download_url)

if result.status_code != 200:
err = result.headers.get('x-cld-error')
msg = f"Failed downloading: {download_url}, status code: {result.status_code}, " \
f"details: {result.headers.get('x-cld-error')}"
f"details: {err}"
logger.error(msg)

failed[download_url] = err
return

with open(local_path, "wb") as f:
f.write(result.content)

downloaded[remote_file['relative_path']] = local_path

logger.info(style("Downloaded '{}' to '{}'".format(remote_file['relative_path'], local_path), fg="green"))


Expand Down

0 comments on commit 2083800

Please sign in to comment.