# mxivideo/python_core/cli/commands/auth.py
#
# 120 lines
# 3.5 KiB
# Python

"""
用户认证CLI命令
"""
from typing import Optional
import typer
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
import json
import getpass
from python_core.utils.jsonrpc_enhanced import create_response_handler
from python_core.api.auth_api import auth_api
from python_core.database.user_postgres import user_table
from python_core.utils.jwt_auth import jwt_auth
from uuid import uuid4
# Typer sub-application that groups every command defined in this module
# under the "auth" command name.
auth_app = typer.Typer(help="用户认证管理命令", name="auth")

# Module-level Rich console; none of the commands visible in this file
# reference it, but it is kept because other code may import it from here.
console = Console()
@auth_app.command("register")
def register_user(
    username: str = typer.Argument(..., help="用户名"),
    email: str = typer.Argument(..., help="邮箱地址"),
    display_name: Optional[str] = typer.Option(None, "--display-name", "-d", help="显示名称"),
    password: Optional[str] = typer.Option(None, "--password", "-p", help="密码(不提供则交互式输入)"),
) -> None:
    """注册新用户"""
    # Register a new user and report the outcome through the response handler.
    #
    # The docstring above is kept verbatim because Typer shows it as this
    # command's --help text.
    #
    # Parameters:
    #   username      positional; forwarded to auth_api.register as "username".
    #   email         positional; forwarded as "email".
    #   display_name  optional (--display-name / -d); forwarded as
    #                 "display_name", may be None.
    #   password      optional on the command line (--password / -p) but
    #                 required in practice: a missing or empty value is
    #                 rejected.
    #
    # Exits with status 1 (typer.Exit) when the password is missing or when
    # registration raises.
    #
    # NOTE(review): the --password help text promises an interactive prompt
    # when the option is omitted, but no prompt is implemented; confirm which
    # of the two is intended.
    response = create_response_handler()

    # Validate outside the try block. typer.Exit derives from RuntimeError, so
    # raising it inside the try would be swallowed by `except Exception` below
    # and a second, misleading -32603 error would be emitted.
    if not password:
        # -32602 is the JSON-RPC "invalid params" code.
        response.error(-32602, "❌ 密码不能为空")
        raise typer.Exit(1)

    try:
        result = auth_api.register({
            "username": username,
            "email": email,
            "password": password,
            "display_name": display_name,
        })
        response.success(result)
    except Exception as e:
        # Top-level CLI boundary: report any failure (-32603, JSON-RPC
        # "internal error") and exit non-zero instead of showing a traceback.
        response.error(-32603, f"❌ 注册失败: {e}")
        raise typer.Exit(1) from e
@auth_app.command("login")
def login_user(
    username_or_email: str = typer.Argument(..., help="用户名或邮箱"),
    password: Optional[str] = typer.Option(None, "--password", "-p", help="密码(不提供则交互式输入)"),
) -> None:
    """用户登录"""
    # Log a user in and report the outcome through the response handler.
    #
    # The docstring above is kept verbatim because Typer shows it as this
    # command's --help text.
    #
    # Parameters:
    #   username_or_email  positional; forwarded to auth_api.login as
    #                      "username_or_email".
    #   password           optional on the command line (--password / -p) but
    #                      required in practice: a missing or empty value is
    #                      rejected.
    #
    # Exits with status 1 (typer.Exit) when the password is missing or when
    # the login call raises.
    #
    # NOTE(review): the --password help text promises an interactive prompt
    # when the option is omitted, but no prompt is implemented; confirm which
    # of the two is intended.
    response = create_response_handler()

    # Validate outside the try block. typer.Exit derives from RuntimeError, so
    # raising it inside the try would be swallowed by `except Exception` below
    # and a second, misleading -32603 error would be emitted.
    if not password:
        # -32602 is the JSON-RPC "invalid params" code.
        response.error(-32602, "❌ 密码不能为空")
        raise typer.Exit(1)

    try:
        result = auth_api.login({
            "username_or_email": username_or_email,
            "password": password,
        })
        response.success(result)
    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        # The message now says "login failed"; it previously reused the
        # registration wording.
        response.error(-32603, f"❌ 登录失败: {e}")
        raise typer.Exit(1) from e
@auth_app.command("verify")
def verify_token(
    token: str = typer.Argument(..., help="JWT token"),
) -> None:
    """验证JWT token"""
    # Verify a token via auth_api.verify_token and report the outcome through
    # the response handler.
    #
    # The docstring above is kept verbatim because Typer shows it as this
    # command's --help text.
    #
    # Parameters:
    #   token  positional; forwarded to auth_api.verify_token as "token".
    #
    # Exits with status 1 (typer.Exit) when verification raises.
    response = create_response_handler()
    try:
        result = auth_api.verify_token({"token": token})
        response.success(result)
    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        # The message now says "token verification failed"; it previously
        # reused the registration wording.
        response.error(-32603, f"❌ token验证失败: {e}")
        raise typer.Exit(1) from e
@auth_app.command("list")
def list_users(
    include_inactive: bool = typer.Option(False, "--include-inactive", "-i", help="包含非活跃用户"),
) -> None:
    """列出所有用户"""
    # Fetch the user list from user_table and report it through the response
    # handler.
    #
    # The docstring above is kept verbatim because Typer shows it as this
    # command's --help text.
    #
    # Parameters:
    #   include_inactive  flag (--include-inactive / -i, default False);
    #                     passed positionally to user_table.get_all_users.
    #
    # Exits with status 1 (typer.Exit) when the lookup raises.
    response = create_response_handler()
    try:
        users = user_table.get_all_users(include_inactive)
        response.success(users)
    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        # The message now says "failed to fetch user list"; it previously
        # reused the registration wording.
        response.error(-32603, f"❌ 获取用户列表失败: {e}")
        raise typer.Exit(1) from e
@auth_app.command("stats")
def show_stats() -> None:
    """显示用户统计信息"""
    # Report whatever user_table.get_user_count() returns through the
    # response handler.
    #
    # The docstring above is kept verbatim because Typer shows it as this
    # command's --help text.
    #
    # Exits with status 1 (typer.Exit) when the lookup raises.
    response = create_response_handler()
    try:
        stats = user_table.get_user_count()
        response.success(stats)
    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        # The message now says "failed to fetch statistics"; it previously
        # reused the registration wording.
        response.error(-32603, f"❌ 获取统计信息失败: {e}")
        raise typer.Exit(1) from e
# Script entry point: when this module is executed directly (rather than
# imported), hand control to the Typer application.
if __name__ == "__main__":
    auth_app()