Flask + Celery 应用

article/2025/7/15 1:14:03

目录

    • Flask + Celery 应用
      • 项目结构
      • 1. 创建app.py
      • 2. 创建tasks.py
      • 3. 创建celery_worker.py
      • 4. 创建templates目录和index.html
      • 运行应用
      • 测试文件

Flask + Celery 应用

对于Flask与Celery结合的例子,需要创建几个文件。首先安装必要的依赖:

pip install flask celery redis requests

项目结构

让我们创建以下文件结构:

flask_celery/
├── app.py          # Flask应用
├── celery_worker.py # Celery worker
├── tasks.py        # Celery任务定义
└── templates/      # 模板目录└── index.html  # 主页模板

1. 创建app.py

from flask import Flask, request, render_template, jsonify
from celery_worker import celery
import tasks
import timeapp = Flask(__name__)@app.route('/')
def index():return render_template('index.html')@app.route('/process', methods=['POST'])
def process():# 获取表单数据data = request.form.get('data', '')# 启动异步任务task = tasks.process_data.delay(data)return jsonify({'task_id': task.id,'status': '任务已提交','message': f'正在处理数据: {data}'})@app.route('/status/<task_id>')
def task_status(task_id):# 获取任务状态task = tasks.process_data.AsyncResult(task_id)if task.state == 'PENDING':response = {'state': task.state,'status': '任务等待中...'}elif task.state == 'FAILURE':response = {'state': task.state,'status': '任务失败','error': str(task.info)}else:response = {'state': task.state,'status': '任务完成' if task.state == 'SUCCESS' else '任务处理中','result': task.result if task.state == 'SUCCESS' else None}return jsonify(response)if __name__ == '__main__':app.run(debug=True)

2. 创建tasks.py

from celery_worker import celery
import time@celery.task()
def process_data(data):# 模拟耗时操作time.sleep(5)# 处理数据(这里只是简单地转换为大写)result = data.upper()return {'original_data': data,'processed_data': result,'processing_time': '5秒'}

3. 创建celery_worker.py

from celery import Celery# 创建Celery实例
celery = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0'
)# 配置Celery
celery.conf.update(task_serializer='json',accept_content=['json'],result_serializer='json',timezone='Asia/Shanghai',enable_utc=True,
)

4. 创建templates目录和index.html

首先创建templates目录:

mkdir templates

然后创建index.html文件:

