mxivideo/python_core/cli/commands/auth.py

255 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
用户认证CLI命令
"""
from typing import Optional
import typer
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
import json
import getpass
from python_core.utils.jsonrpc_enhanced import create_response_handler
from python_core.api.auth_api import auth_api
from python_core.services.user_storage import user_storage
from python_core.utils.jwt_auth import jwt_auth
from uuid import uuid4
console = Console()
auth_app = typer.Typer(name="auth", help="用户认证管理命令")
@auth_app.command("register")
def register_user(
username: str = typer.Argument(..., help="用户名"),
email: str = typer.Argument(..., help="邮箱地址"),
display_name: Optional[str] = typer.Option(None, "--display-name", "-d", help="显示名称"),
password: Optional[str] = typer.Option(None, "--password", "-p", help="密码(不提供则交互式输入)")
):
"""注册新用户"""
response = create_response_handler(str(uuid4()))
try:
# 获取密码
if not password:
response.error(-32602, "❌ 密码不能为空")
raise typer.Exit(1)
# 执行注册
result = auth_api.register({
"username": username,
"email": email,
"password": password,
"display_name": display_name
})
response.success(result)
except Exception as e:
response.error(-32603, f"❌ 注册失败: {str(e)}")
raise typer.Exit(1)
@auth_app.command("login")
def login_user(
username_or_email: str = typer.Argument(..., help="用户名或邮箱"),
password: Optional[str] = typer.Option(None, "--password", "-p", help="密码(不提供则交互式输入)")
):
"""用户登录"""
response = create_response_handler(str(uuid4()))
try:
# 获取密码
if not password:
response.error(-32602, "❌ 密码不能为空")
raise typer.Exit(1)
# 执行登录
result = auth_api.login({
"username_or_email": username_or_email,
"password": password
})
response.success(result)
except Exception as e:
response.error(-32603, f"❌ 注册失败: {str(e)}")
raise typer.Exit(1)
@auth_app.command("verify")
def verify_token(
token: str = typer.Argument(..., help="JWT token"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出")
):
"""验证JWT token"""
try:
console.print(f"🔍 [bold blue]验证Token[/bold blue]")
# 验证token
result = auth_api.verify_token({
"token": token
})
if result["success"]:
console.print(f"\n✅ [bold green]Token有效![/bold green]")
user = result['data']['user']
console.print(f"用户: {user['display_name']} ({user['username']})")
console.print(f"邮箱: {user['email']}")
if verbose:
# 显示token详细信息
token_info = jwt_auth.get_token_info(token)
info_text = f"""
👤 用户信息:
ID: {user['user_id']}
用户名: {user['username']}
邮箱: {user['email']}
显示名称: {user['display_name']}
🔑 Token信息:
有效性: {'有效' if token_info['valid'] else '无效'}
签发时间: {token_info.get('issued_at', 'Unknown')}
过期时间: {token_info.get('expires_at', 'Unknown')}
剩余时间: {token_info.get('time_remaining', 'Unknown')}
"""
panel = Panel(info_text.strip(), title="Token详情", border_style="green")
console.print(panel)
else:
console.print(f"[red]❌ Token无效: {result['message']}[/red]")
if verbose:
# 显示token信息即使无效
token_info = jwt_auth.get_token_info(token)
if token_info.get('error'):
console.print(f"错误: {token_info['error']}")
else:
console.print(f"Token状态: {'过期' if token_info.get('is_expired') else '无效'}")
raise typer.Exit(1)
except Exception as e:
console.print(f"[red]❌ Token验证失败: {str(e)}[/red]")
raise typer.Exit(1)
@auth_app.command("list")
def list_users(
include_inactive: bool = typer.Option(False, "--include-inactive", "-i", help="包含非活跃用户"),
limit: int = typer.Option(20, "--limit", "-l", help="显示数量限制"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="详细输出")
):
"""列出所有用户"""
try:
console.print(f"👥 [bold blue]用户列表[/bold blue]")
# 获取用户列表
users = user_storage.get_all_users(include_inactive)
if not users:
console.print("📭 没有用户")
return
# 限制显示数量
total_count = len(users)
if len(users) > limit:
users = users[:limit]
console.print(f"📊 显示前 {limit} 个用户(共 {total_count} 个)")
else:
console.print(f"📊 共 {total_count} 个用户")
# 创建表格
table = Table(title="用户列表")
table.add_column("ID", style="cyan", width=8)
table.add_column("用户名", style="green")
table.add_column("邮箱", style="yellow")
table.add_column("显示名称", style="magenta")
table.add_column("状态", style="blue")
table.add_column("创建时间", style="dim")
if verbose:
table.add_column("最后登录", style="dim")
for user in users:
# 状态
status = "✅ 活跃" if user.is_active else "❌ 禁用"
# 格式化创建时间
created_at = user.created_at
if 'T' in created_at:
created_at = created_at.split('T')[0] + ' ' + created_at.split('T')[1][:8]
row = [
user.id[:8],
user.username,
user.email,
user.display_name,
status,
created_at
]
if verbose:
last_login = user.last_login or "从未登录"
if last_login != "从未登录" and 'T' in last_login:
last_login = last_login.split('T')[0] + ' ' + last_login.split('T')[1][:8]
row.append(last_login)
table.add_row(*row)
console.print(table)
except Exception as e:
console.print(f"[red]❌ 获取用户列表失败: {str(e)}[/red]")
raise typer.Exit(1)
@auth_app.command("stats")
def show_stats():
"""显示用户统计信息"""
try:
console.print(f"📊 [bold blue]用户统计信息[/bold blue]")
# 获取统计信息
stats = user_storage.get_user_count()
# 创建统计面板
stats_content = f"""
📈 [bold green]用户统计[/bold green]
总用户数: {stats['total']}
活跃用户: {stats['active']}
禁用用户: {stats['inactive']}
"""
stats_panel = Panel(stats_content.strip(), title="用户统计", border_style="green")
console.print(stats_panel)
# 显示最近注册的用户
recent_users = user_storage.get_all_users()[:5]
if recent_users:
console.print(f"\n🕒 [bold green]最近注册的用户[/bold green]")
recent_table = Table()
recent_table.add_column("用户名", style="green")
recent_table.add_column("邮箱", style="yellow")
recent_table.add_column("注册时间", style="dim")
for user in recent_users:
created_at = user.created_at
if 'T' in created_at:
created_at = created_at.split('T')[0] + ' ' + created_at.split('T')[1][:8]
recent_table.add_row(
user.username,
user.email,
created_at
)
console.print(recent_table)
except Exception as e:
console.print(f"[red]❌ 获取统计信息失败: {str(e)}[/red]")
raise typer.Exit(1)
if __name__ == "__main__":
auth_app()