File size: 1,901 Bytes
117b368 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
import csv
import os
# 续写CSV
def append_to_csv(data,
csv_file_path="../data/temp/temp.csv"):
# 检查文件是否存在及是否为空
if not os.path.exists(csv_file_path) or os.stat(csv_file_path).st_size == 0:
# 文件不存在或为空,需要写入表头
fieldnames = list(data.keys())
with open(csv_file_path, 'w', newline='', encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
with open(csv_file_path, 'r', newline='', encoding='utf-8') as file:
reader = csv.DictReader(file)
fieldnames = reader.fieldnames
with open(csv_file_path, 'a', newline='', encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writerow(data)
# 读取CSV文件并返回字典
def read_csv_to_dicts(filename='../data/temp/temp.csv'):
data = []
with open(filename, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
data.append(row)
return data
# 将字典列表写入CSV文件
def dict_to_csv(data, csv_file_path='./data/temp/temp.csv'):
with open(csv_file_path, mode='w', newline='', encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=list(data[0].keys()))
writer.writeheader()
writer.writerows(data)
# 从CSV中读取指定字段的所有值
def get_field_values(file_path, field_name):
if not os.path.exists(file_path):
return None
data = read_csv_to_dicts(file_path)
values = [row[field_name] for row in data if field_name in row]
return values
# 检查值是否存在于CSV
def check_in_csv(data, file_path, field_name):
datas = get_field_values(file_path, field_name)
if datas == None:
return False
return True if data in datas else False
|