fastapi-sqlalchemy

ooowl
  • python🐍
  • fastAPI⚡
  • python🐍
  • fastAPI⚡
  • sqlalchemy
About 7 min

sqlalchemy

sqlalchemy是python的orm工具,被整合进fastapi里面了

updatetime应该在model生成而不是 请求时生成,因为前端会给你传乱七八糟的,这个字段应该是对用户只读或者不可见的,应该在model里定义而不是pydantic里定义。
默认值的

踩过的坑

懒加载

默认的sqlalchemy的session是懒加载的,有时候update数据不进去可以考虑这个原因,使用session.__repr__()无感调用实现加载

密码编码

Engine Configuration — SQLAlchemy 1.4 Documentationopen in new window
当密码中含有特殊字符比如@的时候,应该用特殊编码替换掉,其实python自带了一个包处理这个

import urllib.parse
urllib.parse.quote_plus("kx@jj5/g")
=>'kx%40jj5%2Fg' # @被替换成了%40,/替换成了%2F

Warning

Changed in version 1.4: Support for @ signs in hostnames and database names has been fixed. As a side effect of this fix, @ signs in passwords must be escaped.

在版本1.4中改变: 在主机名和数据库名中支持@符号已经修复。作为此修复的一个副作用,必须转义密码中的@符号。

update的时候报错

update方法默认为synchronize_session='evaluate'
update方法中,还有一个可选参数synchronize_session,它用于指定何时将更改同步到数据库中。synchronize_session有三种取值:FalseTrue'evaluate'

  • synchronize_session=False表示不同步任何更改,而是只将更改应用于查询对象本身。这意味着查询结果可能不反映实际数据库中的更改,因为更改将在提交事务之前被回滚。这个选项通常用于性能优化,特别是在需要更新大量记录的情况下。
  • synchronize_session='fetch'表示立即同步更改到数据库中。查询结果将反映实际数据库中的更改。但是,这也可能导致脏读或死锁问题,因为其他并发事务可能正在访问被更新的记录。因此,这个选项通常用于更新少量记录或在并发性不是关键问题的情况下。
  • synchronize_session='evaluate'表示根据查询对象自动决定何时同步更改。是默认值。如果查询对象是一个普通的SELECT查询,则会使用True;如果查询对象是一个JOIN查询,则会使用False
sqlalchemy.exc.InvalidRequestError: Synchronize session setting must be set to 'evaluate' or False for bulk operations

使用filter方法进行筛选后,调用update方法批量修改记录,SQLAlchemy无法确定何时将更改同步到数据库中,导致更新操作失败
update的时候synchronize_session=False参数的诡异问题,忘了哪个错误了,直接设置为False就行了

插入json字汉字被unicode编码

插入json字段的时候,其中的汉字被编码为\u4f60\u597d 这样在数据库里和日志里都是unicode编码,但是查出来之后是正常的。
检查sqlalchemy的engine,pg的client db都是utf-8但是不知道为啥哪里还是latin1(我恨他)
[1]

select * from users;
alice|["𝓓𝓞Γ"]
bob|["\ud835\udcd3\ud835\udcde\u0393"]

在issue[2]里有人问了,最后官方加上了个参数[3] 在create_engine申请的时候加入参数指定json序列化器

engine = create_engine("mysql://scott:tiger@hostname/dbname",
                       encoding='utf8', echo=True,
                       json_serializer=lambda x: json.dumps(x, ensure_ascii=False))

之后存数据库就不会了

json字段的模糊查询

#todo pgsql的json字段模糊查询

session传递和生命周期

python根据函数实际参数的类型不同进行传递:

  1. 值传递:适用于实参类型为不可变类型(字符串、数字、元组)
  2. 引用(地址)传递:适用于实参类型为可变类型(列表,字典)

sqlalchemy 的session是引用传递,在函数中传递的时候,子函数查询完了,在外层中可以读出来。

查询出来的ORM对象本身在 Sessionopen in new window ,在名为 identity mapopen in new window -维护每个对象唯一副本的数据结构,其中“唯一”表示“只有一个具有特定主键的对象”。 Session以无状态的形式开始。一旦发出查询或其他对象被持久化,它将从 Engineopen in new windowSessionopen in new window ,然后在该连接上建立一个事务 .

也就是说在执行query之后的db对象维护的identity map不变,所以对这些对象还可以进行其他操作,但是此时如果query之后再update那么缓冲区就变成了update的结果,此时无法进行其他操作
session相当于每个query返回一个cur,多个query实际上是分离的
所以同一个query,应该先查出数据来再修改,但是取出来的数据会和db里面的不一致

使用sqlalchemy 的engine直接执行sql engine.execute(sql_)

