Source code for acdcli.utils.progress

import time
import sys
from math import floor, log10
from collections import deque


class FileProgress(object):
    """Byte counter for the transfer of a single file.

    Attributes:
        total: expected size of the transfer in bytes.
        current: number of bytes accounted for so far.
        status: initialised to ``None``; ``MultiProgress.end`` counts every
            progress whose status is truthy as a failed file.
    """

    __slots__ = ('current', 'status', 'total')

    def __init__(self, total_sz: int, current: int=0):
        """Create a progress of ``total_sz`` bytes, starting at ``current``."""
        self.total = total_sz
        self.current = current
        self.status = None

    def update(self, chunk):
        """Account for one transferred chunk.

        The payload length is used, not ``chunk.__sizeof__()``: the latter is
        the in-memory size of the Python object (payload plus interpreter
        overhead), which over-counts every chunk, including an empty one.

        :param chunk: the sized data block (e.g. ``bytes``) just transferred
        """
        self.current += len(chunk)

    def reset(self):
        """Set the transferred byte count back to zero."""
        self.current = 0

    def done(self):
        """Mark the transfer as complete by setting ``current`` to ``total``."""
        self.current = self.total
class MultiProgress(object):
    """Container that accumulates multiple FileProgress objects"""

    def __init__(self):
        self._progresses = []
        # timestamp and byte count of the previous _print call (None = never printed)
        self._last_inv = None
        self._last_prog = 0
        # sliding window of the last ten speed samples, pre-filled with zeros
        self._last_speeds = deque([0] * 10, 10)

    def end(self):
        """Prints the final progress line, a newline, and the number of
        failed files (progresses with a truthy status), if any."""
        self.print_progress()
        print()
        failed = sum(1 for s in self._progresses if s.status)
        if failed:
            print('%d file(s) failed.' % failed)

    def add(self, progress: FileProgress):
        """Registers another progress object to be included in the totals."""
        self._progresses.append(progress)

    def print_progress(self):
        """Sums up all registered progresses and prints the progress line."""
        total = 0
        current = 0
        complete = 0
        for p in self._progresses:
            total += p.total
            current += p.current
            if p.total <= p.current:
                complete += 1

        # never report more transferred bytes than the displayed total
        if current > total:
            total = current

        self._print(total, current, len(self._progresses), complete)

    def _print(self, total_sz: int, current_sz: int, total_items: int, done: int):
        """Prints a line that includes a progress bar, total and current transfer size,
        total and done items, average speed, and ETA. Uses ANSI escape codes."""

        if self._last_inv is None:
            self._last_inv = time.time()

        t = time.time()
        duration = t - self._last_inv
        speed = (current_sz - self._last_prog) / duration if duration else 0
        rate = float(current_sz) / total_sz if total_sz else 1
        self._last_speeds.append(speed)

        avg_speed = float(sum(self._last_speeds)) / len(self._last_speeds)
        eta = float(total_sz - current_sz) / avg_speed if avg_speed else 0

        self._last_inv, self._last_prog = t, current_sz

        percentage = round(rate * 100, ndigits=2) if rate <= 1 else 100
        # 25-character bar, one '#' per 4 percent
        completed = "#" * int(percentage / 4)
        spaces = " " * (25 - len(completed))
        # Digit count of the item total. len(str(n)) equals floor(log10(n)) + 1
        # for n >= 1 but, unlike log10, does not raise for an empty container.
        item_width = len(str(total_items))
        # '\x1b[K' clears to end of line, '\r' returns the cursor for the next redraw
        sys.stdout.write('[%s%s] %s%% of %s  %s/%d %s %s\x1b[K\r'
                         % (completed, spaces, ('%3.1f' % percentage).rjust(5),
                            (file_size_str(total_sz)).rjust(7),
                            str(done).rjust(item_width), total_items,
                            (speed_str(avg_speed)).rjust(10), time_str(eta).rjust(7)))
        sys.stdout.flush()
def speed_str(num: int, suffix='B', time_suffix='/s') -> str:
    """Format a transfer rate using decimal (power-of-1000) unit prefixes.

    :param num: the rate to format
    :param suffix: unit appended after the prefix
    :param time_suffix: per-time marker appended last
    :returns: e.g. ``'1.5MB/s'``; values beyond the 'Z' prefix are given in 'Y'
    """
    prefixes = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z')
    magnitude = num
    for prefix in prefixes:
        if abs(magnitude) < 1000.0:
            fields = (magnitude, prefix, suffix, time_suffix)
            return "%3.1f%s%s%s" % fields
        # too large for this prefix, scale down to the next one
        magnitude = magnitude / 1000.0
    fields = (magnitude, 'Y', suffix, time_suffix)
    return "%.1f%s%s%s" % fields
def file_size_str(num: int, suffix='B') -> str:
    """Format a byte count using binary (power-of-1024) unit prefixes.

    The number is right-aligned to four characters without decimals.

    :param num: the size to format
    :param suffix: unit appended after the prefix
    :returns: e.g. ``'   5MiB'``; values beyond the 'Zi' prefix are given in 'Yi'
    """
    prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    magnitude = num
    for prefix in prefixes:
        if abs(magnitude) < 1024.0:
            fields = (magnitude, prefix, suffix)
            return "%4.0f%s%s" % fields
        # too large for this prefix, scale down to the next one
        magnitude = magnitude / 1024.0
    fields = (magnitude, 'Yi', suffix)
    return "%.1f%s%s" % fields
def time_str(num: float) -> str:
    """Format a duration given in seconds as a short human-readable string.

    Output forms, by magnitude:
      * ``'0s'`` for zero or negative input
      * ``'SSs'`` below one minute
      * ``'MM:SSm'`` below one hour
      * ``'HH:MMh'`` below one day
      * ``'Dd HHh'`` up to one year (just ``'Dd'`` from 100 days on)
      * ``'>1 year'`` for anything longer, including infinity and NaN

    :param num: duration in seconds; fractions of a second are discarded
    """
    if num <= 0:
        return '0s'
    # Written as a negated "<=" so that NaN (for which every comparison is
    # false) and infinity end up here, before the int() conversion below
    # could raise on them.
    if not num <= 31536000:
        return '>1 year'

    secs = int(num)
    if secs < 60:
        return '%02ds' % secs
    if secs < 3600:
        minutes, seconds = divmod(secs, 60)
        return '%02d:%02dm' % (minutes, seconds)
    # strictly below one day: exactly 86400 s is shown as '1d 00h', not '24:00h'
    if secs < 86400:
        hours, remainder = divmod(secs, 3600)
        return '%02d:%02dh' % (hours, remainder // 60)

    days, remainder = divmod(secs, 86400)
    if days >= 100:
        return '%id' % days
    return '%id %02dh' % (days, remainder // 3600)