PTP / download_cc3m.py
Awiny's picture
Upload download_cc3m.py
1432af7
import pandas as pd
import numpy as np
import requests
import zlib
import os
import shelve
import magic #pip install python-magic
from multiprocessing import Pool
from tqdm import tqdm
# headers = {
# #'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36',
# 'User-Agent':'Googlebot-Image/1.0', # Pretend to be googlebot
# 'X-Forwarded-For': '64.18.15.200'
# }
def _df_split_apply(tup_arg):
split_ind, subset, func = tup_arg
r = subset.apply(func, axis=1)
return (split_ind, r)
def df_multiprocess(df, processes, chunk_size, func, dataset_name):
print("Generating parts...")
with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:
pbar = tqdm(total=len(df), position=0)
# Resume:
finished_chunks = set([int(k) for k in results.keys()])
pbar.desc = "Resuming"
for k in results.keys():
pbar.update(len(results[str(k)][1]))
pool_data = ((index, df[i:i + chunk_size], func) for index, i in enumerate(range(0, len(df), chunk_size)) if index not in finished_chunks)
print(int(len(df) / chunk_size), "parts.", chunk_size, "per part.", "Using", processes, "processes")
pbar.desc = "Downloading"
with Pool(processes) as pool:
for i, result in enumerate(pool.imap_unordered(_df_split_apply, pool_data, 2)):
results[str(result[0])] = result
pbar.update(len(result[1]))
pbar.close()
print("Finished Downloading.")
return
# Unique name based on url
def _file_name(row):
row.name = str(int(row.name) // 1000)
return "%s/%s_%s.jpg" % (row['folder'], row.name, (zlib.crc32(row['url'].encode('utf-8')) & 0xffffffff))
# For checking mimetypes separately without download
def check_mimetype(row):
if os.path.isfile(str(row['file'])):
row['mimetype'] = magic.from_file(row['file'], mime=True)
row['size'] = os.stat(row['file']).st_size
return row
# Don't download image, just check with a HEAD request, can't resume.
# Can use this instead of download_image to get HTTP status codes.
def check_download(row):
fname = _file_name(row)
sub_dir = fname.split('_')[0]
if not os.path.exists(sub_dir):
os.mkdir(sub_dir)
fname = '/'.join(fname.split('_'))
try:
# not all sites will support HEAD
response = requests.head(row['url'], stream=False, timeout=5, allow_redirects=True ) #, headers=headers)
row['status'] = response.status_code
row['headers'] = dict(response.headers)
except:
# log errors later, set error as 408 timeout
row['status'] = 408
return row
if response.ok:
row['file'] = fname
return row
def download_image(row):
# print(row)
fname = _file_name(row)
sub_dir = fname.split('_')[0]
if not os.path.exists(sub_dir):
os.mkdir(sub_dir)
fname = '/'.join(fname.split('_'))
# print(fname)
# Skip Already downloaded, retry others later
if os.path.isfile(fname):
row['status'] = 200
row['file'] = fname
row['mimetype'] = magic.from_file(row['file'], mime=True)
row['size'] = os.stat(row['file']).st_size
return row
try:
# use smaller timeout to skip errors, but can result in failed downloads
response = requests.get(row['url'], stream=False, timeout=10, allow_redirects=True ) # , headers=headers)
row['status'] = response.status_code
#row['headers'] = dict(response.headers)
except Exception as e:
# log errors later, set error as 408 timeout
row['status'] = 408
return row
if response.ok:
try:
with open(fname, 'wb') as out_file:
# some sites respond with gzip transport encoding
response.raw.decode_content = True
out_file.write(response.content)
row['mimetype'] = magic.from_file(fname, mime=True)
row['size'] = os.stat(fname).st_size
except:
# This is if it times out during a download or decode
row['status'] = 408
return row
row['file'] = fname
return row
def open_tsv(fname, folder):
print("Opening %s Data File..." % fname)
df = pd.read_csv(fname, sep='\t', names=["caption","url"], usecols=range(1,2))
df['folder'] = folder
print("Processing", len(df), " Images:")
return df
def df_from_shelve(chunk_size, func, dataset_name):
print("Generating Dataframe from results...")
with shelve.open('%s_%s_%s_results.tmp' % (dataset_name, func.__name__, chunk_size)) as results:
keylist = sorted([int(k) for k in results.keys()])
df = pd.concat([results[str(k)][1] for k in keylist], sort=True)
return df
# number of processes in the pool can be larger than cores
num_processes = 32
# chunk_size is how many images per chunk per process - changing this resets progress when restarting.
images_per_part = 100
# should download 15840
data_name = "/CC3M/images/validation"
df = open_tsv("/CC3M/validation.tsv", data_name)
df_multiprocess(df=df, processes=num_processes, chunk_size=images_per_part, func=download_image, dataset_name=data_name)
df = df_from_shelve(chunk_size=images_per_part, func=download_image, dataset_name=data_name)
df.to_csv("%s_report.tsv.gz" % data_name, compression='gzip', sep='\t', header=False, index=False)
# print("Saved.")
# # should download 3318333
# data_name = "CC3M/images/train"
# df = open_tsv("CC3M/Train_GCC-training.tsv",data_name)
# df_multiprocess(df=df, processes=num_processes, chunk_size=images_per_part, func=download_image, dataset_name=data_name)
# df = df_from_shelve(chunk_size=images_per_part, func=download_image, dataset_name=data_name)
# df.to_csv("%s_report.tsv.gz" % data_name, compression='gzip', sep='\t', header=False, index=False)
# print("Saved.")
# # 3334173 images in total