分类: python

  • class

    import json
    
    
    class DeviceConfigManager:
        """Accessor for the device configuration stored in a JSON file.

        The parsed configuration is cached on the class, so the file is read
        only once no matter how many managers are created.
        """

        # Parsed configuration shared by all instances; None until first load.
        _deviceConfig = None

        def __init__(self):
            # Assign through the class: writing via ``self`` would only create an
            # instance attribute and leave the shared cache permanently empty.
            cls = type(self)
            if cls._deviceConfig is None:
                cls._deviceConfig = self.load()

        def get(self, *inherit_keys):
            """Return the value reached by following ``inherit_keys`` from the root.

            With no keys the whole configuration is returned. A missing key
            propagates the lookup error (e.g. ``KeyError``) of the container.
            """
            node = self._deviceConfig
            for key in inherit_keys:
                node = node[key]
            return node

        def update(self, value, *inherit_keys):
            """Set ``value`` at the nested path ``inherit_keys`` (in memory only).

            Missing intermediate dictionaries are created on the way down.

            Raises:
                ValueError: if no key is given.
            """
            if not inherit_keys:
                raise ValueError("inherit_keys can't be empty")
            *parent_keys, last_key = inherit_keys
            node = self._deviceConfig
            for key in parent_keys:
                if isinstance(node, dict) and key not in node:
                    node[key] = {}
                node = node[key]
            node[last_key] = value

        def load(self):
            """Read and parse the device configuration JSON file."""
            # JSON is UTF-8 by specification; do not rely on the locale default.
            with open('learn/demo_class/device_config.json', 'r',
                      encoding='utf-8') as device_config_file:
                return json.load(device_config_file)
    
    if __name__ == '__main__':
        # Demo: read the whole configuration, one section, then single values.
        device_config_manager = DeviceConfigManager()
        print(device_config_manager)
        print(device_config_manager.get())
        print(device_config_manager.get('wifi_config'))
        print(device_config_manager.get('wifi_config', 'ssid'))
        print(device_config_manager.get('wifi_config', 'password'))
        # ``get`` is an instance method: calling it on the class itself would
        # bind 'wifi_config' to ``self`` and fail, so go through an instance.
        print(DeviceConfigManager().get('wifi_config', 'password'))
    
  • run_with_args.py

    import sys
    
    # Shows the module name; it is '__main__' only when run as a script.
    print(__name__)
    if __name__ == '__main__':
        print('start ...')
        # Print the script path and up to two arguments. Slicing never raises
        # IndexError, so the script also works when fewer arguments are given.
        for arg in sys.argv[:3]:
            print(arg)
    
  • 推导式

    # General form: expression for item in iterable if condition
    # List comprehension
    lst = [i for i in range(10)]
    print(lst)
    lst = [i for i in range(10) if 0 == i % 2]
    print(lst)

    # Dict comprehension (named so that it does not shadow the builtin ``dict``)
    names_by_index = {i: f"renho{i}" for i in range(10)}
    print(names_by_index)

    # There is no tuple comprehension; (...) builds a generator expression.

    # Set comprehension with two loops: flattens both sets into their union.
    s0 = {0, 1, 2, 3}
    s1 = {1, 2, 3}
    result = {item for pair in [s0, s1] for item in pair}
    print(result)
    
  • 生成器

    # A generator is an iterator: it is consumed by iteration and produces its
    # values lazily, which saves memory.
    def generator():
        """Yield the numbers 0..9 as a list, then the even ones as a list.

        Prints 'try end' once both values were consumed and always prints
        'finally' when the generator finishes or is closed.
        """
        try:
            yield list(range(10))
            yield list(range(0, 10, 2))
            print('try end')
        finally:
            print('finally')
    
    
    g = generator()
    # Pull only the first value; the generator stays suspended at its first yield.
    print(next(g))
    # print(next(g))

    print('='*50)

    # Alternative: iterate the generator directly.
    # for g in generator():
    #     print(g)

    # Drain a fresh generator into a list, then concatenate the two yielded lists.
    datas = list(generator())
    print(datas[0] + datas[1])
    
    
  • downloader

    import os
    import asyncio
    import aiohttp
    import aiofiles
    
    # Caps how many downloads may be in flight at the same time.
    semaphore = asyncio.Semaphore(10)

    async def get_file(url, dir, file_name=None, file_suf=None, session=None):
        """Download ``url`` and store it as ``<dir>/<file_name>.<file_suf>``.

        Args:
            url: Address of the file; an empty/None url is skipped with a message.
            dir: Target directory (must already exist).
            file_name: Base name of the target file; defaults to the last URL
                path segment without its extension.
            file_suf: Extension of the target file; defaults to the extension of
                the last URL path segment (none if that segment has no dot).
            session: Optional ``aiohttp.ClientSession`` to reuse. When omitted a
                private session is created and closed before returning.

        A file that already exists is left untouched and nothing is downloaded.
        """
        if not url:
            # Checked up front: ``None.split`` would raise AttributeError before
            # the TypeError handler below could report the empty link.
            print('文件链接为空,跳过!')
            return

        basename = url.split("/")[-1]
        stem, dot, extension = basename.rpartition(".")
        if not dot:
            stem, extension = basename, ""
        if file_name is None:
            # Must be a string: formatting a list would put "['name']" in the path.
            file_name = stem
        if file_suf is None:
            file_suf = extension

        if file_suf:
            file_path = r'{}/{}.{}'.format(dir, file_name, file_suf)
        else:
            file_path = r'{}/{}'.format(dir, file_name)
        if os.path.exists(file_path):
            # The file is opened in append mode, so downloading again would
            # append a second copy of the content to the existing file.
            return

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) ",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-us",
            "Connection": "keep-alive",
            "Accept-Charset": "GB2312,utf-8;q=0.7,*;q=0.7"}

        # Only close a session this call created; a caller-supplied one stays open.
        owns_session = session is None
        if owns_session:
            session = aiohttp.ClientSession()
        try:
            async with semaphore:
                async with session.get(url, headers=headers) as res:
                    payload = await res.content.read()
                    async with aiofiles.open(file_path, 'ab') as f1:
                        await f1.write(payload)
                        await f1.flush()
                        print('下载完成:{}'.format(url))
        except TypeError:
            print('文件链接为空,跳过!')
        finally:
            if owns_session:
                await session.close()
    
  • wrapper

    """Demo of stacked decorators: each wrapper prints before and after the wrapped call."""
    
    
    def wrapper(fn):
        """Decorate ``fn`` so every call is bracketed by start/end messages.

        The returned function forwards all arguments to ``fn``, returns its
        result unchanged and keeps ``fn``'s name and docstring.
        """
        import functools  # local import keeps this snippet self-contained

        @functools.wraps(fn)
        def inner(*args, **kwargs):
            print('wrapper start')
            ret = fn(*args, **kwargs)
            print('wrapper end')
            return ret
        return inner
    
    def wrapper1(fn):
        """Decorate ``fn`` so every call is bracketed by 'wrapper1' messages.

        The returned function forwards all arguments to ``fn``, returns its
        result unchanged and keeps ``fn``'s name and docstring.
        """
        import functools  # local import keeps this snippet self-contained

        @functools.wraps(fn)
        def inner(*args, **kwargs):
            print('wrapper1 start')
            ret = fn(*args, **kwargs)
            print('wrapper1 end')
            return ret
        return inner
    
    # Decorators apply bottom-up: ``wrapper`` wraps ``show`` first and
    # ``wrapper1`` wraps that result, so ``wrapper1`` prints outermost.
    @wrapper1
    @wrapper
    def show(name):
        """Print ``name`` and return a one-entry dict mapping it to itself."""
        print(f"show: {name}")
        mapping = {name: name}
        return mapping
    
    # Call the decorated function and report what it returned.
    result = show('renho')
    print(f'done: {result}')
    
  • 线程池

    import time
    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed
    import _thread
    import demo_requests
    import asyncio
    
    
    def task(task_id, run_times=5, interval=3):
        """Simulate a slow job: print a progress line, then sleep, repeatedly.

        Args:
            task_id: Identifier shown in every progress line.
            run_times: Number of iterations (default 5).
            interval: Seconds to sleep after each line (default 3).
        """
        for run_count in range(run_times):
            print(f"id: {task_id} run times: {run_count}")
            time.sleep(interval)
    
    def handle_result(future):
        """Done-callback for a future: just print the future object."""
        print(future)
    
    async def handle():
        """Minimal coroutine used by the demo: print a marker and finish."""
        print('handle')
    
    def start_multi_thread(threadPoolNum, count):
        """Run ``count`` tasks on a pool of ``threadPoolNum`` worker threads.

        Blocks until every task has finished. Each future is reported through
        ``handle_result`` as soon as it completes.
        """
        # The context manager shuts the pool down even if an exception escapes.
        with ThreadPoolExecutor(max_workers=threadPoolNum) as pool:
            futures = [pool.submit(task, i) for i in range(count)]
            print(len(futures))
            for future in as_completed(futures):
                print("for each futures start")
                # The future is already done here, so the callback runs
                # immediately in this thread.
                future.add_done_callback(handle_result)
                print("for each futures end")
    
    if __name__ == '__main__':
        # start_multi_thread(5, 10)
        # print('done')
        # Run the coroutine on its own thread. ``_thread.start_new_thread``
        # requires a tuple of arguments (a list raises TypeError) and offers no
        # way to wait for the thread, so use ``threading.Thread`` and join it.
        t = threading.Thread(target=asyncio.run, args=(handle(),))
        t.start()
        print(t.ident)
        t.join()
    
    
  • pandas

    读文件

    import pandas as pd
    
    # Cheat sheet of independent pandas one-liners, not a runnable script:
    # names such as df_a, df_b, results, lx_grouped, yx_grouped, rows, df_e and
    # is_valid_text are not defined in this snippet.
    # Load an Excel sheet / a CSV file into a DataFrame.
    df = pd.read_excel('data.xlsx')
    df = pd.read_csv('data.csv')
    # Column labels as a plain list.
    df.columns.tolist()
    # Inner join: rows where df_a['id'] equals df_b['aid'].
    merged_data = pd.merge(df_a, df_b, left_on='id', right_on='aid', how='inner')
    # Drop the 'id' column; the result is returned and not assigned here.
    df.drop('id', axis=1)
    # Keep only the rows whose 'name' equals 'nnname'.
    df = df[df['name'] == 'nnname']
    # Write to an Excel file without the index column.
    df.to_excel('result.xlsx', index=False)
    # First rows of the frame.
    df.head()
    # Group rows by the 'callno' column.
    df.groupby('callno')
    # Apply is_valid_text to every value of the 'name' column.
    df['name'].apply(lambda x: is_valid_text(x))
    # Copy of the 'name' column.
    result = df['name'].copy()
    # Distinct non-null values of a column, selected by label ...
    faq_list = df['答案文本'].dropna().unique().tolist()
    # ... or by position (third column).
    faq_list = df.iloc[:, 2].dropna().unique().tolist()
    # Build a DataFrame from `results` (presumably a list of dicts - TODO confirm).
    result_df = pd.DataFrame(results)
    # Copy of the rows whose score is above 90.
    filtered_df = result_df[result_df['Q相似度得分'] > 90].copy()
    # NOTE(review): fragment, not valid Python on its own - looks like the body
    # of an f-string placeholder, e.g. f"{...mean():.2f}".
    result_df['Q相似度得分'].mean():.2f
    # Frequency of each value, rendered as text.
    filtered_df['更适合的答案'].value_counts().to_string()
    # Outer join on two key columns; other columns shared by both frames get
    # the '_lx' / '_yx' suffixes.
    merged = pd.merge(
        lx_grouped, 
        yx_grouped, 
        on=['intent', 'best_match'], 
        suffixes=('_lx', '_yx'),
        how='outer'
    )
    # NOTE(review): the next three lines are fragments - the loop body is not
    # indented and `df.values:` is not a complete statement.
    for _, row in merged.iterrows():
    rows.append({})
    df.values:
    # One dict per row.
    df.to_dict(orient='records')
    # A column as a plain list.
    list(df_e['会话id'])
    # Sort by two columns and renumber the index from 0.
    df = df.sort_values(['callno', 'speake_no']).reset_index(drop=True)
    # NOTE(review): sheet_names is an attribute of pd.ExcelFile, not of a
    # DataFrame - verify what `df` is meant to be here.
    df.sheet_names
    # Number of non-null values per column.
    df.count()
    
    
  • async io

    import asyncio
    import threading
    
    
    # Takes the name to greet (and, optionally, how long to pause):
    async def hello(name, delay=10):
        """Greet ``name``, pause without blocking the event loop, greet again.

        Args:
            name: Name shown in both messages.
            delay: Seconds to wait between the two messages (default 10).

        Returns:
            ``name`` unchanged.
        """
        # Print the name and the current thread. ``current_thread`` must be
        # called; without the parentheses the function object itself is printed.
        print("Hello %s! (%s)" % (name, threading.current_thread()))
        # Suspend this coroutine; other coroutines run in the meantime.
        await asyncio.sleep(delay)
        print("Hello %s again! (%s)" % (name, threading.current_thread()))
        return name
    
    # asyncio.ensure_future是Python asyncio库中用于安排协程或Future对象执行的函数,其核心作用是将可等待对象(如协程或Future)加入任务队列并确保其被调度执行,即使未被显式await
    
    async def main():
        """Run both greetings concurrently and print the list of their results."""
        greetings = await asyncio.gather(hello("Bob"), hello("Alice"))
        print(greetings)

    # Start the event loop and run ``main`` to completion.
    asyncio.run(main())
    
    
  • async for

    import asyncio
    import time
    
    # Banner printed when the module is executed, before the async demo starts.
    print("\n\n=== 2. 异步模式开始 (请注意观察心跳) ===")
    
    # Background task standing in for a UI animation or a heartbeat check.
    async def background_heartbeat():
        """Print a heartbeat line every 0.3 seconds until the task is cancelled."""
        beat_interval = 0.3  # seconds between two beats
        while True:
            print("   💓 (背景任务) 咚-咚-咚... 程序还活着!")
            await asyncio.sleep(beat_interval)
    
    # Asynchronous generator
    async def async_stream(count=3, delay=1):
        """Yield ``count`` simulated download results, one every ``delay`` seconds.

        Args:
            count: Number of packages to produce (default 3).
            delay: Seconds spent "downloading" each package (default 1).
        """
        for i in range(count):
            print(f"🔄 (生成器内部) 正在用力下载第 {i+1} 个包...")
            # Suspend here: while this generator waits, the event loop is free
            # to run other tasks such as the heartbeat.
            await asyncio.sleep(delay)
            yield f"📦 包 {i+1} 下载完成"
    
    async def main():
        """Consume the async stream while a heartbeat task runs in the background."""
        # 1. Start the background heartbeat task.
        task = asyncio.create_task(background_heartbeat())

        start_time = time.time()

        try:
            # 2. Async-for loop. Each time the loop waits for the next item, the
            # heartbeat task gets a chance to run.
            async for data in async_stream():
                print(f"✅ 主程序收到: {data}\n")
        finally:
            # Stop the heartbeat even if the stream raised, and wait until the
            # cancellation has actually been processed.
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)
        print(f"=== 异步模式结束,总耗时: {time.time() - start_time:.2f}秒 ===")

    # Run the demo.
    asyncio.run(main())