<!DOCTYPE html>
<html>
<head><title>Flask + Celery 示例</title><style>body { font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }h1 { color: #4285f4; }.container { max-width: 800px; margin: 0 auto; }.form-group { margin-bottom: 15px; }input[type="text"] { padding: 8px; width: 300px; }button { padding: 8px 15px; background-color: #4285f4; color: white; border: none; cursor: pointer; }button:hover { background-color: #3b78e7; }#result { margin-top: 20px; padding: 15px; background-color: #f5f5f5; border-radius: 5px; display: none; }#status { margin-top: 10px; font-style: italic; }</style>
</head>
<body><div class="container"><h1>Flask + Celery 异步任务示例</h1><div class="form-group"><label for="data">输入要处理的数据:</label><br><input type="text" id="data" name="data" placeholder="输入一些文本..."><button onclick="submitTask()">提交任务</button></div><div id="status"></div><div id="result"><h3>任务结果:</h3><pre id="result-data"></pre></div></div><script>function submitTask() {const data = document.getElementById('data').value;if (!data) {alert('请输入数据!');return;}const statusDiv = document.getElementById('status');statusDiv.textContent = '提交任务中...';// 提交任务fetch('/process', {method: 'POST',headers: {'Content-Type': 'application/x-www-form-urlencoded',},body: `data=${encodeURIComponent(data)}`}).then(response => response.json()).then(data => {statusDiv.textContent = data.status;// 开始轮询任务状态pollTaskStatus(data.task_id);}).catch(error => {statusDiv.textContent = `错误: ${error}`;});}function pollTaskStatus(taskId) {const statusDiv = document.getElementById('status');const resultDiv = document.getElementById('result');const resultDataPre = document.getElementById('result-data');// 定期检查任务状态const interval = setInterval(() => {fetch(`/status/${taskId}`).then(response => response.json()).then(data => {statusDiv.textContent = `状态: ${data.status}`;if (data.state === 'SUCCESS') {clearInterval(interval);resultDiv.style.display = 'block';resultDataPre.textContent = JSON.stringify(data.result, null, 2);} else if (data.state === 'FAILURE') {clearInterval(interval);statusDiv.textContent = `错误: ${data.error}`;}}).catch(error => {clearInterval(interval);statusDiv.textContent = `轮询错误: ${error}`;});}, 1000);}</script>
</body>
</html>

运行应用

  1. 首先,确保Redis服务器正在运行(Celery需要它作为消息代理)可以用docker启动:
docker run --name redis-server -p 6379:6379 -d redis

在这里插入图片描述

  1. 启动Celery worker:
celery -A tasks worker --loglevel=info

在这里插入图片描述

  1. 在另一个终端中启动Flask应用:
python app.py

在这里插入图片描述

测试文件

创建一个新文件test_app.py

import requests
import time
import json# 应用服务器地址
BASE_URL = 'http://localhost:5000'def test_process_endpoint():"""测试/process端点"""print("测试1: 提交任务处理")# 准备测试数据test_data = "hello world"# 发送POST请求到/process端点response = requests.post(f"{BASE_URL}/process",data={"data": test_data})# 检查响应状态码if response.status_code == 200:print("✓ 状态码正确: 200")else:print(f"✗ 状态码错误: {response.status_code}")# 解析响应JSONresult = response.json()# 检查响应内容if 'task_id' in result:print(f"✓ 成功获取任务ID: {result['task_id']}")return result['task_id']else:print("✗ 未能获取任务ID")return Nonedef test_status_endpoint(task_id):"""测试/status/<task_id>端点"""print("\n测试2: 检查任务状态")if not task_id:print("✗ 无法测试状态端点: 缺少任务ID")return# 轮询任务状态,最多等待10秒max_attempts = 10for attempt in range(1, max_attempts + 1):print(f"\n轮询 {attempt}/{max_attempts}...")# 发送GET请求到/status/<task_id>端点response = requests.get(f"{BASE_URL}/status/{task_id}")# 检查响应状态码if response.status_code == 200:print("✓ 状态码正确: 200")else:print(f"✗ 状态码错误: {response.status_code}")continue# 解析响应JSONresult = response.json()print(f"当前状态: {result.get('state', '未知')}")# 如果任务完成,显示结果并退出循环if result.get('state') == 'SUCCESS':print("\n✓ 任务成功完成!")print("结果:")print(json.dumps(result.get('result', {}), indent=2, ensure_ascii=False))return True# 如果任务失败,显示错误并退出循环if result.get('state') == 'FAILURE':print(f"\n✗ 任务失败: {result.get('error', '未知错误')}")return False# 等待1秒后再次检查time.sleep(1)print("\n✗ 超时: 任务未在预期时间内完成")return Falsedef test_index_endpoint():"""测试首页端点"""print("\n测试3: 访问首页")# 发送GET请求到/端点response = requests.get(BASE_URL)# 检查响应状态码if response.status_code == 200:print("✓ 状态码正确: 200")print("✓ 成功访问首页")else:print(f"✗ 状态码错误: {response.status_code}")def run_all_tests():"""运行所有测试"""print("开始测试Flask+Celery应用...\n")# 测试1: 提交任务task_id = test_process_endpoint()# 测试2: 检查任务状态if task_id:test_status_endpoint(task_id)# 测试3: 访问首页test_index_endpoint()print("\n测试完成!")if __name__ == "__main__":run_all_tests()

然后,在另一个终端中运行测试:

python test_app.py

在这里插入图片描述


http://www.hkcw.cn/article/RxkcUjbxrA.shtml

相关文章

鸿蒙电脑会在国内逐渐取代windows电脑吗?

点击上方关注 “终端研发部” 设为“星标”&#xff0c;和你一起掌握更多数据库知识 10年内应该不会 用Windows、MacOS操作系统的后果是你的个人信息可能会被美国FBI看到&#xff0c;但绝大多数人的信息FBI没兴趣去看 你用某家公司的电脑系统,那就得做好被某些人监视的下场,相信…

安全态势感知中的告警误报思考

如果说2020年还是小打小闹&#xff0c;那么2021年无疑是杀疯了&#xff0c;我带着我的误报识别系统和事件专家系统在流量态势感知领域杀疯了。先说说这个误报识别系统&#xff0c;我创新性的定义了灰色事件&#xff0c;认为告警不是非黑即白的&#xff0c;而是需要持续评估的&a…

电脑wifi显示已禁用怎么点都无法启用

一、重启路由器与电脑 有时候&#xff0c;简单的重启可以解决很多小故障。试着先断开电源让路由器休息一会儿再接通&#xff1b;对于电脑&#xff0c;则可选择重启系统看看情况是否有改善。 二、检查驱动程序 无线网卡驱动程序的问题也是导致WiFi无法启用的常见原因之一。我…

MAC电脑怎么通过触摸屏打开右键

在Mac电脑上&#xff0c;通过触摸屏打开右键菜单的方法如下&#xff1a; 法1:双指轻点&#xff1a;在触控板上同时用两根手指轻点&#xff0c;即可触发右键菜单。这是Mac上常用的右键操作方法。 法2:自定义触控板角落&#xff1a;可以设置触控板的右下角或左下角作为右键区域…

【音视频】 FFmpeg 硬件(AMD)解码H264

参考链接&#xff1a;https://trac.ffmpeg.org/wiki/HWAccelIntro 硬件编解码的概念 硬件编解码是⾮CPU通过烧写运⾏视频加速功能对⾼清视频流进⾏编解码&#xff0c;其中⾮CPU可包括GPU、FPGA或者ASIC等独⽴硬件模块&#xff0c;把CPU⾼使⽤率的视频解码⼯作从CPU⾥分离出来&…

【笔记】为 Python 项目安装图像处理与科学计算依赖(MINGW64 环境)

&#x1f4dd; 为 Python 项目安装图像处理与科学计算依赖&#xff08;MINGW64 环境&#xff09; &#x1f3af; 安装目的说明 本次安装是为了在 MSYS2 的 MINGW64 工具链环境中&#xff0c;搭建一个完整的 Python 图像处理和科学计算开发环境。 主要目的是支持以下类型的 Pyth…

软件测评师教程 第2章 软件测试基础 笔记

第2章 软件测试基础 笔记 25.03.18 2.1 软件测试的基本概念 2.1.1 什么是软件测试 软件测试的定义&#xff1a; IEEE 使用人工或自动手段来运行或测定某个系统的过程&#xff0c;其目的在于检验它是否满足规定的需求 或是 弄清预期结果与实际结果之间的差异。 软件测试应尽…

[网页五子棋][匹配对战]落子实现思路、发送落子请求、处理落子响应

文章目录 落子实现思路发送落子请求处理落子响应两种棋盘的区别实现 handleTextMessage实现对弈功能控制台打印棋盘完善前端逻辑 落子实现思路 先来实现&#xff1a;点击棋盘&#xff0c;能发送落子请求 客户端 1 点击了棋盘位置&#xff0c;先不着急画子&#xff0c;而是给服…

【QT控件】QWidget 常用核心属性介绍 -- 万字详解

目录 一、控件概述 二、QWidget 核心属性 2.1 核心属性概览 2.2 enabled ​编辑 2.3 geometry 2.4 windowTitle 2.5 windowIcon 使用qrc文件管理资源 2.6 windowOpacity 2.7 cursor 2.8 font ​编辑 2.9 toolTip 2.10 focusPolicy 2.11 styleSheet QT专栏&…

【Java EE初阶】计算机是如何⼯作的

计算机是如何⼯作的 计算机发展史冯诺依曼体系&#xff08;Von Neumann Architecture&#xff09;CPU指令&#xff08;Instruction&#xff09;CPU 是如何执行指令的&#xff08;重点&#xff09; 操作系统&#xff08;Operating System&#xff09;进程(process) 进程 PCB 中的…

【仿muduo库实现并发服务器】使用正则表达式提取HTTP元素

使用正则表达式提取HTTP元素 1.正则表达式2.正则库的使用3.使用正则表达式提取HTTP请求行 1.正则表达式 正则表达式它其实是描述了一种字符串匹配的模式&#xff0c;它可以用来在一个字符串中检测一个特定格式的字串&#xff0c;以及可以将符合特定规则的字串进行替换或者提取…

力扣刷题Day 68:搜索插入位置(35)

1.题目描述 2.思路 方法1&#xff1a;回溯的二分查找。 方法2&#xff1a;看到了一个佬很简洁的写法&#xff0c;代码贴在下面了。 3.代码&#xff08;Python3&#xff09; 方法1&#xff1a; class Solution:def searchInsert(self, nums: List[int], target: int) ->…

23. Merge k Sorted Lists

目录 题目描述 方法一、k-1次两两合并 方法二、分治法合并 方法三、使用优先队列 题目描述 23. Merge k Sorted Lists 方法一、k-1次两两合并 选第一个链表作为结果链表&#xff0c;每次将后面未合并的链表合并到结果链表中&#xff0c;经过k-1次合并&#xff0c;即可得到…

docker-部署Nginx以及Tomcat

一、docker 部署Nginx 1、搜索镜像&#xff08;nginx&#xff09; [rootlocalhost /]# docker search nginx Error response from daemon: Get "https://index.docker.io/v1/search?qnginx&n25": dial tcp 192.133.77.133:443: connect: connection refused 简…

【dshow】VIDEOINFOHEADER2 头文件

VIDEOINFOHEADER2 是用于描述视频流格式的结构体&#xff0c;常用于 Windows 平台的 DirectShow 或 Media Foundation 编程中。 它的定义在以下头文件中&#xff1a; ✅ 所在头文件&#xff1a; #include <dvdmedia.h>&#x1f4cc; 前置说明&#xff1a; VIDEOINFOHEA…

CentOS8.3+Kubernetes1.32.5+Docker28.2.2高可用集群二进制部署

一、准备工作 1.1 主机列表 HostnameHost IPDocker IPRolek8s31.vm.com192.168.26.3110.26.31.1/24master&worker、etcd、dockerk8s32.vm.com192.168.26.3210.26.32.1/24master&worker、etcd、dockerk8s33.vm.com192.168.26.3310.26.33.1/24master&worker、etcd、…

Python----目标检测(Ultralytics安装和YOLO-V8快速上手)

一、Ultralytics安装 网址&#xff1a;主页 -Ultralytics YOLO 文档 Ultralytics提供了各种安装方法&#xff0c;包括pip、conda和Docker。通过 ultralytics pip包安装最新稳定版本的YOLOv8&#xff0c;或克隆Ultralytics GitHub 存储库以获取最新版本。可以使用Docker在隔离的…

Git实战--基于已有分支克隆进行项目开发的完整流程

Git克隆项目开发流程 ✅ 一、完整流程概述✅ 二、详细操作步骤Step 1&#xff1a;克隆仓库&#xff08;如果尚未克隆&#xff09;Step 2&#xff1a;获取远程分支信息并切换到 feature/ 获取所有远程分支Step 3&#xff1a;创建并切换到你的新分支Step 4&#xff1a;开始开发新…

FreeBSD 14.3 候选版本附带 Docker 镜像和关键修复

新的月份已经到来&#xff0c;FreeBSD 14.3 候选发布版 1 现已开放测试&#xff0c;它带来了一些您可能会觉得有用的更新&#xff0c;特别是如果您对Docker容器感兴趣的话。RC1 版本中一个非常受欢迎的改进是&#xff0c;FreeBSD 项目已开始将官方开放容器计划 (OCI) 镜像发布到…

界面分析 - 上

上方&#xff1a;图标&#xff0c;搜索框&#xff0c;功能按钮 左侧&#xff1a;文本显示&#xff0c;自定义按钮&#xff0c;点击自定义按钮的时候&#xff0c;最底下播放条不变&#xff0c;右侧界面随着按钮的改变而改变 右侧&#xff1a;文本信息显示&#xff0c;图片按钮…