一个读取excel数据处理完成后读入数据库的例子

最近收集了一批数据,各地根据问题数据做出反馈,但是各地在反馈的时候字段都进行了创新,好在下发的数据内容并没有改变,开始写的单进程的,由于时间较长,耗时380+秒,又改成多进程的,时间缩短为80-秒。现在把程序发出来,请各位大神进行指正。

 import multiprocessing
import os
import time
import pandas as pd
from sqlalchemy import create_engine
import asyncio
import  warnings

# warnings.simplefilter("ignore")
ywstrList=['经办机构', '原子业务编号', '原子业务名称', '风险名称','风险描述',
           '校验规则结果', '创建时间', '风险提示信息', '业务日志号']
ywstrListMemo=['经办机构', '原子业务编号', '原子业务名称', '风险名称','风险描述',
           '校验规则结果', '创建时间', '风险提示信息', '业务日志号','memo','time']
szlist = ['省本', '成都', '自贡', '攀枝', '泸州', '德阳', '绵阳', '广元',
              '遂宁', '内江', '乐山', '南充', '眉山', '宜宾', '广安', '达州',
              '雅安', '巴中', '资阳', '阿坝', '甘孜', '凉山']
# connect = create_engine('mysql+pymysql://root:@127.0.0.1:3306/ywgk?charset=utf8')
connect = create_engine('mysql+mysqlconnector://root:@127.0.0.1:3306/ywgk?charset=utf8')
# engine = create_engine('mysql+mysqlconnector://scott:tiger@localhost/foo')

def findSZ(filename):
    for sz in szlist:
        if filename.find(sz) != -1:
            return sz
    return None


def ReadExcel(filename):
    xlsdf = ''
    xlsdf = pd.read_excel(filename)
    """"
    remove columns='sz' or '市'
    """
    if "市" in list(xlsdf.keys().to_list()):
        xlsdf.drop(columns='市', axis=1, inplace=True)
    if "sz" in list(xlsdf.keys().to_list()):
        xlsdf.drop(columns='sz', axis=1, inplace=True)
    xlsdf = xlsdf.fillna("").astype('string')
    return xlsdf


def filterDataOfSz(filename, xlsdf):
    sz = findSZ(filename)
    print(sz)
    """
    筛选出包含对应市州的数据。
    """
    if sz != None:
        xlsdf = xlsdf[xlsdf['经办机构'].str[:2] == sz]  # 筛出本市州数据
        return xlsdf


def ConCatRestCols(xlsdf):
    """
    去掉业务部分字段,保留市州反馈意见。
    """
    print(xlsdf)
    # if xlsdf==None:
    #     print(filename+"为空,需要处理")
    #     return
    xlfdf_keys_set = set(xlsdf.keys().to_list())
    xlsdf_restkeys_set = xlfdf_keys_set - set(ywstrList)
    xlsdf_restkeys_list = list(xlsdf_restkeys_set)
    xls_rest_df = xlsdf.loc[:, xlsdf_restkeys_list]  # 可以正确操作
    xlsdf['memo'] = '#'
    for col in xlsdf_restkeys_list:
        xlsdf['memo'] += xlsdf[col]

    #
    return xlsdf


def SetTimeStamp(filename, xlsdf):
    xlsdf['time'] = os.stat(str(filename)).st_mtime
    return xlsdf

async def ProcessExcelAndtosql(filename, table):
    df = ReadExcel(filename)
    df = filterDataOfSz(filename=filename, xlsdf=df)
    df = ConCatRestCols(df)
    df = SetTimeStamp(filename=filename, xlsdf=df)
    df = df.loc[:, ywstrListMemo]
    print(filename)
    print(df)
    df.to_sql(name=table, con=connect, if_exists='append', index=False, chunksize=1000, method='multi')
def profile(func):
    def wrapers(*args,**kwargs):
        print("测试开始")
        begin=time.time()
        func(*args,**kwargs)
        end=time.time()
        print(f"耗时{end-begin}秒")
    return wrapers

# async def getmsg(msg):
#     print(f'#{msg}')
#     await asyncio.sleep(1)

def getFiles(src:str):
    import pathlib
    files=[]
    for file in  pathlib.Path(src).rglob("*.xls?"):
        files.append(str(file))
    return files
def process_asyncio(files,table):
    loop=asyncio.new_event_loop()
    tasks=[loop.create_task(ProcessExcelAndtosql(filename,table)) for filename in files]
    loop.run_until_complete(asyncio.wait(tasks))
@profile
def run(iterable,table):
    process_count = multiprocessing.cpu_count()

    # print(process_count)
    pool = multiprocessing.Pool(process_count-2)
    iterable=get_chunks(iterable, process_count)
    for lst in iterable:
        pool.apply_async(process_asyncio, args=(lst,table))
    pool.close()
    pool.join()
def main():
    files = getFiles(r"e:市州返回")
    run(files, 'ywgk3')

def get_chunks(iterable,num):
    # global iterable
    import numpy as np
    return np.array_split(iterable, num)

# import  profile
if __name__=="__main__":    
    main()                   

本人只是编程的业余爱好者,只是把技术用于辅助工作,并没有深入研究技术理论,都是野路子,还请批评指正。

展开阅读全文

页面更新:2024-05-14

标签:阿坝   数据处理   字段   原子   例子   反馈   名称   风险   机构   数据库   业务   时间   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top