Runtime error
Runtime error
File size: 7,640 Bytes
33e71b2 112bf3b 33e71b2 112bf3b 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 aa24cb4 33e71b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# YOLOv5 π by Ultralytics, GPL-3.0 license
Download utils
import logging
import os
import platform
import subprocess
import time
import urllib
from pathlib import Path
from zipfile import ZipFile
import requests
import torch
def is_url(url, check_online=True):
# Check if online file exists
url = str(url)
result = urllib.parse.urlparse(url)
assert all([result.scheme, result.netloc, result.path]) # check if is url
return (urllib.request.urlopen(url).getcode() == 200) if check_online else True # check if exists online
except (AssertionError, urllib.request.HTTPError):
return False
def gsutil_getsize(url=''):
# gs://bucket/file size
s = subprocess.check_output(f'gsutil du {url}', shell=True).decode('utf-8')
return eval(s.split(' ')[0]) if len(s) else 0 # bytes
def url_getsize(url=''):
# Return downloadable file size in bytes
response = requests.head(url, allow_redirects=True)
return int(response.headers.get('content-length', -1))
def safe_download(file, url, url2=None, min_bytes=1E0, error_msg=''):
# Attempts to download file from url or url2, checks and removes incomplete downloads < min_bytes
from utils.general import LOGGER
file = Path(file)
assert_msg = f"Downloaded file '{file}' does not exist or size is < min_bytes={min_bytes}"
try: # url1'Downloading {url} to {file}...')
torch.hub.download_url_to_file(url, str(file), progress=LOGGER.level <= logging.INFO)
assert file.exists() and file.stat().st_size > min_bytes, assert_msg # check
except Exception as e: # url2
if file.exists():
file.unlink() # remove partial downloads'ERROR: {e}\nRe-attempting {url2 or url} to {file}...')
os.system(f"curl -# -L '{url2 or url}' -o '{file}' --retry 3 -C -") # curl download, retry and resume on fail
if not file.exists() or file.stat().st_size < min_bytes: # check
if file.exists():
file.unlink() # remove partial downloads"ERROR: {assert_msg}\n{error_msg}")'')
def attempt_download(file, repo='ultralytics/yolov5', release='v6.2'):
# Attempt file download from GitHub release assets if not found locally. release = 'latest', 'v6.2', etc.
from utils.general import LOGGER
def github_assets(repository, version='latest'):
# Return GitHub repo tag (i.e. 'v6.2') and assets (i.e. ['', '', ...])
if version != 'latest':
version = f'tags/{version}' # i.e. tags/v6.2
response = requests.get(f'{repository}/releases/{version}').json() # github api
return response['tag_name'], [x['name'] for x in response['assets']] # tag, assets
file = Path(str(file).strip().replace("'", ''))
if not file.exists():
# URL specified
name = Path(urllib.parse.unquote(str(file))).name # decode '%2F' to '/' etc.
if str(file).startswith(('http:/', 'https:/')): # download
url = str(file).replace(':/', '://') # Pathlib turns :// -> :/
file = name.split('?')[0] # parse authentication
if Path(file).is_file():'Found {url} locally at {file}') # file already exists
safe_download(file=file, url=url, min_bytes=1E5)
return file
# GitHub assets
assets = [
'', '', '', '', '', '', '',
'', '', '']
tag, assets = github_assets(repo, release)
except Exception:
tag, assets = github_assets(repo) # latest release
except Exception:
tag = subprocess.check_output('git tag', shell=True, stderr=subprocess.STDOUT).decode().split()[-1]
except Exception:
tag = release
file.parent.mkdir(parents=True, exist_ok=True) # make parent dir (if required)
if name in assets:
url3 = '' # backup gdrive mirror
url2=f'{repo}/{tag}/{name}', # backup url (optional)
error_msg=f'{file} missing, try downloading from{repo}/releases/{tag} or {url3}')
return str(file)
def gdrive_download(id='16TiPfZj7htmTyhntwcZyEEAejOUxuT6m', file=''):
# Downloads a file from Google Drive. from yolov5.utils.downloads import *; gdrive_download()
t = time.time()
file = Path(file)
cookie = Path('cookie') # gdrive cookie
print(f'Downloading{id} as {file}... ', end='')
if file.exists():
file.unlink() # remove existing file
if cookie.exists():
cookie.unlink() # remove existing cookie
# Attempt file download
out = "NUL" if platform.system() == "Windows" else "/dev/null"
os.system(f'curl -c ./cookie -s -L "{id}" > {out}')
if os.path.exists('cookie'): # large file
s = f'curl -Lb ./cookie "{get_token()}&id={id}" -o {file}'
else: # small file
s = f'curl -s -L -o {file} "{id}"'
r = os.system(s) # execute, capture return
if cookie.exists():
cookie.unlink() # remove existing cookie
# Error check
if r != 0:
if file.exists():
file.unlink() # remove partial
print('Download error ') # raise Exception('Download error')
return r
# Unzip if archive
if file.suffix == '.zip':
print('unzipping... ', end='')
ZipFile(file).extractall(path=file.parent) # unzip
file.unlink() # remove zip
print(f'Done ({time.time() - t:.1f}s)')
return r
def get_token(cookie="./cookie"):
with open(cookie) as f:
for line in f:
if "download" in line:
return line.split()[-1]
return ""
# Google utils: ----------------------------------------------
# def upload_blob(bucket_name, source_file_name, destination_blob_name):
# # Uploads a file to a bucket
# #
# storage_client = storage.Client()
# bucket = storage_client.get_bucket(bucket_name)
# blob = bucket.blob(destination_blob_name)
# blob.upload_from_filename(source_file_name)
# print('File {} uploaded to {}.'.format(
# source_file_name,
# destination_blob_name))
# def download_blob(bucket_name, source_blob_name, destination_file_name):
# # Uploads a blob from a bucket
# storage_client = storage.Client()
# bucket = storage_client.get_bucket(bucket_name)
# blob = bucket.blob(source_blob_name)
# blob.download_to_filename(destination_file_name)
# print('Blob {} downloaded to {}.'.format(
# source_blob_name,
# destination_file_name))