import os
import asyncio
import aiohttp
import aiofiles
# Upper bound on the number of downloads that may be in flight at once.
semaphore = asyncio.Semaphore(10)


async def get_file(url, dir, file_name=None, file_suf=None, session=None):
    """Download ``url`` and append the body to ``<dir>/<file_name>.<file_suf>``.

    Args:
        url: Address of the file to fetch. A falsy value (``None`` or ``""``)
            is reported as an empty link and skipped.
        dir: Destination directory. The name shadows the builtin but is kept
            so existing keyword callers continue to work.
        file_name: Base name for the saved file. Defaults to the last URL
            path segment with its final ``.suffix`` removed.
        file_suf: File extension without the dot. Defaults to the text after
            the last ``.`` in the last URL path segment.
        session: An ``aiohttp.ClientSession`` to reuse. When omitted, a
            temporary session is created and closed before returning.
    """
    if not url:
        # The derived-name logic below needs a real string, so an empty link
        # is reported here instead of failing inside ``url.split``.
        print('文件链接为空,跳过!')
        return

    # Derive missing name parts from the last path segment. ``rpartition``
    # keeps inner dots in the stem ("a.tar.gz" -> "a.tar" + "gz") and yields a
    # string; the previous list slice was formatted into the path as a list.
    stem, _, suffix = url.split("/")[-1].rpartition(".")
    if file_name is None:
        file_name = stem
    if file_suf is None:
        file_suf = suffix

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) ",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-us",
        "Connection": "keep-alive",
        "Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7"}
    file_path = r'{}/{}.{}'.format(dir, file_name, file_suf)
    # NOTE(review): the original checked os.path.exists(file_path) here but
    # took no action. The file is opened in append mode, so downloading to an
    # existing path appends to it - confirm whether it should be skipped.

    # Only close the session if this call created it; a caller-supplied
    # session stays open for the caller to reuse.
    owns_session = session is None
    if owns_session:
        session = aiohttp.ClientSession()
    try:
        async with semaphore:
            async with session.get(url, headers=headers) as res:
                body = await res.content.read()
            async with aiofiles.open(file_path, 'ab') as f1:
                await f1.write(body)
                await f1.flush()
        print('下载完成:{}'.format(url))
    except TypeError:
        print('文件链接为空,跳过!')
    finally:
        if owns_session:
            await session.close()
作者: renho
-
downloader
-
wrapper
"""Module providing a function printing python version.""" def wrapper(fn): """Function printing python version.""" def inner(*args, **kwargs): print('wrapper start') ret = fn(*args, **kwargs) print('wrapper end') return ret return inner def wrapper1(fn): """Function printing python version.""" def inner(*args, **kwargs): print('wrapper1 start') ret = fn(*args, **kwargs) print('wrapper1 end') return ret return inner @wrapper1 @wrapper def show(name): """Function printing python version.""" print(f"show: {name}") return {name: name} result = show('renho') print('done: ' + str(result)) -
线程池
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import _thread
import demo_requests  # project-local module; not referenced in this snippet
import asyncio


def task(task_id):
    """Print a progress line five times, sleeping three seconds after each."""
    for run_count in range(5):
        print(f"id: {task_id} run times: {run_count}")
        time.sleep(3)


def handle_result(future):
    """Done-callback for a future: print the future object."""
    print(future)


async def handle():
    """Coroutine that prints 'handle' and finishes."""
    print('handle')


def start_multi_thread(threadPoolNum, count):
    """Run ``count`` instances of ``task`` on a thread pool and report each.

    Args:
        threadPoolNum: Maximum number of worker threads in the pool.
        count: Number of ``task`` calls to submit; each receives its index.
    """
    # The context manager shuts the pool down (waiting for workers) even if
    # something below raises.
    with ThreadPoolExecutor(max_workers=threadPoolNum) as pool:
        futures = [pool.submit(task, i) for i in range(count)]
        print(len(futures))
        for future in as_completed(futures):
            print("for each futures start")
            # The future is already finished here, so the callback is
            # invoked immediately.
            future.add_done_callback(handle_result)
            print("for each futures end")


if __name__ == '__main__':
    # Thread-pool demo, left disabled:
    # start_multi_thread(5, 10)
    # print('done')

    # Run the coroutine on its own thread. ``args`` must be a tuple (the
    # original passed a list to _thread.start_new_thread, which rejects it),
    # and joining keeps the process alive until the coroutine has run.
    worker = threading.Thread(target=asyncio.run, args=(handle(),))
    worker.start()
    print(worker.ident)
    worker.join()
