import json
class DeviceConfigManager:
    """Accessor for the device configuration stored in a JSON file.

    The parsed configuration is cached on the class, so the file is read at
    most once and every manager instance sees the same data.
    """

    # Parsed configuration shared by all instances; ``None`` until first load.
    _deviceConfig = None

    def __init__(self):
        # Store the cache on the class rather than on ``self``: an instance
        # assignment would leave the class attribute at ``None`` and force
        # every new instance to read the file again.
        cls = type(self)
        if cls._deviceConfig is None:
            cls._deviceConfig = self.load()

    def get(self, *inherit_keys):
        """Return the value reached by following ``inherit_keys`` in order.

        With no keys the whole configuration object is returned. Lookup
        errors raised by subscripting (for example ``KeyError`` for a missing
        key) are propagated to the caller.
        """
        node = self._deviceConfig
        for key in inherit_keys:
            node = node[key]
        return node

    def update(self, value, *inherit_keys):
        """Set ``value`` at the nested path given by ``inherit_keys``.

        Missing intermediate keys are created as empty dicts. Only the
        in-memory configuration changes; nothing is written back to disk.

        Raises:
            ValueError: if no key is given.
        """
        if not inherit_keys:
            raise ValueError("inherit_keys can't be empty")
        *parent_keys, last_key = inherit_keys
        node = self._deviceConfig
        for key in parent_keys:
            if isinstance(node, dict) and key not in node:
                node[key] = {}
            node = node[key]
        node[last_key] = value

    def load(self):
        """Read and parse the device configuration JSON file."""
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open('learn/demo_class/device_config.json', 'r',
                  encoding='utf-8') as device_config_file:
            return json.load(device_config_file)
if __name__ == '__main__':
    # Demo: print the manager, the whole configuration, then nested lookups.
    device_config_manager = DeviceConfigManager()
    print(device_config_manager)
    print(device_config_manager.get())
    print(device_config_manager.get('wifi_config'))
    print(device_config_manager.get('wifi_config', 'ssid'))
    print(device_config_manager.get('wifi_config', 'password'))
    # ``get`` is an instance method: calling it on the class would bind the
    # string 'wifi_config' to ``self`` and fail, so use an instance here.
    print(DeviceConfigManager().get('wifi_config', 'password'))
分类: python
-
class
-
run_with_args.py
import sys

# Shows the module name: '__main__' when run as a script, otherwise the
# name the module was imported under.
print(__name__)

if __name__ == '__main__':
    print('start ...')
    # Print the script path and up to two command-line arguments. Slicing
    # avoids an IndexError when fewer than two arguments are supplied.
    for arg in sys.argv[:3]:
        print(arg)
推导式
# General form: expression for item in iterable if condition

# List comprehension
lst = [i for i in range(10)]
print(lst)
lst = [i for i in range(10) if 0 == i % 2]
print(lst)

# Dict comprehension (named so that it does not shadow the builtin ``dict``)
name_by_index = {i: f"renho{i}" for i in range(10)}
print(name_by_index)

# There is no tuple comprehension; the parenthesised form is a generator
# expression.

# Set comprehension with two ``for`` clauses: collects the items of both sets.
s0 = {0, 1, 2, 3}
s1 = {1, 2, 3}
result = {item for pair in [s0, s1] for item in pair}
print(result)
生成器
# A generator is essentially an iterator:
# it can be iterated over and saves memory.
def generator():
    """Yield the numbers 0-9, then the even numbers below 10, as two lists.

    'try end' is printed once both lists have been consumed; 'finally' is
    printed whenever the generator finishes or is closed.
    """
    try:
        yield list(range(10))
        yield [i for i in range(10) if 0 == i % 2]
        print('try end')
    finally:
        print('finally')


g = generator()
# Use the builtin ``next`` rather than calling ``__next__`` directly.
print(next(g))
# print(next(g))
print('=' * 50)
# for g in generator():
#     print(g)
datas = list(generator())
print(datas[0] + datas[1])
downloader
import os
import asyncio
import aiohttp
import aiofiles

# Shared by every ``get_file`` call: at most 10 requests run at once.
semaphore = asyncio.Semaphore(10)


