605 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
HTTP请求模块
提供HTTP请求相关功能
"""
import json
import requests
import xmltodict
import asyncio
import concurrent.futures
import aiohttp
from typing import Dict, Optional, Callable
from utils.logger import get_logger
from utils.json_parser import safe_parse_dict
logger = get_logger("services.online_script.http_module")
class VWEDHttpModule:
"""HTTP请求模块"""
def __init__(self, script_id: str):
self.script_id = script_id
self.session = requests.Session()
self.global_headers = {}
# 强制禁用所有代理,包括环境变量中的代理设置
self.session.proxies = {}
self.session.trust_env = False # 忽略环境变量中的代理设置
def _execute_safely(self, func, *args, **kwargs):
"""安全执行HTTP请求在事件循环中使用独立线程池"""
try:
# 检查是否在事件循环中
in_event_loop = False
try:
asyncio.get_running_loop()
in_event_loop = True
except RuntimeError:
in_event_loop = False
if in_event_loop:
# 在事件循环中使用独立的线程池执行HTTP请求
# 增加线程数量和超时时间,避免死锁
with concurrent.futures.ThreadPoolExecutor(max_workers=3, thread_name_prefix=f"http_script_{self.script_id}") as executor:
future = executor.submit(func, *args, **kwargs)
return future.result(timeout=60) # 增加到60秒超时
else:
# 没有事件循环,直接执行
return func(*args, **kwargs)
except concurrent.futures.TimeoutError:
logger.error(f"HTTP请求线程池执行超时: {func.__name__}, 脚本: {self.script_id}")
return None
except Exception as e:
logger.error(f"HTTP请求执行失败: {func.__name__}, 脚本: {self.script_id}, 错误: {str(e)}")
return None
async def request_post(self, url: str, param: str) -> Optional[str]:
"""
发送POST请求参数为JSON格式
Args:
url: 请求的URL
param: JSON字符串请求的参数
Returns:
若请求成功返回响应的JSON字符串
若请求失败None
"""
# 检查是否是本地请求,如果是则使用异步方式避免死锁
is_local_request = any(host in url for host in ['127.0.0.1', 'localhost', '::1'])
if is_local_request:
try:
# 检查是否在事件循环中
asyncio.get_running_loop()
return await self._async_request_post(url, param)
except RuntimeError:
# 不在事件循环中,使用同步方式
pass
return self._execute_safely(self._sync_request_post, url, param)
async def _async_request_post(self, url: str, param: str) -> Optional[str]:
"""异步执行POST请求的内部方法专门处理本地循环请求"""
try:
headers = {"Content-Type": "application/json"}
headers.update(self.global_headers)
# 尝试将参数转换为有效JSON
dict_param = safe_parse_dict(param)
if dict_param is None:
logger.error(f"异步POST请求失败: {url}, 参数无法转换为有效JSON格式")
return None
# 记录请求信息用于调试
logger.info(f"发送异步POST请求: {url}, 脚本: {self.script_id}")
# 使用aiohttp进行异步请求避免阻塞事件循环
timeout = aiohttp.ClientTimeout(total=60) # 60秒超时
async with aiohttp.ClientSession(timeout=timeout, trust_env=False) as session:
# 使用json参数而不是data参数让aiohttp自动处理JSON序列化和Content-Type
async with session.post(url, json=dict_param, headers={k: v for k, v in headers.items() if k.lower() != 'content-type'}) as response:
response.raise_for_status()
result = await response.text()
logger.info(f"异步POST请求成功: {url}, 状态码: {response.status}")
return result
except asyncio.TimeoutError:
logger.error(f"异步POST请求超时: {url}")
return None
def _sync_request_post(self, url: str, param: str) -> Optional[str]:
"""同步执行POST请求的内部方法"""
try:
headers = {"Content-Type": "application/json"}
headers.update(self.global_headers)
# 尝试将参数转换为有效JSON
json_param = self._prepare_json_param(param)
if json_param is None:
logger.error(f"POST请求失败: {url}, 参数无法转换为有效JSON格式")
return None
# 记录请求信息用于调试
logger.info(f"发送POST请求: {url}, 代理设置: {self.session.proxies}")
# 检查是否是本地循环请求,如果是则增加超时时间
is_local_request = any(host in url for host in ['127.0.0.1', 'localhost', '::1'])
timeout = 45 if is_local_request else 30
# 设置超时时间,避免无限阻塞,强制不使用代理
response = self.session.post(
url,
data=json_param,
headers=headers,
timeout=timeout,
proxies={} # 强制在请求级别也不使用代理
)
response.raise_for_status()
logger.info(f"POST请求成功: {url}, 状态码: {response.status_code}")
return response.text
except requests.exceptions.Timeout:
logger.error(f"POST请求超时: {url}")
return None
except requests.exceptions.ConnectionError as e:
logger.error(f"POST请求连接失败: {url}, 错误: {str(e)}")
return None
except Exception as e:
logger.error(f"POST请求失败: {url}, 错误: {str(e)}")
return None
def _prepare_json_param(self, param: str) -> Optional[str]:
"""准备JSON参数处理多种输入格式"""
try:
# 首先尝试直接解析为JSON
test_parse = safe_parse_dict(param)
if test_parse is not None:
# 是有效JSON直接返回
return param
# 如果解析失败尝试使用eval()解析Python字典字符串安全性有限制
# 只在字符串包含特定Python字典格式时使用
if param.strip().startswith('{') and param.strip().endswith('}'):
try:
# 尝试使用安全的字面量评估
import ast
dict_obj = ast.literal_eval(param)
if isinstance(dict_obj, dict):
return json.dumps(dict_obj, ensure_ascii=False)
except (ValueError, SyntaxError):
# 如果ast.literal_eval失败记录错误
logger.warning(f"参数格式无法解析: {param[:100]}...")
return None
except Exception as e:
logger.error(f"准备JSON参数时发生错误: {str(e)}")
return None
async def _generic_request(self, url: str, method: str = "POST", param: str = None,
headers: Dict[str, str] = None,
data_processor: Callable = None,
response_processor: Callable = None) -> Optional[str]:
"""
通用请求处理函数,支持异步和同步请求,避免死锁
Args:
url: 请求的URL
method: HTTP方法 (GET, POST, PUT等)
param: 请求参数
headers: 请求头
data_processor: 数据预处理函数
response_processor: 响应后处理函数
Returns:
请求结果或None
"""
# 检查是否是本地请求,如果是则使用异步方式避免死锁
is_local_request = any(host in url for host in ['127.0.0.1', 'localhost', '::1'])
if is_local_request:
try:
# 检查是否在事件循环中
asyncio.get_running_loop()
return await self._async_generic_request(url, method, param, headers, data_processor, response_processor)
except RuntimeError:
# 不在事件循环中,使用同步方式
pass
return self._execute_safely(self._sync_generic_request, url, method, param, headers, data_processor, response_processor)
async def _async_generic_request(self, url: str, method: str, param: str,
headers: Dict[str, str], data_processor: Callable,
response_processor: Callable) -> Optional[str]:
"""异步执行通用请求的内部方法"""
try:
# 合并请求头
request_headers = headers or {}
request_headers.update(self.global_headers)
# 数据预处理
processed_data = data_processor(param) if data_processor and param else param
if param and not processed_data and data_processor:
logger.error(f"异步{method}请求失败: {url}, 数据预处理失败")
return None
# 记录请求信息用于调试
logger.info(f"发送异步{method}请求: {url}, 脚本: {self.script_id}")
# 使用aiohttp进行异步请求避免阻塞事件循环
timeout = aiohttp.ClientTimeout(total=60) # 60秒超时
async with aiohttp.ClientSession(timeout=timeout, trust_env=False) as session:
# 根据方法选择合适的请求方式
if method.upper() == "GET":
async with session.get(url, headers=request_headers) as response:
response.raise_for_status()
result = await response.text()
elif method.upper() == "POST":
# 判断数据类型,选择合适的参数
if 'application/json' in request_headers.get('Content-Type', ''):
# JSON数据使用json参数过滤Content-Type避免重复
filtered_headers = {k: v for k, v in request_headers.items() if k.lower() != 'content-type'}
async with session.post(url, json=processed_data, headers=filtered_headers) as response:
response.raise_for_status()
result = await response.text()
else:
# 其他数据使用data参数
async with session.post(url, data=processed_data, headers=request_headers) as response:
response.raise_for_status()
result = await response.text()
elif method.upper() == "PUT":
if 'application/json' in request_headers.get('Content-Type', ''):
filtered_headers = {k: v for k, v in request_headers.items() if k.lower() != 'content-type'}
async with session.put(url, json=processed_data, headers=filtered_headers) as response:
response.raise_for_status()
result = await response.text()
else:
async with session.put(url, data=processed_data, headers=request_headers) as response:
response.raise_for_status()
result = await response.text()
else:
logger.error(f"不支持的异步HTTP方法: {method}")
return None
logger.info(f"异步{method}请求成功: {url}, 状态码: {response.status}")
# 响应后处理
return response_processor(result) if response_processor else result
except asyncio.TimeoutError:
logger.error(f"异步{method}请求超时: {url}")
return None
def _sync_generic_request(self, url: str, method: str, param: str,
headers: Dict[str, str], data_processor: Callable,
response_processor: Callable) -> Optional[str]:
"""同步执行通用请求的内部方法"""
try:
# 合并请求头
request_headers = headers or {}
request_headers.update(self.global_headers)
# 数据预处理
processed_data = data_processor(param) if data_processor and param else param
if param and not processed_data and data_processor:
logger.error(f"{method}请求失败: {url}, 数据预处理失败")
return None
# 记录请求信息用于调试
logger.info(f"发送{method}请求: {url}, 代理设置: {self.session.proxies}")
# 检查是否是本地循环请求,如果是则增加超时时间
is_local_request = any(host in url for host in ['127.0.0.1', 'localhost', '::1'])
timeout = 45 if is_local_request else 30
# 根据方法选择合适的请求方式
if method.upper() == "GET":
response = self.session.get(
url,
headers=request_headers,
timeout=timeout,
proxies={} # 强制在请求级别也不使用代理
)
elif method.upper() == "POST":
response = self.session.post(
url,
data=processed_data,
headers=request_headers,
timeout=timeout,
proxies={} # 强制在请求级别也不使用代理
)
elif method.upper() == "PUT":
response = self.session.put(
url,
data=processed_data,
headers=request_headers,
timeout=timeout,
proxies={} # 强制在请求级别也不使用代理
)
else:
logger.error(f"不支持的HTTP方法: {method}")
return None
response.raise_for_status()
logger.info(f"{method}请求成功: {url}, 状态码: {response.status_code}")
# 响应后处理
return response_processor(response.text) if response_processor else response.text
except requests.exceptions.Timeout:
logger.error(f"{method}请求超时: {url}")
return None
except requests.exceptions.ConnectionError as e:
logger.error(f"{method}请求连接失败: {url}, 错误: {str(e)}")
return None
except Exception as e:
logger.error(f"{method}请求失败: {url}, 错误: {str(e)}")
return None
async def request_post_xml(self, url: str, param: str) -> Optional[str]:
"""
发送POST请求参数为XML格式
在本方法中会自动将传入的JSON格式字符串转化为XML格式
Args:
url: 请求的URL
param: JSON字符串请求的参数
Returns:
若请求成功将响应的XML格式字符串转化为JSON格式并返回
若请求失败None
"""
def json_to_xml_processor(param_str: str) -> str:
"""将JSON参数转换为XML数据"""
param_dict = safe_parse_dict(param_str)
if param_dict is None:
return None
return xmltodict.unparse({"root": param_dict})
def xml_to_json_processor(response_text: str) -> str:
"""将XML响应转换为JSON格式"""
xml_dict = xmltodict.parse(response_text)
return json.dumps(xml_dict, ensure_ascii=False)
headers = {"Content-Type": "application/xml"}
return await self._generic_request(
url=url,
method="POST",
param=param,
headers=headers,
data_processor=json_to_xml_processor,
response_processor=xml_to_json_processor
)
async def request_put_json(self, url: str, param: str) -> Optional[str]:
"""
发送PUT请求参数为JSON格式
Args:
url: 请求的URL
param: JSON字符串请求的参数
Returns:
若请求成功返回响应的JSON字符串
若请求失败None
"""
def json_to_dict_processor(param_str: str) -> Optional[dict]:
"""将JSON字符串转换为字典对象用于异步请求"""
return safe_parse_dict(param_str)
headers = {"Content-Type": "application/json"}
return await self._generic_request(
url=url,
method="PUT",
param=param,
headers=headers,
data_processor=json_to_dict_processor
)
async def request_get(self, url: str) -> Optional[str]:
"""
发送GET请求
Args:
url: 请求的URL
Returns:
若请求成功:请求返回的字符串
若请求失败None
"""
# 检查是否是本地请求,如果是则使用异步方式避免死锁
is_local_request = any(host in url for host in ['127.0.0.1', 'localhost', '::1'])
if is_local_request:
try:
# 检查是否在事件循环中
asyncio.get_running_loop()
return await self._async_request_get(url)
except RuntimeError:
# 不在事件循环中,使用同步方式
pass
return self._execute_safely(self._sync_request_get, url)
async def _async_request_get(self, url: str) -> Optional[str]:
"""异步执行GET请求的内部方法专门处理本地循环请求"""
try:
headers = self.global_headers.copy()
# 记录请求信息用于调试
logger.info(f"发送异步GET请求: {url}, 脚本: {self.script_id}")
# 使用aiohttp进行异步请求避免阻塞事件循环
timeout = aiohttp.ClientTimeout(total=60) # 60秒超时
async with aiohttp.ClientSession(timeout=timeout, trust_env=False) as session:
async with session.get(url, headers=headers) as response:
response.raise_for_status()
result = await response.text()
logger.info(f"异步GET请求成功: {url}, 状态码: {response.status}")
return result
except asyncio.TimeoutError:
logger.error(f"异步GET请求超时: {url}")
return None
except aiohttp.ClientError as e:
logger.error(f"异步GET请求客户端错误: {url}, 错误: {str(e)}")
return None
except Exception as e:
logger.error(f"异步GET请求失败: {url}, 错误: {str(e)}")
return None
def _sync_request_get(self, url: str) -> Optional[str]:
"""同步执行GET请求的内部方法"""
try:
headers = self.global_headers.copy()
response = self.session.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.text
except Exception as e:
logger.error(f"GET请求失败: {url}, 错误: {str(e)}")
return None
def set_header(self, key: str, value: str) -> None:
"""
设置请求头,设置一次之后,对所有请求都有效
Args:
key: 请求头的key
value: 请求头的value
"""
self.global_headers[key] = value
async def request_http_post(self, url: str, param: str, head_param: str, media_type: str) -> Optional[str]:
"""
发送POST请求增强版
可设置请求媒体类型(MediaType),可设置请求头
Args:
url: 请求的URL
param: 请求的入参json字符串
head_param: 请求头如有传json字符串无传空字符串
media_type: 请求类型可选择JSON、JAVASCRIPT、HTML、XML、XWWWFORMURLENCODED
Returns:
成功返回json字符串失败返回None
"""
headers = self._get_media_type_header(media_type)
# 解析额外请求头
if head_param:
extra_headers = safe_parse_dict(head_param)
if extra_headers:
headers.update(extra_headers)
# 如果是JSON类型使用字典处理器
data_processor = None
if media_type.upper() == "JSON":
def json_to_dict_processor(param_str: str) -> Optional[dict]:
return safe_parse_dict(param_str)
data_processor = json_to_dict_processor
return await self._generic_request(
url=url,
method="POST",
param=param,
headers=headers,
data_processor=data_processor
)
async def request_http_get(self, url: str, head_param: str) -> Optional[str]:
"""
发送GET请求增强版
可设置请求头
Args:
url: 请求的URL
head_param: 请求头如有传json字符串无传空字符串
Returns:
成功返回json字符串失败返回None
"""
headers = {}
# 解析额外请求头
if head_param:
extra_headers = safe_parse_dict(head_param)
if extra_headers:
headers.update(extra_headers)
return await self._generic_request(
url=url,
method="GET",
headers=headers
)
async def request_http_put(self, url: str, head_param: str, media_type: str, param: str) -> Optional[str]:
"""
发送PUT请求增强版
可设置请求媒体类型(MediaType),可设置请求头
Args:
url: 请求的URL
head_param: 请求头如有传json字符串无传空字符串
media_type: 请求类型可选择JSON、JAVASCRIPT、HTML、XML、XWWWFORMURLENCODED
param: 请求的入参json字符串
Returns:
成功返回json字符串失败返回None
"""
headers = self._get_media_type_header(media_type)
# 解析额外请求头
if head_param:
extra_headers = safe_parse_dict(head_param)
if extra_headers:
headers.update(extra_headers)
# 如果是JSON类型使用字典处理器
data_processor = None
if media_type.upper() == "JSON":
def json_to_dict_processor(param_str: str) -> Optional[dict]:
return safe_parse_dict(param_str)
data_processor = json_to_dict_processor
return await self._generic_request(
url=url,
method="PUT",
param=param,
headers=headers,
data_processor=data_processor
)
def _get_media_type_header(self, media_type: str) -> Dict[str, str]:
"""
根据媒体类型获取对应的Content-Type请求头
Args:
media_type: 媒体类型
Returns:
包含Content-Type的字典
"""
media_type_mapping = {
"JSON": "application/json",
"JAVASCRIPT": "application/javascript",
"HTML": "text/html",
"XML": "application/xml",
"XWWWFORMURLENCODED": "application/x-www-form-urlencoded"
}
content_type = media_type_mapping.get(media_type.upper(), "application/json")
return {"Content-Type": content_type}