104 lines
3.5 KiB
Python
104 lines
3.5 KiB
Python
# components/http_components.py
|
|
"""
|
|
HTTP请求组件
|
|
"""
|
|
import requests
|
|
from typing import Dict, Any, Optional
|
|
from core.component import Component, ComponentFactory
|
|
from core.exceptions import ComponentError
|
|
|
|
|
|
@ComponentFactory.register("http_get_request")
|
|
class HttpGetRequestComponent(Component):
|
|
"""HTTP GET请求组件"""
|
|
|
|
def execute(self) -> Dict[str, Any]:
|
|
# 获取参数
|
|
url = self.resolve_param("url")
|
|
headers = self.resolve_param("headers", {})
|
|
params = self.resolve_param("params", {})
|
|
timeout = self.resolve_param("timeout", 30)
|
|
|
|
# 验证必要参数
|
|
self.validate_required_params(["url"])
|
|
|
|
try:
|
|
# 执行HTTP请求
|
|
response = requests.get(url, headers=headers, params=params, timeout=timeout)
|
|
response.raise_for_status()
|
|
|
|
# 处理响应
|
|
try:
|
|
response_data = response.json()
|
|
except ValueError:
|
|
response_data = response.text
|
|
|
|
result = {
|
|
"response": response_data,
|
|
"status_code": response.status_code,
|
|
"headers": dict(response.headers)
|
|
}
|
|
|
|
# 存储结果
|
|
self.store_result(result)
|
|
return result
|
|
|
|
except requests.RequestException as e:
|
|
error_result = {
|
|
"error": str(e),
|
|
"status_code": getattr(e.response, "status_code", None) if hasattr(e, "response") else None
|
|
}
|
|
self.store_result(error_result)
|
|
raise ComponentError(f"HTTP GET请求失败: {str(e)}")
|
|
|
|
@ComponentFactory.register("http_post_request")
|
|
class HttpPostRequestComponent(Component):
|
|
"""HTTP POST请求组件"""
|
|
|
|
def execute(self) -> Dict[str, Any]:
|
|
# 获取参数
|
|
url = self.resolve_param("url")
|
|
data = self.resolve_param("data", {})
|
|
headers = self.resolve_param("headers", {})
|
|
content_type = self.resolve_param("content_type", "json")
|
|
timeout = self.resolve_param("timeout", 30)
|
|
|
|
# 验证必要参数
|
|
self.validate_required_params(["url"])
|
|
|
|
try:
|
|
# 设置内容类型
|
|
if content_type == "json" and "Content-Type" not in headers:
|
|
headers["Content-Type"] = "application/json"
|
|
|
|
# 执行HTTP请求
|
|
if content_type == "json":
|
|
response = requests.post(url, json=data, headers=headers, timeout=timeout)
|
|
else:
|
|
response = requests.post(url, data=data, headers=headers, timeout=timeout)
|
|
|
|
response.raise_for_status()
|
|
|
|
# 处理响应
|
|
try:
|
|
response_data = response.json()
|
|
except ValueError:
|
|
response_data = response.text
|
|
|
|
result = {
|
|
"response": response_data,
|
|
"status_code": response.status_code,
|
|
"headers": dict(response.headers)
|
|
}
|
|
|
|
# 存储结果
|
|
self.store_result(result)
|
|
return result
|
|
|
|
except requests.RequestException as e:
|
|
error_result = {
|
|
"error": str(e),
|
|
"status_code": getattr(e.response, "status_code", None) if hasattr(e, "response") else None
|
|
}
|
|
self.store_result(error_result)
|
|
raise ComponentError(f"HTTP POST请求失败: {str(e)}") |