from app.external.zhiban import default_zhiban_api_client
import requests
import json
from datetime import datetime


def _lookup_primary_duty_user(app_name, app_type):
    """Return the primary on-duty user id for one application, or ``None``.

    Calls ``default_zhiban_api_client.get_zhiban_by_app(app_name, app_type)``
    and reads ``response["user"]["user_id"]``. A falsy response, a missing or
    empty ``user`` entry, or a missing or empty ``user_id`` all yield ``None``.
    """
    response = default_zhiban_api_client.get_zhiban_by_app(app_name, app_type)
    if not response:
        return None
    user = response.get("user")
    if not user:
        return None
    # .get() rather than indexing: a user record without an id should count as
    # "nobody on duty" for that application instead of aborting the whole run.
    return user.get("user_id") or None


def send_daily_reminder():
    """Look up today's primary on-duty user per application and post the roster.

    For every entry in the built-in application list the duty API is queried
    for its primary on-duty user. If no application has one, a notice is
    printed and nothing is sent. Otherwise a text message listing each
    application's display name (with an ``<@user_id>`` mention where a user
    was found) is POSTed as JSON to the webhook.

    Returns:
        None. Progress and failures are reported with ``print``; request
        errors are caught and printed, not raised.
    """
    # name/type are what the duty API is queried with; display_name is the
    # label shown in the outgoing message.
    app_map = [
        {"name": "base.db", "type": "app", "display_name": "平台-存储云平台服务号"},
        {"name": "base.db.db-all", "type": "app", "display_name": "MySQL/TiDB-DBA服务号"},
        {"name": "base.redis", "type": "prd", "display_name": "Redis-Redis服务号"},
        {"name": "base.redkv", "type": "prd", "display_name": "RedKV-RedKV服务号"},
        {"name": "base.redtable", "type": "prd", "display_name": "RedTable-RedTable服务号"},
        {"name": "base.db.red-dts", "type": "app", "display_name": "RedDTS-RedDTS服务号"},
        {"name": "base.redtao.redtao-manage", "type": "app", "display_name": "RedTAO-RedTAO服务号"},
    ]

    # (display_name, primary user id or None), kept in app_map order so the
    # message lists applications in the configured order.
    duty_rows = []
    for item in app_map:
        platform_name = item["name"]
        print(f"处理应用 {platform_name} 的值班信息")
        user_id = _lookup_primary_duty_user(platform_name, item["type"])
        if user_id:
            print(f"添加主值班人: {user_id}")
        duty_rows.append((item["display_name"], user_id))

    # Every application always gets a row, so "nobody on duty" has to be
    # decided by whether any row actually carries a user.
    if not any(user_id for _, user_id in duty_rows):
        print("今日无值班人")
        return

    # Local date, rendered as e.g. 2024年01月31日.
    current_date = datetime.now().strftime('%Y年%m月%d日')

    # Header line first, then one section per application.
    message_parts = [f"{current_date}值班安排:"]
    for display_name, user_id in duty_rows:
        platform_msg = f"\n{display_name}:"
        if user_id:
            # <@...> mention syntax; presumably user_id is the e-mail style
            # account the chat platform resolves -- TODO confirm.
            platform_msg += f"\n主值班人: <@{user_id}>"
        message_parts.append(platform_msg)
    content = "\n".join(message_parts)

    # NOTE(review): this value has no scheme or host, so requests will reject
    # it before sending anything; it looks like a truncated or redacted
    # webhook URL -- confirm the real address.
    webhook_url = "c6c5-4100-b42d-1456664f0538"
    payload = {
        "msgtype": "text",
        "text": {
            "content": content,
        },
    }

    try:
        response = requests.post(
            webhook_url,
            headers={'Content-Type': 'application/json'},
            data=json.dumps(payload),
            # Without a timeout a stalled endpoint would block this job forever.
            timeout=10,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"发送提醒失败: {str(e)}")
    else:
        print("值班提醒已发送")
