由于时效问题,该文某些代码、技术可能已经过期,请注意!!!本文最后更新于:2 年前
Flask-Executor实现异步
Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。
网上教程大多是针对 concurrent.futures 的 ThreadPoolExecutor(多线程)和ProcessPoolExecutor(多进程)实现,这里推荐使用 Flask-Executor。 官方文档介绍:Flask-Executor is a Flask extension that makes it easy to work with concurrent.futures in your application.
安装 1 pip install flask-executor
设置 1 2 3 4 5 6 7 8 9 from flask import Flaskfrom flask_executor import Executor app = Flask(__name__) # app.config['EXECUTOR_TYPE' ] = 'process' app.config['EXECUTOR_TYPE' ] = 'thread' app.config['EXECUTOR_MAX_WORKERS' ] = 5 app.config['EXECUTOR_PROPAGATE_EXCEPTIONS' ] = True ## 方便debug executor = Executor(app)
官方小例子: 1 2 3 4 5 6 7 8 9 10 11 def fib (n ): if n <= 2 : return 1 else : return fib(n-1 ) + fib(n-2 )@app.route('/run_fib' ) def run_fib (): executor.submit(fib, 5 ) executor.map (fib, range (1 , 6 )) return 'OK'
收集异步任务信息 1 2 3 4 5 6 7 8 9 10 11 @app .route('/start-task' ) def start_task(): executor.submit_stored('calc_power' , pow, 323 , 1235 ) return jsonify({'result' :'success' })@app .route('/get-result' ) def get_result(): if not executor.futures.done('calc_power' ): return jsonify({'status' : executor.futures._state('calc_power' )}) future = executor.futures.pop('calc_power' ) return jsonify({'status' : done, 'result' : future.result()})
详细文档请查阅:https://flask-executor.readthedocs.io/en/latest/
log 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import loggingfrom logging.handlers import TimedRotatingFileHandlerclass Logger : """ 自定义日志打印类,将日志保存到`../logs/`目录 设置日志等级和增加处理器,设置处理器为按照日期切分,最大保留30天 """ def __init__ (self ): self.logger = logging.getLogger() self.logger.setLevel(logging.DEBUG) console_handler = logging.StreamHandler() console_handler.setLevel(logging.DEBUG) file_handler = TimedRotatingFileHandler('./logs/app.log' , when='d' , interval=1 , backupCount=30 , encoding="utf8" , delay=False ) file_handler.setLevel(logging.NOTSET) formatter = logging.Formatter( fmt='%(asctime)s --- %(levelname)s - %(message)s' , datefmt="%Y-%m-%d %H:%M:%S" ) console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler)
添加颜色1 2 3 4 5 6 7 8 9 from colorama import Fore,Back,Styledef white_green (s ): return Fore.WHITE + Back.GREEN + s + Style.RESET_ALL def white_red (s ): return Fore.WHITE + Back.RED + s + Style.RESET_ALL
调用日志1 2 LOGGER = Logger ()LOGGER.logger.info (white_green ('hello' ))
bug 多线程操作sqlite:Recursive use of cursors not allowed
解决方案参考:https://blog.csdn.net/counsellor/article/details/43715007
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import threadingdef update (database, projectId ): lock = threading.Lock() try : lock.acquire(True ) tmp_sql = '''update tasks set complete = ? where projectId = ?''' database.execute_sql(tmp_sql, ('Y' , projectId)) input_path = f"static/project/{projectId} /result" output_path = f"static/project/{projectId} /result.zip" zipDir(input_path, output_path) tmp_sql = '''update tasks set result = ? where projectId = ?''' database.execute_sql(tmp_sql, (output_path, projectId)) finally : lock.release()
参考:https://blog.csdn.net/briblue/article/details/85220740