请问flask项目如何在子线程中获取数据库连接?

我在用flask_apscheduler来编写一个控制定时任务的web项目,我遇到了一个问题,就是这些任务是在子线程中运行的,而子线程中访问不到flaskapp或者SQLAlchemy的数据库连接。

我的项目大致如下:

使用threadpool的调度器,并使用SQLAlchemy进行持久化。然后提供一个web接口,可以通过请求添加任务,并在一个单独的任务信息表中记录任务必要的信息,因为任务信息可能会改变,而且很多,所以不是在注册任务时作为参数传递给任务调度器,而是传递一个任务的UUID,等实际要运行时再临时查库获取任务数据,代码类似于:

def add_task_with_data(task_data):
    logger = get_logger(__name__)

    task_name = task_data.get("task_name")
    interval = task_data.get("task_interval")
    trigger = IntervalTrigger(seconds=interval)
    task_uuid = str(uuid.uuid4())

    scheduler.add_job(id=task_uuid, func=execute_task, trigger=trigger, name=task_name, args=[task_uuid])
    logger.info(f"调度器添加任务成功: {task_data}")

    task = Task(task_name=task_name,
                task_uuid=task_uuid,
                task_type=task_data.get("task_type"),
                task_interval=interval,
                task_proxy=task_data.get("task_proxy"),
                task_data=json.dumps(task_data["other_data"]))
    db.session.add(task)
    db.session.commit()
    logger.info(f"任务数据入库成功")

接下来任务执行的时候,要从数据库里先查任务数据,再开始执行任务,而此时因为任务是在子线程中执行的,所以没法访问主线程的flaskapp或者数据库连接,我目前的代码如下:


def execute_task(task_uuid):
    logger = get_logger(__name__)
    task_object = None
    try:
        logger.info(f"进入任务函数")
        task_result = db.session.query(Task).filter_by(task_uuid=task_uuid).one_or_none()
        if task_result:
            logger.info(f"获取任务信息成功: {task_result}")

报错为:

RuntimeError: Working outside of application context.

This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.

Chatgpt建议我手动创建flaskapp上下文,但是实际上因为这是在子线程里,没有已存在的flaskapp,所以用with手动创建上下文也没什么用

with current_app.app_context():  # 不顶用!

我目前能想到几个方法,但是都十分丑陋,比如

  • 每个线程里再创建一个flaskapp
  • 创建一个webapi,在子线程里通过这个webapi查库,获取任务信息
  • 通过类似于管道之类的方法,把这个对象传递到子线程(?)

请问大家有没有什么好办法?非常感谢!

另附我的初始化代码

def init_flask_app():
    app = Flask(__name__)

    class Config:
        SQLALCHEMY_DATABASE_URI = f'sqlite:///{app.root_path}/db/database.db'
        SQLALCHEMY_TRACK_MODIFICATIONS = False
        SCHEDULER_API_ENABLED = True  # 启用调度 API
        SCHEDULER_JOBSTORES = {
            'default': {
                'type': 'sqlalchemy',
                'url': SQLALCHEMY_DATABASE_URI
            }
        }
        SCHEDULER_EXECUTORS = {
            'default': {'type': 'threadpool', 'max_workers': 20}
        }
    app.config.from_object(Config)

    db.init_app(app)

    # 设置日志
    app.logger.setLevel(logging.DEBUG)
    file_handler = logging.FileHandler('logs/app.log', encoding='utf-8')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    app.logger.addHandler(file_handler)

    with app.app_context():
        db.create_all()
        app.logger.info("数据库连接成功")

    return app

def main():
    app = init_flask_app()
    register_routes(app)

    # 初始化 Flask-APScheduler
    scheduler.init_app(app)
    scheduler.start()

    # 添加示例任务
    # scheduler.add_job(id='my_task', func=my_task_function, trigger='interval', seconds=30)

    app.run(host='0.0.0.0', port=5000)

我目前的方法是在每个线程里获取SQLALCHEMY的数据库连接,代码如下:

# db_noflaskapp.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# 创建数据库引擎
engine = create_engine('sqlite:///db/database.db')

# 创建 scoped_session
Session = scoped_session(sessionmaker(bind=engine))

def get_session():
    """
    获取一个新的 session,保证线程安全。
    """
    return Session()

def remove_session():
    """
    移除当前线程的 session。
    """
    Session.remove()
# 执行任务的函数
def execute_task(task_uuid):
    logger = get_logger(__name__)
    task_object = None
    try:
        logger.info(f"进入任务函数")
        session = get_session()
        task_result = session.query(Task).filter_by(task_uuid=task_uuid).one_or_none()