import pandas as pd
import numpy as np
import requests
import zlib
import os
import shelve
import magic  # pip install python-magic (also requires the libmagic system library)
from multiprocessing import Pool
from tqdm import tqdm
# headers = {
# #'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36',
# 'User-Agent':'Googlebot-Image/1.0', # Pretend to be googlebot
# 'X-Forwarded-For': '64.18.15.200'
# }
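
# To send these headers, uncomment the block above and re-enable the
# "headers=headers" argument in the requests calls below.
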
def _df_split_apply(tup_arg):
    # Unpack (chunk index, sub-DataFrame, row function) and apply the
    # function to every row of the chunk.
    split_ind, subset, func = tup_arg
    r = subset.apply(func, axis=1)
    return (split_ind, r)
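
# df_multiprocess (below) stores each chunk's result under its integer chunk
# index in a shelve file named "<dataset>_<func>_<chunksize>_results.tmp", so
# an interrupted run resumes when rerun with the same arguments.
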
def df_multiprocess(df, processes, chunk_size, func, dataset_name):
    print("Generating parts...")
    with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:
        pbar = tqdm(total=len(df), position=0)
        # Resume: skip chunks whose results are already in the shelve file
        finished_chunks = set([int(k) for k in results.keys()])
        pbar.desc = "Resuming"
        for k in results.keys():
            pbar.update(len(results[k][1]))

        pool_data = ((index, df[i:i + chunk_size], func) for index, i in enumerate(range(0, len(df), chunk_size)) if index not in finished_chunks)
        print(int(len(df) / chunk_size), "parts.", chunk_size, "per part.", "Using", processes, "processes")

        pbar.desc = "Downloading"
        with Pool(processes) as pool:
            for result in pool.imap_unordered(_df_split_apply, pool_data, 2):
                results[str(result[0])] = result
                pbar.update(len(result[1]))
        pbar.close()
    print("Finished Downloading.")
    return
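
# Usage sketch (hypothetical values; the real driver is at the bottom of this file):
# df_multiprocess(df=my_df, processes=8, chunk_size=100,
#                 func=download_image, dataset_name="demo")
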
# Unique name based on url
def _file_name(row):
    # Shard files into groups of 1000 by integer row index. Use a local
    # variable rather than mutating row.name, so each row keeps its original
    # index in the final report.
    shard = str(int(row.name) // 1000)
    return "%s/%s_%s.jpg" % (row['folder'], shard, (zlib.crc32(row['url'].encode('utf-8')) & 0xffffffff))
# For checking mimetypes separately without download
def check_mimetype(row):
    if os.path.isfile(str(row['file'])):
        row['mimetype'] = magic.from_file(row['file'], mime=True)
        row['size'] = os.stat(row['file']).st_size
    return row
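
# Usage sketch: re-verify files from an existing report without downloading
# anything again, e.g. df = df.apply(check_mimetype, axis=1)
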
# Don't download image, just check with a HEAD request, can't resume.
# Can use this instead of download_image to get HTTP status codes.
def check_download(row):
    fname = _file_name(row)
    sub_dir = fname.split('_')[0]
    # exist_ok avoids a mkdir race between worker processes; makedirs also creates parents
    os.makedirs(sub_dir, exist_ok=True)
    fname = '/'.join(fname.split('_'))
    try:
        # not all sites will support HEAD
        response = requests.head(row['url'], stream=False, timeout=5, allow_redirects=True)  # , headers=headers
        row['status'] = response.status_code
        row['headers'] = dict(response.headers)
    except Exception:
        # log errors later, set error as 408 timeout
        row['status'] = 408
        return row
    if response.ok:
        row['file'] = fname
    return row
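
# As noted above, check_download can be swapped in for download_image in the
# driver below (func=check_download) to collect status codes and headers
# without fetching any image bytes.
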
def download_image(row):
    fname = _file_name(row)
    sub_dir = fname.split('_')[0]
    # exist_ok avoids a mkdir race between worker processes; makedirs also creates parents
    os.makedirs(sub_dir, exist_ok=True)
    fname = '/'.join(fname.split('_'))

    # Skip already downloaded files, retry others later
    if os.path.isfile(fname):
        row['status'] = 200
        row['file'] = fname
        row['mimetype'] = magic.from_file(fname, mime=True)
        row['size'] = os.stat(fname).st_size
        return row

    try:
        # use a smaller timeout to skip errors, but it can result in failed downloads
        response = requests.get(row['url'], stream=False, timeout=10, allow_redirects=True)  # , headers=headers
        row['status'] = response.status_code
        # row['headers'] = dict(response.headers)
    except Exception:
        # log errors later, set error as 408 timeout
        row['status'] = 408
        return row

    if response.ok:
        try:
            with open(fname, 'wb') as out_file:
                # some sites respond with gzip transport encoding
                response.raw.decode_content = True
                out_file.write(response.content)
            row['mimetype'] = magic.from_file(fname, mime=True)
            row['size'] = os.stat(fname).st_size
        except Exception:
            # catches timeouts or decode errors during the body read/write
            row['status'] = 408
            return row
    row['file'] = fname
    return row
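
# The returned rows carry status, file, mimetype and size, so the report
# written below can be filtered, e.g. df[df['status'] == 200] for the rows
# that downloaded successfully.
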
def open_tsv(fname, folder):
    print("Opening %s Data File..." % fname)
    # Load only the URL column (index 1); captions are not needed for downloading
    df = pd.read_csv(fname, sep='\t', names=["caption", "url"], usecols=range(1, 2))
    df['folder'] = folder
    print("Processing", len(df), "Images:")
    return df

def df_from_shelve(chunk_size, func, dataset_name):
    print("Generating Dataframe from results...")
    with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:
        keylist = sorted([int(k) for k in results.keys()])
        df = pd.concat([results[str(k)][1] for k in keylist], sort=True)
    return df
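
# Because results are keyed by chunk index and keylist is sorted numerically,
# the rebuilt DataFrame is in the original row order even though
# pool.imap_unordered finishes chunks out of order.
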
# The number of processes in the pool can be larger than the number of cores
num_processes = 32
# chunk_size is how many images each pool task handles; changing it
# invalidates the resume data in the shelve file when restarting.
images_per_part = 100
# should download 15840
data_name = "/CC3M/images/validation"
df = open_tsv("/CC3M/validation.tsv", data_name)
df_multiprocess(df=df, processes=num_processes, chunk_size=images_per_part, func=download_image, dataset_name=data_name)
df = df_from_shelve(chunk_size=images_per_part, func=download_image, dataset_name=data_name)
df.to_csv("%s_report.tsv.gz" % data_name, compression='gzip', sep='\t', header=False, index=False)
print("Saved.")
# # should download 3318333
# data_name = "CC3M/images/train"
# df = open_tsv("CC3M/Train_GCC-training.tsv", data_name)
# df_multiprocess(df=df, processes=num_processes, chunk_size=images_per_part, func=download_image, dataset_name=data_name)
# df = df_from_shelve(chunk_size=images_per_part, func=download_image, dataset_name=data_name)
# df.to_csv("%s_report.tsv.gz" % data_name, compression='gzip', sep='\t', header=False, index=False)
# print("Saved.")
# # 3334173 images in total