Skip to content
Commits on Source (3)
* text eol=lf
...@@ -37,6 +37,7 @@ out/ ...@@ -37,6 +37,7 @@ out/
bin/ bin/
*.apk *.apk
*.ipa *.ipa
*.zip
# IDE 配置 # IDE 配置
.idea/ .idea/
......
#!/usr/bin/env bash
set -euo pipefail
# 工作目录为脚本所在目录
BASE_DIR="$(cd "$(dirname "$0")" && pwd)"
ZIP_FILE="$BASE_DIR/tushare-web-front.zip"
TARGET_DIR="$BASE_DIR/tushare-web-front"
TIMESTAMP="$(date +%y%m%d%H%M%S)"
if [ ! -f "$ZIP_FILE" ]; then
echo "[ERROR] 找不到压缩包: $ZIP_FILE"
exit 1
fi
if ! command -v unzip >/dev/null 2>&1; then
echo "[ERROR] 未安装 unzip,请先安装 unzip"
exit 1
fi
# 备份当前的 tushare-web-front 目录(如果存在)
if [ -d "$TARGET_DIR" ]; then
BACKUP_DIR="${TARGET_DIR}_${TIMESTAMP}"
echo "[INFO] 备份目录: $TARGET_DIR -> $BACKUP_DIR"
cp -a "$TARGET_DIR" "$BACKUP_DIR"
fi
# 确保目标目录存在
mkdir -p "$TARGET_DIR"
# 直接解压到目标目录并覆盖同名文件
echo "[INFO] 解压 $ZIP_FILE$TARGET_DIR (覆盖同名文件) ..."
unzip -q -o "$ZIP_FILE" -d "$TARGET_DIR"
echo "[SUCCESS] 部署完成: $TARGET_DIR"
...@@ -44,7 +44,7 @@ def check_token(token: str, client_ip: str): ...@@ -44,7 +44,7 @@ def check_token(token: str, client_ip: str):
# 1. token是否存在(只查内存 ALL_TOKENS,直接取字典) # 1. token是否存在(只查内存 ALL_TOKENS,直接取字典)
token_info = ALL_TOKENS.get(token) token_info = ALL_TOKENS.get(token)
if not token_info: if not token_info:
return False, "Invalid or expired token" return False, "Token已过期或不存在,请购买新token"
now = datetime.now() now = datetime.now()
# 2. 是否被锁定(用 is_locked 字段判断) # 2. 是否被锁定(用 is_locked 字段判断)
if token_info.get('is_locked'): if token_info.get('is_locked'):
...@@ -58,17 +58,21 @@ def check_token(token: str, client_ip: str): ...@@ -58,17 +58,21 @@ def check_token(token: str, client_ip: str):
# 3. 是否过期 # 3. 是否过期
end_time = token_info.get('end_time') end_time = token_info.get('end_time')
if end_time and now > datetime.fromisoformat(end_time): if end_time and now > datetime.fromisoformat(end_time):
return False, "token已过期" return False, "Token已过期或不存在,请购买新token"
# 4. 是否首次使用 # 4. 是否首次使用
if token not in TOKEN_IP_MAP: if token not in TOKEN_IP_MAP:
TOKEN_IP_MAP[token] = {'ip': client_ip, 'locked_at': None} TOKEN_IP_MAP[token] = {'ip': client_ip, 'locked_at': None}
# 首次使用仍需记录访问,执行频控判定
locked = TokenService.record_access_and_maybe_lock(token_info, client_ip)
if locked:
TOKEN_IP_MAP[token]['locked_at'] = now
return False, "检测到同一token多IP高频访问,已锁定,请等待12个小时解锁"
return True, None return True, None
# 5. 检查ip是否一致 # 5. 非首次使用:记录访问并执行频控判定
if TOKEN_IP_MAP[token]['ip'] != client_ip: locked = TokenService.record_access_and_maybe_lock(token_info, client_ip)
# 锁定token if locked:
TokenService.lock_token(token_info.get('id'), operator='system')
TOKEN_IP_MAP[token]['locked_at'] = now TOKEN_IP_MAP[token]['locked_at'] = now
return False, "多台机器使用同一token,已锁定,请等待12个小时解锁" return False, "检测到同一token多IP高频访问,已锁定,请等待12个小时解锁"
return True, None return True, None
for method_name in get_public_methods(tushare_funet.pro_api): for method_name in get_public_methods(tushare_funet.pro_api):
......
...@@ -30,7 +30,7 @@ class Database: ...@@ -30,7 +30,7 @@ class Database:
self.engine = create_engine( self.engine = create_engine(
db_url, db_url,
poolclass=QueuePool, poolclass=QueuePool,
pool_size=10, pool_size=5,
max_overflow=20, max_overflow=20,
pool_pre_ping=True, pool_pre_ping=True,
pool_recycle=3600, pool_recycle=3600,
......
...@@ -3,7 +3,7 @@ Token模型 ...@@ -3,7 +3,7 @@ Token模型
""" """
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import declarative_base
Base = declarative_base() Base = declarative_base()
......
...@@ -3,7 +3,8 @@ Token服务类 ...@@ -3,7 +3,8 @@ Token服务类
""" """
import uuid import uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List, Deque
from collections import defaultdict, deque
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import and_, or_ from sqlalchemy import and_, or_
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
...@@ -20,6 +21,12 @@ logger = get_logger(__name__) ...@@ -20,6 +21,12 @@ logger = get_logger(__name__)
class TokenService: class TokenService:
"""Token服务类""" """Token服务类"""
# 访问频率控制:5分钟窗口、单IP阈值10次
ACCESS_WINDOW_MINUTES: int = 1
ACCESS_LIMIT_PER_IP: int = 10
# 内存窗口:token_value -> ip -> deque[timestamps]
_token_ip_access_windows: Dict[str, Dict[str, Deque[datetime]]] = defaultdict(lambda: defaultdict(deque))
@staticmethod @staticmethod
def generate_token() -> str: def generate_token() -> str:
"""生成Token值""" """生成Token值"""
...@@ -81,6 +88,58 @@ class TokenService: ...@@ -81,6 +88,58 @@ class TokenService:
finally: finally:
close_db_session() close_db_session()
@staticmethod
def record_access_and_maybe_lock(token_info: Dict[str, Any], client_ip: str) -> bool:
"""记录一次访问,并根据5分钟内多IP高频访问策略决定是否锁定。
策略:在最近 ACCESS_WINDOW_MINUTES 分钟内,如果该 token 存在至少2个不同 IP,且每个 IP 的访问次数都超过 ACCESS_LIMIT_PER_IP,则锁定 token。
Returns:
True 表示本次判断已触发锁定;False 表示未锁定。
"""
try:
token_value = token_info.get('token_value')
token_id = token_info.get('id')
if not token_value or not token_id:
return False
now = datetime.now()
window_start = now - timedelta(minutes=TokenService.ACCESS_WINDOW_MINUTES)
access_map = TokenService._token_ip_access_windows[token_value]
ip_deque = access_map[client_ip]
ip_deque.append(now)
# 清理当前 IP 过窗的访问
while ip_deque and ip_deque[0] < window_start:
ip_deque.popleft()
# 统计所有 IP 的窗口内访问次数,并清理无效项
ips_over_limit = 0
for ip, q in list(access_map.items()):
# 清理过窗访问
while q and q[0] < window_start:
q.popleft()
if len(q) > TokenService.ACCESS_LIMIT_PER_IP:
ips_over_limit += 1
if not q:
# 清理空队列,避免内存增长
del access_map[ip]
if ips_over_limit >= 2:
# 满足锁定条件
locked = TokenService.lock_token(token_id=token_id, operator='system')
if locked:
logger.warning(
f"Token {token_value} 被锁定:{TokenService.ACCESS_WINDOW_MINUTES}分钟内,至少2个IP访问次数均超过"
f"{TokenService.ACCESS_LIMIT_PER_IP}"
)
return True
return False
except Exception as e:
logger.error(f"记录访问/判断锁定异常: {str(e)}")
return False
@staticmethod @staticmethod
def get_tokens( def get_tokens(
page: int = 1, page: int = 1,
......
...@@ -7,3 +7,4 @@ requests ...@@ -7,3 +7,4 @@ requests
pandas pandas
python-dateutil python-dateutil
python-dotenv python-dotenv
pytest
\ No newline at end of file
...@@ -3,7 +3,9 @@ ...@@ -3,7 +3,9 @@
# 配置参数 # 配置参数
APP_DIR="tushare-web-back" # 应用文件夹名称 APP_DIR="tushare-web-back" # 应用文件夹名称
ZIP_FILE="${APP_DIR}.zip" # 上传的zip文件名 ZIP_FILE="${APP_DIR}.zip" # 上传的zip文件名
BACKUP_DIR="${APP_DIR}-backup" # 备份文件夹名称 # 备份文件夹增加时间后缀(YYYYMMDDHHSS)
TIMESTAMP=$(date +%Y%m%d%H%S)
BACKUP_DIR="${APP_DIR}-backup-${TIMESTAMP}"
SERVICE_NAME="tushareweb-manager" # Supervisor中的服务名(根据实际情况修改) SERVICE_NAME="tushareweb-manager" # Supervisor中的服务名(根据实际情况修改)
# 检查zip文件是否存在 # 检查zip文件是否存在
...@@ -14,14 +16,7 @@ fi ...@@ -14,14 +16,7 @@ fi
# 备份现有应用文件夹 # 备份现有应用文件夹
echo "开始备份 $APP_DIR$BACKUP_DIR..." echo "开始备份 $APP_DIR$BACKUP_DIR..."
# 复制当前应用文件夹到备份(带时间戳,不清理历史备份)
# 删除旧的备份(如果存在)
if [ -d "$BACKUP_DIR" ]; then
rm -rf "$BACKUP_DIR"
echo "已删除旧的备份文件夹 $BACKUP_DIR"
fi
# 复制当前应用文件夹到备份
if [ -d "$APP_DIR" ]; then if [ -d "$APP_DIR" ]; then
cp -r "$APP_DIR" "$BACKUP_DIR" cp -r "$APP_DIR" "$BACKUP_DIR"
if [ $? -eq 0 ]; then if [ $? -eq 0 ]; then
......