Redis集成

#TODO Redis OM aioredis构建池,fastapi如何同步的建立redis连接池,普通使用如何建立连接池,怎样让redis链接随着app启动和关闭
小记一次FastAPI使用连接池调用Redis时,切换数据库的问题 - 一灰的随手记open in new window

任务调度

使用schedule去调度,轻量好用,但是除了定时之外没有其他功能,而且是固定时间间隔检测,执行任务的时候线程会阻塞。

import schedule
import time

def job():
    print("Job started")
    time.sleep(5)  # 任务执行时间为5秒
    print("Job completed")

# 每3秒调度一次任务
schedule.every(3).seconds.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)


在fastapi中的最佳实践,在app_starup的时候启动一个线程,然后去定时执行,如果有多个Uvicorn的worker则会执行多次,此时需要锁,引入外部的锁背离了schdule的初衷,不如直接apschduler,所以需要更简便的锁
任务要保证时间间隔内要执行完毕,否则任务会积压,schdule是维护一个内部的任务表,每当符合条件就往表里添加一个任务。
使用fcntl(内置库),在系统中自动生成锁文件,这样进程就只会启动一个线程,没有什么副作用。
要注意如果任务出错线程会直接崩掉所以不要拿来做非常重要的事情

Click to see more

这是任务

import time  
from datetime import datetime  
  
import schedule  
from sqlalchemy import and_  
  
from app.db.session import SessionLocal  
from app.modules.user.user_models import PushWXMsgInfoDelay  
from app.modules.user.wx_request_msg import push_wx_msg  
  
  
# import your model here  
  
# your class&function here  
def push_wx_notify():  
    with SessionLocal() as db:  
        today = datetime.today()  
  
        # 将日期转换为 datetime 类型  
        today_start = datetime(today.year, today.month, today.day, 0, 0, 0)  
        # 获取今天的结束时间(23:59:59.999999)  
        today_end = datetime(today.year, today.month, today.day, 23, 59, 59, 999999)  
        # 查询 push_date 是今天的所有记录  
        results = db.query(PushWXMsgInfoDelay).filter(  
            and_(  
                PushWXMsgInfoDelay.push_date >= today_start,  
                PushWXMsgInfoDelay.push_date < today_end,  
                PushWXMsgInfoDelay.pushed == 0  
            )  
        ).all()  
        for push in results:  
            print("延时推送:", push)  
            result = push_wx_msg(**push.param)  
            print("推送结果:",result)  
        db.query(PushWXMsgInfoDelay).filter(  
            and_(  
                PushWXMsgInfoDelay.push_date >= today_start,  
                PushWXMsgInfoDelay.push_date < today_end,  
                PushWXMsgInfoDelay.pushed == 0  
            )  
        ).update({"pushed": 1})  
        db.commit()  
        print("update record !")  
        print(f"Task executed at now {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")  
  
  
def schedule_task():  
    # 每天7点执行  
    schedule.every().day.at("07:00").do(push_wx_notify)  
  
    while True:  
        schedule.run_pending()  
        time.sleep(3)  # 3 秒检查一次,不会占用过多CPU

这是fastapi启动的时候,一种多进程的实践

import fcntl  
import os  
import threading

@app.on_event("startup")  
def start_scheduler():  
    lock_file = "/tmp/scheduler.lock"  # 锁文件的路径,可以根据需要调整  
    lock_file_descriptor = None  
  
    try:  
        # 打开锁文件  
        lock_file_descriptor = os.open(lock_file, os.O_CREAT | os.O_RDWR)  
  
        # 尝试获取文件锁  
        fcntl.flock(lock_file_descriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)  
        print("Acquired lock, starting scheduler thread.")  
  
        # 如果成功获取锁,启动守护线程  
        task_thread = threading.Thread(target=schedule_task)  
        task_thread.daemon = True  # 确保程序退出时线程也能自动结束  
        task_thread.start()  
    except BlockingIOError:  
        # 如果未能获取锁,表示另一个worker已经启动了线程  
        print("Lock already acquired by another worker. Skipping scheduler thread.")  
    finally:  
        if lock_file_descriptor:  
            # 保持锁的文件描述符打开,直到程序结束,以防止其他进程获取锁  
            pass


  1. Correct name for json_serializer / json_deserializer, document and test (I1dbfe439) · Gerrit Code Reviewopen in new window
    ↩︎

  2. Error when querying JSON columns with wide unicode characters. · Issue #4798 · sqlalchemy/sqlalchemy · GitHubopen in new window
    ↩︎

  3. 引擎配置 — SQLAlchemy 1.4 Documentationopen in new window ↩︎

Loading...