Flask异步任务

由于时效问题,该文某些代码、技术可能已经过期,请注意!!!本文最后更新于:2 年前

Flask-Executor实现异步

Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。

网上教程大多是针对 concurrent.futures 的 ThreadPoolExecutor(多线程)和ProcessPoolExecutor(多进程)实现,这里推荐使用
Flask-Executor。
官方文档介绍:Flask-Executor is a Flask extension that makes it easy to work with concurrent.futures in your application.

安装
1
pip install flask-executor
设置
1
2
3
4
5
6
7
8
9
from flask import Flask
from flask_executor import Executor

app = Flask(__name__)
# app.config['EXECUTOR_TYPE'] = 'process'
app.config['EXECUTOR_TYPE'] = 'thread'
app.config['EXECUTOR_MAX_WORKERS'] = 5
app.config['EXECUTOR_PROPAGATE_EXCEPTIONS'] = True ## 方便debug
executor = Executor(app)
官方小例子:
1
2
3
4
5
6
7
8
9
10
11
def fib(n):
if n <= 2:
return 1
else:
return fib(n-1) + fib(n-2)

@app.route('/run_fib')
def run_fib():
executor.submit(fib, 5)
executor.map(fib, range(1, 6))
return 'OK'
收集异步任务信息
1
2
3
4
5
6
7
8
9
10
11
@app.route('/start-task')
def start_task():
executor.submit_stored('calc_power', pow, 323, 1235)
return jsonify({'result':'success'})

@app.route('/get-result')
def get_result():
if not executor.futures.done('calc_power'):
return jsonify({'status': executor.futures._state('calc_power')})
future = executor.futures.pop('calc_power')
return jsonify({'status': done, 'result': future.result()})

详细文档请查阅:https://flask-executor.readthedocs.io/en/latest/

log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import logging
from logging.handlers import TimedRotatingFileHandler

class Logger:
"""
自定义日志打印类,将日志保存到`../logs/`目录
设置日志等级和增加处理器,设置处理器为按照日期切分,最大保留30天
"""

def __init__(self):
# 创建Logger
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)

# 终端Handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)

# 文件Handler
# file_handler = logging.FileHandler(
# filename='./logs/app.log',
# mode='a+',
# encoding='UTF-8'
# )
file_handler = TimedRotatingFileHandler('./logs/app.log', when='d', interval=1, backupCount=30,
encoding="utf8", delay=False)
file_handler.setLevel(logging.NOTSET)

# Formatter
formatter = logging.Formatter(
fmt='%(asctime)s --- %(levelname)s - %(message)s',
datefmt="%Y-%m-%d %H:%M:%S"
)

console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# 添加到Logger中
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
  • 添加颜色
    1
    2
    3
    4
    5
    6
    7
    8
    9
    from colorama import Fore,Back,Style

    # 前景色:白色 背景色:绿色
    def white_green(s):
    return Fore.WHITE + Back.GREEN + s + Style.RESET_ALL

    # 前景色:白色 背景色:红色
    def white_red(s):
    return Fore.WHITE + Back.RED + s + Style.RESET_ALL
  • 调用日志
    1
    2
    LOGGER = Logger()
    LOGGER.logger.info(white_green('hello'))
bug

多线程操作sqlite:Recursive use of cursors not allowed

解决方案参考:https://blog.csdn.net/counsellor/article/details/43715007

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading

def update(database, projectId):
lock = threading.Lock()
try:
lock.acquire(True)
tmp_sql = '''update tasks set complete = ? where projectId = ?'''
database.execute_sql(tmp_sql, ('Y', projectId))

input_path = f"static/project/{projectId}/result"
output_path = f"static/project/{projectId}/result.zip"
zipDir(input_path, output_path)

tmp_sql = '''update tasks set result = ? where projectId = ?'''
database.execute_sql(tmp_sql, (output_path, projectId))
finally:
lock.release()

参考:https://blog.csdn.net/briblue/article/details/85220740


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!