pandas
读文件
# pandas quick-reference: a list of independent one-liners, not a runnable
# script. Several names used below (df_a, df_b, is_valid_text, results,
# lx_grouped, yx_grouped, rows, df_e) are not defined in this snippet.
import pandas as pd

# Load a table from an Excel workbook or from a CSV file.
df = pd.read_excel('data.xlsx')
df = pd.read_csv('data.csv')

# Column labels as a plain list.
df.columns.tolist()

# Inner join where the left frame's 'id' equals the right frame's 'aid'.
merged_data = pd.merge(df_a, df_b, left_on='id', right_on='aid', how='inner')

# Drop the 'id' column; the result is returned, not applied in place.
df.drop('id', axis=1)

# Keep only the rows whose 'name' equals 'nnname'.
df = df[df['name'] == 'nnname']

# Write to Excel without the index column.
df.to_excel('result.xlsx', index=False)

# First rows of the frame.
df.head()

# Group rows by the 'callno' column.
df.groupby('callno')

# Apply a function to every value of the 'name' column.
df['name'].apply(lambda x: is_valid_text(x))

# Independent copy of one column.
result = df['name'].copy()

# Distinct non-null values of a column, selected by label or by position
# (position 2 is the third column).
faq_list = df['答案文本'].dropna().unique().tolist()
faq_list = df.iloc[:, 2].dropna().unique().tolist()

# Build a frame from `results`, then keep the rows scoring above 90.
result_df = pd.DataFrame(results)
filtered_df = result_df[result_df['Q相似度得分'] > 90].copy()

# NOTE(review): the next line is not a valid statement; it looks like the
# contents of an f-string field formatting the mean to two decimals.
result_df['Q相似度得分'].mean():.2f

# Frequency of each value, rendered as text.
filtered_df['更适合的答案'].value_counts().to_string()

# Outer join on two key columns; clashing column names get the suffixes.
merged = pd.merge(
    lx_grouped,
    yx_grouped,
    on=['intent', 'best_match'],
    suffixes=('_lx', '_yx'),
    how='outer'
)

# Row-by-row iteration; the index is ignored.
for _, row in merged.iterrows():
    rows.append({})

# NOTE(review): fragment - `df.values` followed by a stray colon.
df.values:

# One dict per row.
df.to_dict(orient='records')

# A column as a plain list.
list(df_e['会话id'])

# Sort by two columns and renumber the index from zero.
df = df.sort_values(['callno', 'speake_no']).reset_index(drop=True)

# NOTE(review): sheet_names is an attribute of pd.ExcelFile, not of a
# DataFrame - presumably `df` stands for an ExcelFile here; confirm.
df.sheet_names

# Count of non-null values per column.
df.count()
-
async io
import asyncio
import threading


async def hello(name, delay=10):
    """Greet ``name``, wait ``delay`` seconds without blocking, greet again.

    Args:
        name: Value interpolated into both greeting lines.
        delay: Seconds to sleep between the two greetings (default 10).

    Returns:
        ``name``, unchanged.
    """
    # Print the name and the thread this coroutine is running on.
    # current_thread must be called; without the parentheses the function
    # object itself is printed instead of the thread.
    print("Hello %s! (%s)" % (name, threading.current_thread()))
    # Suspend this coroutine so the event loop can run others meanwhile.
    await asyncio.sleep(delay)
    print("Hello %s again! (%s)" % (name, threading.current_thread()))
    return name


# asyncio.ensure_future schedules a coroutine or Future for execution: it adds
# the awaitable to the task queue so that it is run even when nothing awaits
# it explicitly.
async def main():
    """Run two ``hello`` coroutines concurrently and print their results."""
    # gather returns the results in the order the awaitables were passed.
    results = await asyncio.gather(hello("Bob"), hello("Alice"))
    # Earlier experiment, left disabled:
    # results = await asyncio.wait(hello("Bob"), hello("Alice"))
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
async for
import asyncio
import time

print("\n\n=== 2. 异步模式开始 (请注意观察心跳) ===")


