2025-04-30 16:57:46 +08:00

270 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
HTTP请求组件类
提供 GET请求组件和POST请求组件
"""
import logging
from typing import Dict, Any
from services.execution.task_context import TaskContext
from .base import BlockHandler, register_handler
from utils.logger import get_logger
from .model.block_name import HttpRequestBlockName
# Module-level logger shared by the HTTP GET/POST block handlers below.
# NOTE(review): the logger name spells "htttp_request" with three t's, which
# looks like a typo - confirm nothing in the logging configuration keys on
# this exact name before renaming it.
logger = get_logger("services.execution.handlers.htttp_request")
@register_handler(HttpRequestBlockName.GET)
class HttpGetBlockHandler(BlockHandler):
    """Block handler that performs an HTTP GET request.

    Reads ``url``, ``header``, ``retry``, ``retryTimes`` and ``retryInterval``
    from the input parameters, sends the request with aiohttp and stores the
    response body in the task-context variable ``response``.
    """

    @staticmethod
    def _normalize_headers(header_param: Any) -> Dict[str, Any]:
        """Build a fresh header dict from the ``header`` input parameter.

        Accepted shapes (either directly or as a JSON-encoded string):

        * a dict of ``{name: value}`` - shallow-copied so the caller's object
          is never shared with the request;
        * a list of dicts such as ``[{"key0": "Name", "value0": "v"}, ...]``
          where every ``key<suffix>`` entry is paired with ``value<suffix>``;
          pairs with an empty name or value are skipped.

        Anything else (including unparseable JSON) yields an empty dict.
        """
        import json

        if isinstance(header_param, str):
            if not header_param.strip():
                return {}
            try:
                header_param = json.loads(header_param)
            except json.JSONDecodeError:
                logger.warning(f"无法解析headers JSON: {header_param}")
                return {}

        if isinstance(header_param, dict):
            return dict(header_param)

        headers: Dict[str, Any] = {}
        if isinstance(header_param, list):
            for item in header_param:
                # Malformed entries are skipped instead of aborting the request.
                if not isinstance(item, dict):
                    continue
                for key, name in item.items():
                    if not (isinstance(key, str) and key.startswith("key")):
                        continue
                    if not (isinstance(name, str) and name):
                        continue
                    # "key0" pairs with "value0", "key1" with "value1", ...
                    value = item.get("value" + key[3:])
                    if value:
                        headers[name] = value
        return headers

    @staticmethod
    def _coerce_int(value: Any, default: int) -> int:
        """Return ``value`` as a non-negative int, or ``default`` if it cannot be converted."""
        try:
            number = int(value)
        except (TypeError, ValueError):
            return default
        # A negative count/interval is meaningless; clamp so the request loop
        # always runs at least once.
        return max(number, 0)

    @staticmethod
    def _coerce_bool(value: Any) -> bool:
        """Interpret a flag that may arrive as a real bool or as a string."""
        if isinstance(value, str):
            return value.strip().lower() not in ("", "false", "0", "no", "off")
        return bool(value)

    async def execute(
        self,
        block: Dict[str, Any],
        input_params: Dict[str, Any],
        context: TaskContext
    ) -> Dict[str, Any]:
        """Send the GET request described by ``input_params``.

        Args:
            block: Block definition; only forwarded to ``self._record_task_log``.
            input_params: Request settings: ``url`` (required), ``header``,
                ``retry``, ``retryTimes`` (default 3) and ``retryInterval`` in
                milliseconds (default 1000).
            context: Task context; the response body is stored in its
                ``response`` variable (decoded JSON when aiohttp can decode
                it, otherwise the raw text).

        Returns:
            A dict with ``success`` and ``message``. Failures are reported
            through this dict rather than raised.
        """
        import asyncio

        import aiohttp

        try:
            url = input_params.get("url")
            headers = self._normalize_headers(input_params.get("header", {}))
            retry = self._coerce_bool(input_params.get("retry", False))
            retry_times = self._coerce_int(input_params.get("retryTimes", 3), 3)
            retry_interval = self._coerce_int(
                input_params.get("retryInterval", 1000), 1000
            )

            if not url:
                result = {
                    "success": False,
                    "message": "缺少请求URL"
                }
                await self._record_task_log(block, result, context)
                return result

            max_attempts = retry_times + 1 if retry else 1
            attempt = 0
            while True:
                # Only the network exchange is retried; bookkeeping happens
                # after the loop so a logging failure cannot re-send the request.
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(url, headers=headers) as response:
                            status = response.status
                            response_text = await response.text()
                            try:
                                payload = await response.json()
                            except (aiohttp.ContentTypeError, ValueError):
                                # Not JSON (wrong content type or invalid
                                # document): keep the plain text instead.
                                payload = response_text
                except Exception:
                    attempt += 1
                    if attempt >= max_attempts:
                        raise
                    # retryInterval is expressed in milliseconds.
                    await asyncio.sleep(retry_interval / 1000)
                else:
                    break

            context.set_variable("response", payload)
            result = {
                "success": True,
                "message": f"HTTP GET请求成功状态码: {status}",
            }
            await self._record_task_log(block, result, context)
            return result
        except Exception as e:
            result = {
                "success": False,
                "message": f"HTTP GET请求失败: {str(e)}"
            }
            await self._record_task_log(block, result, context)
            return result
# HTTP POST请求处理器
@register_handler(HttpRequestBlockName.POST)
class HttpPostBlockHandler(BlockHandler):
    """Block handler that performs an HTTP POST request.

    Reads ``url``, ``param``, ``header``, ``mediaType``, ``retry``,
    ``retryTimes`` and ``retryInterval`` from the input parameters, sends the
    request with aiohttp and stores the response body in the task-context
    variable ``response``.
    """

    # Content-Type sent for each supported ``mediaType`` value; any other
    # value falls back to "application/json".
    _MEDIA_TYPE_CONTENT_TYPES = {
        "HTML": "text/html",
        "XML": "application/xml",
        "XWWWFORMURLENCODED": "application/x-www-form-urlencoded",
        "JAVASCRIPT": "application/javascript",
    }

    @staticmethod
    def _normalize_headers(header_param: Any) -> Dict[str, Any]:
        """Build a fresh header dict from the ``header`` input parameter.

        Accepted shapes (either directly or as a JSON-encoded string):

        * a dict of ``{name: value}`` - shallow-copied so that adding the
          Content-Type header never mutates the caller's object;
        * a list of dicts such as ``[{"key0": "Name", "value0": "v"}, ...]``
          where every ``key<suffix>`` entry is paired with ``value<suffix>``;
          pairs with an empty name or value are skipped.

        Anything else (including unparseable JSON) yields an empty dict.
        """
        import json

        if isinstance(header_param, str):
            if not header_param.strip():
                return {}
            try:
                header_param = json.loads(header_param)
            except json.JSONDecodeError:
                logger.warning(f"无法解析headers JSON: {header_param}")
                return {}

        if isinstance(header_param, dict):
            return dict(header_param)

        headers: Dict[str, Any] = {}
        if isinstance(header_param, list):
            for item in header_param:
                # Malformed entries are skipped instead of aborting the request.
                if not isinstance(item, dict):
                    continue
                for key, name in item.items():
                    if not (isinstance(key, str) and key.startswith("key")):
                        continue
                    if not (isinstance(name, str) and name):
                        continue
                    # "key0" pairs with "value0", "key1" with "value1", ...
                    value = item.get("value" + key[3:])
                    if value:
                        headers[name] = value
        return headers

    @staticmethod
    def _coerce_int(value: Any, default: int) -> int:
        """Return ``value`` as a non-negative int, or ``default`` if it cannot be converted."""
        try:
            number = int(value)
        except (TypeError, ValueError):
            return default
        # A negative count/interval is meaningless; clamp so the request loop
        # always runs at least once.
        return max(number, 0)

    @staticmethod
    def _coerce_bool(value: Any) -> bool:
        """Interpret a flag that may arrive as a real bool or as a string."""
        if isinstance(value, str):
            return value.strip().lower() not in ("", "false", "0", "no", "off")
        return bool(value)

    @staticmethod
    def _build_body_kwargs(params: Any, media_type: Any) -> Dict[str, Any]:
        """Choose the ``session.post`` body keyword matching the media type.

        * ``XWWWFORMURLENCODED`` with a JSON object -> URL-encoded form string.
        * Any non-JSON media type with a string ``param`` -> the string is
          sent verbatim (not JSON-quoted).
        * Everything else -> ``json=<parsed value>``, as before.
        """
        import json
        from urllib.parse import urlencode

        parsed = params
        if isinstance(params, str):
            try:
                parsed = json.loads(params)
            except ValueError:
                # Not a JSON document: fall back to the raw string.
                parsed = params

        if media_type == "XWWWFORMURLENCODED" and isinstance(parsed, dict):
            return {"data": urlencode(parsed, doseq=True)}
        raw_media_types = ("HTML", "XML", "JAVASCRIPT", "XWWWFORMURLENCODED")
        if isinstance(params, str) and media_type in raw_media_types:
            return {"data": params}
        return {"json": parsed}

    async def execute(
        self,
        block: Dict[str, Any],
        input_params: Dict[str, Any],
        context: TaskContext
    ) -> Dict[str, Any]:
        """Send the POST request described by ``input_params``.

        Args:
            block: Block definition; only forwarded to ``self._record_task_log``.
            input_params: Request settings: ``url`` (required), ``param``
                (body, default ``"{}"``), ``header``, ``mediaType`` (default
                ``"JSON"``), ``retry``, ``retryTimes`` (default 3) and
                ``retryInterval`` in milliseconds (default 1000).
            context: Task context; the response body is stored in its
                ``response`` variable (decoded JSON when aiohttp can decode
                it, otherwise the raw text).

        Returns:
            A dict with ``success`` and ``message``. Failures are reported
            through this dict rather than raised.
        """
        import asyncio

        import aiohttp

        try:
            url = input_params.get("url")
            params = input_params.get("param", "{}")
            headers = self._normalize_headers(input_params.get("header", {}))
            media_type = input_params.get("mediaType", "JSON")
            retry = self._coerce_bool(input_params.get("retry", False))
            retry_times = self._coerce_int(input_params.get("retryTimes", 3), 3)
            retry_interval = self._coerce_int(
                input_params.get("retryInterval", 1000), 1000
            )

            if not url:
                result = {
                    "success": False,
                    "message": "缺少请求URL"
                }
                await self._record_task_log(block, result, context)
                return result

            body_kwargs = self._build_body_kwargs(params, media_type)

            content_type = "application/json"
            if isinstance(media_type, str):
                content_type = self._MEDIA_TYPE_CONTENT_TYPES.get(
                    media_type, content_type
                )
            # Header names are case-insensitive; do not add a second
            # Content-Type when the caller supplied e.g. "content-type".
            if not any(str(name).lower() == "content-type" for name in headers):
                headers["Content-Type"] = content_type

            max_attempts = retry_times + 1 if retry else 1
            attempt = 0
            while True:
                # Only the network exchange is retried; bookkeeping happens
                # after the loop so a logging failure cannot re-send the POST.
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.post(
                            url, headers=headers, **body_kwargs
                        ) as response:
                            status = response.status
                            response_text = await response.text()
                            try:
                                payload = await response.json()
                            except (aiohttp.ContentTypeError, ValueError):
                                # Not JSON (wrong content type or invalid
                                # document): keep the plain text instead.
                                payload = response_text
                except Exception:
                    attempt += 1
                    if attempt >= max_attempts:
                        raise
                    # retryInterval is expressed in milliseconds.
                    await asyncio.sleep(retry_interval / 1000)
                else:
                    break

            context.set_variable("response", payload)
            result = {
                "success": True,
                "message": f"HTTP POST请求成功状态码: {status}, 响应内容详情: {response_text}",
            }
            await self._record_task_log(block, result, context)
            return result
        except Exception as e:
            result = {
                "success": False,
                "message": f"HTTP POST请求失败: {str(e)}"
            }
            await self._record_task_log(block, result, context)
            return result