async def get_file(url, dir, file_name=None, file_suf=None, session=None):
    """Download ``url`` and save it as ``<dir>/<file_name>.<file_suf>``.

    Args:
        url: Address of the file. An empty value is reported and skipped.
        dir: Target directory (the name shadows the builtin but is kept
            because it is part of the public signature).
        file_name: Base name of the saved file; defaults to the last URL
            path segment without its extension.
        file_suf: File extension; defaults to the text after the last dot
            of the last URL path segment.
        session: Optional ``aiohttp.ClientSession`` to reuse. When omitted a
            temporary session is created and closed before returning.

    A file that already exists at the target path is left untouched, so a
    repeated download does not append a second copy of the data.
    """
    if not url:
        # Checked up front: splitting an empty/None URL would fail before
        # any error handling could report it.
        print('文件链接为空,跳过!')
        return

    remote_name = url.split("/")[-1]
    stem, _, suffix = remote_name.rpartition(".")
    if file_name is None:
        # A string, not the list slice, so the path is not "['name'].ext".
        file_name = stem
    if file_suf is None:
        file_suf = suffix

    file_path = r'{}/{}.{}'.format(dir, file_name, file_suf)
    if os.path.exists(file_path):
        return

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) ",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-us",
        "Connection": "keep-alive",
        "Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7",
    }

    # Only close the session if this call created it.
    owns_session = session is None
    if owns_session:
        session = aiohttp.ClientSession()
    try:
        async with semaphore:
            async with session.get(url, headers=headers) as res:
                payload = await res.content.read()
                async with aiofiles.open(file_path, 'ab') as f1:
                    await f1.write(payload)
                    await f1.flush()
                    print('下载完成:{}'.format(url))
    except TypeError:
        print('文件链接为空,跳过!')
    finally:
        if owns_session:
            await session.close()
wrapper
# Demonstrates stacking two decorators on one function.
import functools


def wrapper(fn):
    """Return ``fn`` wrapped so each call prints 'wrapper start'/'wrapper end'."""
    @functools.wraps(fn)  # keep the wrapped function's name and docstring
    def inner(*args, **kwargs):
        print('wrapper start')
        ret = fn(*args, **kwargs)
        print('wrapper end')
        return ret
    return inner


def wrapper1(fn):
    """Return ``fn`` wrapped so each call prints 'wrapper1 start'/'wrapper1 end'."""
    @functools.wraps(fn)  # keep the wrapped function's name and docstring
    def inner(*args, **kwargs):
        print('wrapper1 start')
        ret = fn(*args, **kwargs)
        print('wrapper1 end')
        return ret
    return inner


# Decorators apply bottom-up: ``wrapper`` wraps ``show`` first and
# ``wrapper1`` wraps that result, so ``wrapper1`` prints outermost.
@wrapper1
@wrapper
def show(name):
    """Print ``name`` and return a one-entry dict mapping it to itself."""
    print(f"show: {name}")
    return {name: name}


result = show('renho')
print('done: ' + str(result))
线程池
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import _thread
import demo_requests
import asyncio


def task(task_id):
    """Print a progress line five times, sleeping 3 seconds after each."""
    for run_count in range(5):
        print(f"id: {task_id} run times: {run_count}")
        time.sleep(3)


def handle_result(future):
    """Done-callback: print the finished future."""
    print(future)


async def handle():
    """Minimal coroutine used to try running asyncio in a worker thread."""
    print('handle')


def start_multi_thread(threadPoolNum, count):
    """Run ``count`` copies of ``task`` on a pool of ``threadPoolNum`` threads.

    Returns once every task has finished.
    """
    # The ``with`` block shuts the pool down even if an exception is raised.
    with ThreadPoolExecutor(max_workers=threadPoolNum) as pool:
        futures = [pool.submit(task, i) for i in range(count)]
        print(len(futures))
        for future in as_completed(futures):
            print("for each futures start")
            # The future is already done here, so the callback runs at once.
            future.add_done_callback(handle_result)
            print("for each futures end")


if __name__ == '__main__':
    # start_multi_thread(5, 10)
    # print('done')
    # ``_thread.start_new_thread`` needs its arguments as a tuple and gives
    # nothing to wait on; a ``threading.Thread`` can be joined so the
    # coroutine is sure to run before the program exits.
    t = threading.Thread(target=asyncio.run, args=(handle(),))
    t.start()
    print(t.ident)
    t.join()
pandas
读文件
# pandas quick-reference snippets.
# NOTE(review): names such as df_a, df_b, df_e, results, rows, lx_grouped,
# yx_grouped and is_valid_text are not defined in this snippet and are
# assumed to come from the surrounding project.
import pandas as pd

