python脚本请求数量达到上限,http请求重试问题
更新时间:2024年06月27日 09:54:05 作者:拾牙慧者
这篇文章主要介绍了python脚本请求数量达到上限,http请求重试问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
python请求数量达到上限,http请求重试
由于在内网发送http请求同一个token会限制次数,所以很容易达到网关流量上限。
业务中使用了多线程并发,一个线程发起一次http请求,得到正确结果后返回。
这里采用的策略是,如果解析出来达到流量上限,那么该线程休眠一段时间,然后重试请求,如果还是失败,那么继续休眠,每次休眠的时间随着重试轮次增加:
# 探测是否触及网关流量上限 def probe_func(m_url, m_headers, m_json, m_timeout): json_rep = requests.post(url = m_url, headers = m_headers, json = m_json, timeout = m_timeout) zhiyan_data = json_rep.json() if(zhiyan_data['code'] != 0): return None else: return json_rep # 解析数据包,不涉及probe_func中的检测内容 def parse(json_rep, room_name, metric_name): if json_rep == None: logging.info(room_name + " json_rep == None") return 0 if (json_rep.content and json_rep.status_code != 204 and json_rep.headers["content-type"].strip().startswith("application/json")): zhiyan_data = json_rep.json() if len(zhiyan_data['data']) == 0: logging.warning(zhiyan_data['日志信息拉取无结果']) return 0 else: res = zhiyan_data['data']['chart_info'][0]['key_data_list'][3]['current'] logging.info(room_name + str(res)) if str(res) == "None": logging.warning(room_name + ":拉取zhiyan_data:" + metric_name + " 出现了问题,拉取数据为None") return 0 else: return res else: return 0 # 具有可靠性地获取数据 def request_post_reliable(m_url, m_headers, m_json, m_timeout): sleep_time_s = 1 sleep_time_max = 60 res = probe_func(m_url, m_headers, m_json, m_timeout) # 如果探测失败则线程睡眠一段时间后再尝试 while (res == None): logging.info("探测失败,线程睡眠"+str(sleep_time_s)+"秒") time.sleep(sleep_time_s) tmp = sleep_time_s * 2 if tmp < sleep_time_max: sleep_time_s = tmp else: sleep_time_s = sleep_time_max logging.info("睡眠结束,线程重新探测") res = probe_func(m_url, m_headers, m_json, m_timeout) # 直到探测成功,返回正确结果 return res
python请求http/https时设置失败重试次数
使用Python的requests库时,默认是没有失败时重试请求的,通过下面的方式可以支持重试请求
设置请求时的重试规则
import requests from requests.adapters import HTTPAdapter s = requests.Session() a = HTTPAdapter(max_retries=3) b = HTTPAdapter(max_retries=3) #将重试规则挂载到http和https请求 s.mount('http://', a) s.mount('https://', b)
请求Url
上面设置完毕后,通过改Session的请求就可以支持失败重试
r = s.get('http://api.map.baidu.com/geocoder?location=39.90733345,116.391244079988&output=json') # 返回的状态码 r.status_code # 响应内容,中文为utf8编码 r.content # 响应的字符串形式,中文为unicode编码 r.text # 响应头中的编码 r.encoding # 响应头信息 r.headers
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
最新评论