我在用flask_apscheduler来编写一个控制定时任务的web项目,我遇到了一个问题,就是这些任务是在子线程中运行的,而子线程中访问不到flaskapp或者SQLAlchemy的数据库连接。
我的项目大致如下:
使用threadpool的调度器,并使用SQLAlchemy进行持久化。然后提供一个web接口,可以通过请求添加任务,并在一个单独的任务信息表中记录任务必要的信息,因为任务信息可能会改变,而且很多,所以不是在注册任务时作为参数传递给任务调度器,而是传递一个任务的UUID,等实际要运行时再临时查库获取任务数据,代码类似于:
def add_task_with_data(task_data):
logger = get_logger(__name__)
task_name = task_data.get("task_name")
interval = task_data.get("task_interval")
trigger = IntervalTrigger(seconds=interval)
task_uuid = str(uuid.uuid4())
scheduler.add_job(id=task_uuid, func=execute_task, trigger=trigger, name=task_name, args=[task_uuid])
logger.info(f"调度器添加任务成功: {task_data}")
task = Task(task_name=task_name,
task_uuid=task_uuid,
task_type=task_data.get("task_type"),
task_interval=interval,
task_proxy=task_data.get("task_proxy"),
task_data=json.dumps(task_data["other_data"]))
db.session.add(task)
db.session.commit()
logger.info(f"任务数据入库成功")
接下来任务执行的时候,要从数据库里先查任务数据,再开始执行任务,而此时因为任务是在子线程中执行的,所以没法访问主线程的flaskapp或者数据库连接,我目前的代码如下:
def execute_task(task_uuid):
logger = get_logger(__name__)
task_object = None
try:
logger.info(f"进入任务函数")
task_result = db.session.query(Task).filter_by(task_uuid=task_uuid).one_or_none()
if task_result:
logger.info(f"获取任务信息成功: {task_result}")
报错为:
RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
Chatgpt建议我手动创建flaskapp上下文,但是实际上因为这是在子线程里,没有已存在的flaskapp,所以用with手动创建上下文也没什么用
with current_app.app_context(): # 不顶用!
我目前能想到几个方法,但是都十分丑陋,比如
- 每个线程里再创建一个flaskapp
- 创建一个webapi,在子线程里通过这个webapi查库,获取任务信息
- 通过类似于管道之类的方法,把这个对象传递到子线程(?)
请问大家有没有什么好办法?非常感谢!