# Read a file into a DataFrame.
df = pd.read_excel('data.xlsx')
df = pd.read_csv('data.csv')

# Column names as a list.
df.columns.tolist()

# Inner join on differently named key columns.
merged_data = pd.merge(df_a, df_b, left_on='id', right_on='aid', how='inner')

# Drop a column (returns a new DataFrame).
df.drop('id', axis=1)

# Filter rows by value.
df = df[df['name'] == 'nnname']

# Write to Excel without the index column.
df.to_excel('result.xlsx', index=False)

df.head()
df.groupby('callno')

# Apply a function to every value of a column.
df['name'].apply(is_valid_text)
result = df['name'].copy()

# Unique non-null values of a column, selected by name or by position.
faq_list = df['答案文本'].dropna().unique().tolist()
faq_list = df.iloc[:, 2].dropna().unique().tolist()

# Build a DataFrame from records and filter it.
result_df = pd.DataFrame(results)
filtered_df = result_df[result_df['Q相似度得分'] > 90].copy()

# Format the mean with two decimals (a format spec needs an f-string).
f"{result_df['Q相似度得分'].mean():.2f}"
filtered_df['更适合的答案'].value_counts().to_string()

# Outer join on several keys, with suffixes for clashing column names.
merged = pd.merge(
    lx_grouped,
    yx_grouped,
    on=['intent', 'best_match'],
    suffixes=('_lx', '_yx'),
    how='outer',
)

# Iterate over rows.
for _, row in merged.iterrows():
    rows.append({})

# Convert to other containers.
df.values
df.to_dict(orient='records')
list(df_e['会话id'])

# Sort by several columns and renumber the index.
df = df.sort_values(['callno', 'speake_no']).reset_index(drop=True)

# Sheet names belong to an ExcelFile; a DataFrame has no ``sheet_names``.
pd.ExcelFile('data.xlsx').sheet_names

# Count of non-null values per column.
df.count()
async io
import asyncio
import threading


# Takes a ``name`` argument.
async def hello(name):
    """Greet ``name``, wait 10 seconds, greet again, and return ``name``."""
    # Print the name and the current thread. ``current_thread`` has to be
    # called; without the parentheses the function object itself is printed.
    print("Hello %s! (%s)" % (name, threading.current_thread()))
    # Suspend this coroutine with asyncio.sleep(10).
    await asyncio.sleep(10)
    print("Hello %s again! (%s)" % (name, threading.current_thread()))
    return name


# asyncio.ensure_future schedules a coroutine or Future for execution: it
# puts the awaitable on the task queue so that it is run even if it is never
# explicitly awaited.
async def main():
    """Run two ``hello`` coroutines concurrently and print their results."""
    results = await asyncio.gather(hello("Bob"), hello("Alice"))
    # results = await asyncio.wait(hello("Bob"), hello("Alice"))
    print(results)


asyncio.run(main())
async for
import asyncio
import time

print("\n\n=== 2. 异步模式开始 (请注意观察心跳) ===")


# Background task that stands in for a UI animation or a heartbeat check.
async def background_heartbeat():
    """Print a heartbeat line every 0.3 seconds until cancelled."""
    while True:
        print(" 💓 (背景任务) 咚-咚-咚... 程序还活着!")
        await asyncio.sleep(0.3)  # one beat every 0.3 seconds


# Asynchronous generator.
async def async_stream():
    """Yield three "package downloaded" messages, one second apart."""
    for i in range(3):
        print(f"🔄 (生成器内部) 正在用力下载第 {i+1} 个包...")
        # Suspend for one second; the event loop is free to run the
        # heartbeat task in the meantime.
        await asyncio.sleep(1)
        yield f"📦 包 {i+1} 下载完成"


async def main():
    """Consume ``async_stream`` while the heartbeat runs in the background."""
    # 1. Start the background heartbeat task.
    task = asyncio.create_task(background_heartbeat())
    # A monotonic clock is unaffected by system clock adjustments.
    start_time = time.perf_counter()
    try:
        # 2. Async-for loop: every time it waits here for data, the
        #    heartbeat task gets a chance to run.
        async for data in async_stream():
            print(f"✅ 主程序收到: {data}\n")
    finally:
        # Stop the heartbeat even if the loop failed, and wait until the
        # cancellation has actually been processed.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    print(f"=== 异步模式结束,总耗时: {time.perf_counter() - start_time:.2f}秒 ===")


# Run the demo.
asyncio.run(main())