python requests实现上传excel数据流
requests上传excel数据流
headers=self.headers #获取导入模版 file_home = self.import_template log.info(file_home) wb = load_workbook(filename=file_home) ws = wb['sheet1'] # 修改产废单位名称,以及备注 ws['b3'] = 'itest产废单位'+self.dic["t"] ws['s3'] = 'i原料销售'+self.dic["t"] wb.save(file_home) url=self.url_1+"/companies/import?companyType=2" payload={} m=MultipartEncoder( fields={ "parent_dir":'/', "name":'file', "filename":'name.xlsx', 'file':('name.xlsx',open(file_home,'rb'),'application/vnd.ms-excel') } ) headers['Content-Type']=m.content_type r=requests.post(url,headers=headers,data=m) log.info(r.json())
数据驱动之python+requests+excel
数据驱动
是根据数据来测试的,如读取 excel表中的测试用例自动填写测试结果,发送测试报告包括以下模块:
- 1.获取用例
- 2.调用接口
- 3.校验结果
- 4.发送测试报告
- 5.异常处理
- 6.日志模块
1. 首先设计好测试用例
2.建立文件结构
该自动化测试框架命名为:ATP,bin目录下写主程序,cases目录下放测试用例,conf目录下放配置文件,lib目录下放各个封装好的模块,logs目录下放日志文件,和readme文件。
3.封装模块
common.py:封装读取excel用例、调用接口、检验结果、写入报告这几个模块。
""" 第一步:读取excel中用例 第二步:根据用例发送请求 第三步:校验结果 第四步:将测试结果、返回报文写入excel """ import xlrd,requests from xlutils import copy from lib.log import atp_log class OpCase(object): def get_case(self,file_path): cases= [] #定义一个列表存放所有的cases if file_path.endswith('.xls') or file_path.endswith('.xlsx'): try: book = xlrd.open_workbook(file_path) sheet = book.sheet_by_index(0) for i in range(1,sheet.nrows): row_data = sheet.row_values(i) #获取的每一行数据存到列表row_data cases.append(row_data[4:8]) atp_log.info('共读取%s条用例'%(len(cases))) self.file_path = file_path #因为该函数已经传了参数路径,为方便write_excel引用,在此实例化 except Exception as e: atp_log.error('[%s]用例获取失败,错误信息:%s'%(file_path,e)) else: atp_log.error('用例文件不合法,%s'%file_path) return cases def my_request(self,url,method,data): data = self.dataToDict(data) try: if method.upper() == 'POST': res = requests.post(url,data).text elif method.uper() == 'GET': res = requests.get(url,params=data).text else: atp_log.warning('该请求方式暂不支持') res = '该请求方式暂不支持' except Exception as e: msg = '【%s】接口调用失败,%s'%(url,e) atp_log.error(msg) res = msg return res def dataToDict(self,data): #把数据转成字典。 res = {} data = data.split(',') for d in data: # k, v = d.split('=') res[k] = v def check_res(self,res,check): #res:实际结果,check:预期结果 res = res.replace('": "','=').replace('": ','=') for c in check.split(','): if c not in res: atp_log.info('结果校验失败,预期结果:【%s】,实际结果【%s】'%(c,res)) return '失败' return '成功' def write_excel(self,case_res): book = xlrd.open_workbook(self.file_path) new_book = copy.copy(book) sheet = new_book.get_sheet(0) row = 1 for case_case in case_res: sheet.write(row,8,case_case[0]) sheet.write(row,9,case_case[1]) row += 1 new_book.save(self.file_path.replace('xlsx','xls'))
log.py:封装日志模块
import logging,os from logging import handlers from conf import setting class Mylogger(): def __init__(self,file_name,level='info',backCount=5,when='D'): logger = logging.getLogger() # 先实例化一个logger对象,先创建一个办公室 logger.setLevel(self.get_level(level)) # 设置日志的级别 # f1 = logging.FileHandler(filename='a.log',mode='a',encoding='utf-8') #找到写日志文件的这个人 c1 = logging.StreamHandler() # 负责往控制台输出的 b1 = handlers.TimedRotatingFileHandler(filename=file_name, when=when, interval=1, backupCount=backCount, encoding='utf-8') fmt = logging.Formatter('%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s') c1.setFormatter(fmt) b1.setFormatter(fmt) logger.addHandler(c1) logger.addHandler(b1) self.logger = logger def get_level(self,str): level = { 'debug':logging.DEBUG, 'info':logging.INFO, 'warm':logging.WARNING, 'error':logging.ERROR } str = str.lower() return level.get(str) path = os.path.join(setting.LOG_PATH,setting.LOG_NAME) atp_log = Mylogger(path,'debug').logger #直接在这里实例化,用的时候不用再实例化了 #别的地方用的时候,直接atp_log.warnning('xxxx')
send_mail.py:封装发送邮件模块
import yagmail from conf import setting from lib.log import atp_log def sendmail(title,content,attrs=None): m = yagmail.SMTP(host=setting.MAIL_HOST,user=setting.MAIL_USER, password=setting.MAIL_PASSWRD,smtp_ssl=True) m.send(to=setting.TO, subject=title, contents = content, attachments = attrs) atp_log.info('发送邮件完成')
4.配置文件
setting.py,配置文件:设置邮件地址、日志默认级别、用例存放路径、日志存放路径、日志文件名
import os BASE_PATH = os.path.dirname( os.path.dirname(os.path.abspath(__file__)) ) #三层目录定位到ATP目录 MAIL_HOST = 'smtp.qq.com' MAIL_USER='12*****89@qq.com' MAIL_PASSWRD = 'gjn*****bcgh' TO = [ '12*****9@qq.com' ] LEVEL = 'debug' #设置日志默认级别 LOG_PATH = os.path.join(BASE_PATH,'logs') #日志文件在logs目录下 CASE_PATH = os.path.join(BASE_PATH,'cases') #用例文件在cases目录下 LOG_NAME = 'atp_log' #设置日志文件名
5.将ATP文件
Mark directory as Sources Root
6.编写主程序
start.py
import os,sys BASE_PATH = os.path.dirname( os.path.dirname(os.path.abspath(__file__)) ) sys.path.insert(0,BASE_PATH) from lib.common import OpCase from lib.send_mail import sendmail from conf import setting class CaseRun(object): def find_case(self): op = OpCase() for f in os.listdir(setting.CASE_PATH): #每次循环的时候读一个excel abs_path = os.path.join(setting.CASE_PATH,f) case_list = op.get_case(abs_path) res_list = [] pass_count,fail_count= 0,0 for case in case_list: #循环每一个excel里面的所有用例 url,method,req_data,check = case res = op.my_request(url,method,req_data) #调用完接口返回的结果 status = op.check_res(res,check) res_list.append([res,status]) if status == '通过': pass_count += 1 else: fail_count += 1 op.write_excel(res_list) msg = ''' xx你好, 本次共运行%s条用例,通过%s条,失败%s条。 '''%(len(res_list),pass_count,fail_count) sendmail('测试用例运行结果',content=msg,attrs=abs_path) CaseRun().find_case()
OK,数据驱动自动化测试框架编写完成,运行 start.py 程序,收到邮件内容如下:
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Python报错ValueError: cannot reindex from
当处理Pandas数据框(DataFrame)时,你是否遇到过ValueError: cannot reindex from a duplicate axis的报错?这个问题通常发生在尝试对DataFrame进行重索引时,如果索引有重复值,就会触发这个错误,下面,我们将探讨这个问题并提供解决方法2024-09-09
最新评论