async def background_heartbeat(interval=0.3):
    """Print a heartbeat line forever, sleeping ``interval`` seconds between.

    Stands in for a background job such as a UI animation or liveness check.
    It only stops when the task running it is cancelled.
    """
    while True:
        print(" 💓 (背景任务) 咚-咚-咚... 程序还活着!")
        await asyncio.sleep(interval)


async def async_stream(count=3, delay=1):
    """Asynchronously yield ``count`` "package downloaded" messages.

    Args:
        count: Number of packages to produce (default 3).
        delay: Seconds to wait before each package is yielded (default 1).
    """
    for number in range(1, count + 1):
        print(f"🔄 (生成器内部) 正在用力下载第 {number} 个包...")
        # Suspend here: while this generator waits, the event loop is free to
        # run the heartbeat task.
        await asyncio.sleep(delay)
        yield f"📦 包 {number} 下载完成"


async def main(count=3, delay=1, interval=0.3):
    """Consume ``async_stream`` with ``async for`` while a heartbeat runs.

    Args:
        count: Forwarded to ``async_stream``.
        delay: Forwarded to ``async_stream``.
        interval: Forwarded to ``background_heartbeat``.
    """
    # 1. Start the background heartbeat task.
    task = asyncio.create_task(background_heartbeat(interval))
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments.
    start_time = time.perf_counter()
    try:
        # 2. Each time this loop waits for the next item, the heartbeat task
        #    gets a chance to run.
        async for data in async_stream(count, delay):
            print(f"✅ 主程序收到: {data}\n")
    finally:
        # Stop the heartbeat even if the loop above raised.
        task.cancel()
    print(f"=== 异步模式结束,总耗时: {time.perf_counter() - start_time:.2f}秒 ===")


if __name__ == "__main__":
    asyncio.run(main())
python基础
集合
# Sets: literal construction, adding members, union, removal.
s1 = {1, 2, 3, 4}
print(type(s1))
print(s1)
s1.add(5)
s1.add(6)
print(s1)

s2 = {5, 6, 7, 8}
s3 = s1.union(s2)
print(s3)
s3.remove(3)    # remove() raises KeyError if the member is absent
s3.discard(4)   # discard() is silent if the member is absent
print(s3)

s4 = set(range(10, 20))
print(s4)

# Tuple literal.
t0 = (1, 2, 3, 4)
print(t0)

# dict.get with present, empty-string, None-valued and missing keys.
m = {
    "a": "a",
    "b": "",
    "c": None,
}

# Plain get: a missing key gives None.
print(*(m.get(key) for key in "abcd"), sep="\n")
print("--------")
# get with a default: the default is used only when the key is missing.
print(*(m.get(key, key * 2) for key in "abcd"), sep="\n")
print("--------")
# Truthiness fallback: empty string and None also fall back to the default.
print(*(m.get(key) or key * 2 for key in "abcd"), sep="\n")

# Next section heading from the notes: 内置函数 (built-in functions)
################## zip list0 = ["a", "b", "c"] list1 = [1, 2, 3] list2 = ["A", "B", "C"] result = zip(list0, list1, list2) print(str(result)) # for item in result: # print(item) list3 = list(result) print(list3) print('----------') print(locals()) print(globals()) ################## sorted list0 = ["a", "abb", "adddd", "accc"] result = sorted(list0, key=lambda k: len(k)) print(result) ################## filter result = filter(lambda s: "adddd" == s, list0) print(list(result)) ################## map result = map(lambda i : i + i, list0) print(list(result)) ################## reduce s0 = {0, 1, 2, 3} s1 = {1, 2, 3} from functools import reduce result = reduce(lambda a, b: a | b, [s0, s1]) print('67' in '67@1@dhxy') ################## zip list0 = ["a", "b", "c"] list1 = [1, 2, 3] list2 = ["A", "B", "C"] result = zip(list0, list1, list2) print(str(result)) # for item in result: # print(item) list3 = list(result) print(list3) print('----------') print(locals()) print(globals()) ################## sorted list0 = ["a", "abb", "adddd", "accc"] result = sorted(list0, key=lambda k: len(k)) print(result) ################## filter result = filter(lambda s: "adddd" == s, list0) print(list(result)) ################## map result = map(lambda i : i + i, list0) print(list(result)) ################## reduce s0 = {0, 1, 2, 3} s1 = {1, 2, 3} from functools import reduce result = reduce(lambda a, b: a | b, [s0, s1]) print(result)