网站建设的实训报告的实训感受,装饰网站设计模板,微网站报价,做网站公司 营销通过requests.session().request 封装request方法 考虑到请求HTTP/2.0 同时封装httpx 来处理HTTP/2.0的请求
封装requests
# 遇到请求失败的情况时 重新请求#xff0c;请求5次等待2s
retry(stop_max_attempt_number5, retry_on_resultlambda re_data: re_data is None, wai…通过requests.session().request 封装request方法 考虑到请求HTTP/2.0 同时封装httpx 来处理HTTP/2.0的请求
封装requests
# 遇到请求失败的情况时 重新请求请求5次等待2s
retry(stop_max_attempt_number5, retry_on_resultlambda re_data: re_data is None, wait_fixed2000)def requests_request(self, method, url, paramsNone, dataNone, jsonNone, headersNone, filesNone, verifyFalse,certNone, timeoutNone, proxiesNone, proxyNone, **kwargs):# 对异常进行捕获try:封装request请求将请求方法、请求地址请求参数、请求头等信息入参。注 verify: True/False默认为True认证SSL证书开关cert: 本地SSL证书。如果不需要ssl认证可将这两个入参去掉使用session管理器requests.session(): 维持会话,跨请求的时候保存参数 # 处理代理proxies Noneif proxy:proxies {http://: http:// proxy,https://: https:// proxy,}# 使用requests.session().request 请求re_data requests.session().request(method, url, paramsparams, datadata, jsonjson, headersheaders,filesfiles, certcert, timeouttimeout, verifyverify,proxiesproxies, **kwargs)# 异常处理 报错显示具体信息except Exception as e:re_data None# 打印异常print(请求失败{0}.format(e))logger.error(Error occurred: %s, str(e), exc_infoTrue)# 重新抛出异常触发 retry 机制raise e# 返回响应结果return re_data封装httpx
retry(stop_max_attempt_number5, retry_on_resultlambda re_data: re_data is None, wait_fixed2000)def httpx_request(self, method, url, is_http2False, contentNone, dataNone, filesNone, jsonNone, paramsNone,headersNone, cookiesNone, timeoutNone, extensionsNone, proxyNone, **kwargs):# 对异常进行捕获try:使用client method.upper() 请求方法都转为大写# 处理代理proxies Noneif proxy:proxies {http://: http:// proxy,https://: https:// proxy,}re_data httpx.Client(http2is_http2, proxiesproxies).request(method.upper(), url, contentcontent,datadata, filesfiles, jsonjson,paramsparams, headersheaders,cookiescookies, timeouttimeout,extensionsextensions, **kwargs)# 异常处理 报错显示具体信息except Exception as e:re_data None# 打印异常print(请求失败{0}.format(e))logger.error(Error occurred: %s, str(e), exc_infoTrue)# 重新抛出异常触发 retry 机制raise e# 返回响应结果return re_data
将两个请求封装在一个方法里
retry(stop_max_attempt_number5, retry_on_resultlambda re_data: re_data is None, wait_fixed2000)def request(self, method, url, is_http2False, paramsNone, dataNone, jsonNone, headersNone, filesNone,verifyFalse, certNone, timeoutNone, proxiesNone, contentNone, cookiesNone, extensionsNone,**kwargs):try:if is_http2:re_data self.httpx_request(methodmethod.upper(), urlurl, is_http2is_http2, contentcontent,datadata, filesfiles, jsonjson, paramsparams, headersheaders,cookiescookies, timeouttimeout, extensionsextensions, **kwargs)else:re_data self.requests_request(methodmethod, urlurl, paramsparams, datadata, jsonjson,headersheaders, filesfiles, certcert, timeouttimeout, verifyverify,proxiesproxies, **kwargs)# 异常处理 报错显示具体信息except Exception as e:re_data None# 打印异常print(请求失败{0}.format(e))logger.error(Error occurred: %s, str(e), exc_infoTrue)# 重新抛出异常触发 retry 机制raise e# 返回响应结果return re_data
通过is_http2来区分
测试代码如下
if __name__ __main__:# request_requests 使用requests请求request_data request_main.requests_request(get, https://spa16.scrape.center/)if request_data:print(request_data.text)print(request_data.status_code)# httpx 请求HTTP/2.0# response re.httpx_request(GET, https://spa16.scrape.center/, True)# httpx 一般请求# headers {User-Agent: my-app/0.0.1}# response re.httpx_request(get, https://www.httpbin.org/get,params{name: germey})# print(response.text)# print(response.status_code)print(datetime.datetime.now())