卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章64334本站已运行4115

当关系的一侧已存在于数据库中时,使用 SQLModel 插入多对多关系对象

当关系的一侧已存在于数据库中时,使用 sqlmodel 插入多对多关系对象

问题内容

我正在尝试使用 sqlmodel 在数据库中插入记录,其中数据如下所示。 一个 house 对象,它有颜色和许多位置。 地点也将与许多房屋相关联。输入为:

[
    {
        "color": "red",
        "locations": [
            {"type": "country", "name": "netherlands"},
            {"type": "municipality", "name": "amsterdam"},
        ],
    },
    {
        "color": "green",
        "locations": [
            {"type": "country", "name": "netherlands"},
            {"type": "municipality", "name": "amsterdam"},
        ],
    },
]

这是我正在尝试做的事情的可重现示例:

import asyncio
from typing import list

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlmodel import field, relationship, sqlmodel, uniqueconstraint
from sqlmodel.ext.asyncio.session import asyncsession

database_url = "sqlite+aiosqlite:///./database.db"


engine = create_async_engine(database_url, echo=true, future=true)


async def init_db() -> none:
    async with engine.begin() as conn:
        await conn.run_sync(sqlmodel.metadata.create_all)


sessionlocal = sessionmaker(
    autocommit=false,
    autoflush=false,
    bind=engine,
    class_=asyncsession,
    expire_on_commit=false,
)


class houselocationlink(sqlmodel, table=true):
    house_id: int = field(foreign_key="house.id", nullable=false, primary_key=true)
    location_id: int = field(
        foreign_key="location.id", nullable=false, primary_key=true
    )


class location(sqlmodel, table=true):
    id: int = field(primary_key=true)
    type: str  # country, county, municipality, district, city, area, street, etc
    name: str  # amsterdam, germany, my street, etc

    houses: list["house"] = relationship(
        back_populates="locations",
        link_model=houselocationlink,
    )

    __table_args__ = (uniqueconstraint("type", "name"),)


class house(sqlmodel, table=true):
    id: int = field(primary_key=true)
    color: str = field()
    locations: list["location"] = relationship(
        back_populates="houses",
        link_model=houselocationlink,
    )
    # other fields...


data = [
    {
        "color": "red",
        "locations": [
            {"type": "country", "name": "netherlands"},
            {"type": "municipality", "name": "amsterdam"},
        ],
    },
    {
        "color": "green",
        "locations": [
            {"type": "country", "name": "netherlands"},
            {"type": "municipality", "name": "amsterdam"},
        ],
    },
]


async def add_houses(payload) -> list[house]:
    result = []
    async with sessionlocal() as session:
        for item in payload:
            locations = []
            for location in item["locations"]:
                locations.append(location(**location))
            house = house(color=item["color"], locations=locations)
            result.append(house)
        session.add_all(result)
        await session.commit()


asyncio.run(init_db())
asyncio.run(add_houses(data))

问题是,当我运行此代码时,它尝试将重复的位置对象与房屋对象一起插入。 我希望能够在这里使用 relationship,因为它使访问 house.locations 变得非常容易。

但是,我无法弄清楚如何阻止它尝试插入重复的位置。理想情况下,我有一个映射器函数来执行 get_or_create 位置。

我所见过的最能实现这一点的是 sqlalchemy 的关联代理。但看起来 sqlmodel 不支持这一点。

有人知道如何实现这一目标吗?如果您知道如何使用 sqlalchemy 而不是 sqlmodel 来完成此操作,我有兴趣查看您的解决方案。我还没有开始这个项目,所以如果它能让我的生活更轻松的话,我不妨使用 sqlalchemy。

我还尝试使用 sa_relationship_kwargs 进行调整,例如

sa_relationship_kwargs={
    "lazy": "selectin",
    "cascade": "none",
    "viewonly": "true",
}

但这会阻止将关联条目添加到 houselocationlink 表中。

任何指示将不胜感激。即使这意味着完全改变我的方法。

谢谢!


正确答案


我正在编写这个解决方案,因为您提到您愿意使用 sqlalchemy。正如您所提到的,您需要关联代理,但您还需要“唯一对象”。我已将其调整为异步查询(而不是同步)功能,与我的个人偏好保持一致,所有这些都没有显着改变逻辑。

import asyncio
from sqlalchemy import UniqueConstraint, ForeignKey, select, text, func
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped, relationship
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.associationproxy import AssociationProxy, association_proxy

class Base(DeclarativeBase):
    pass

class UniqueMixin:
    cache = {}

    @classmethod
    async def as_unique(cls, session: AsyncSession, *args, **kwargs):
        key = cls, cls.unique_hash(*args, **kwargs)
        if key in cls.cache:
            return cls.cache[key]
        with session.no_autoflush:
            statement = select(cls).where(cls.unique_filter(*args, **kwargs)).limit(1)
            obj = (await session.scalars(statement)).first()
            if obj is None:
                obj = cls(*args, **kwargs)
                session.add(obj)
        cls.cache[key] = obj
        return obj

    @classmethod
    def unique_hash(cls, *args, **kwargs):
        raise NotImplementedError("Implement this in subclass")

    @classmethod
    def unique_filter(cls, *args, **kwargs):
        raise NotImplementedError("Implement this in subclass")

class Location(UniqueMixin, Base):
    __tablename__ = "location"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    type: Mapped[str] = mapped_column()
    house_associations: Mapped[list["HouseLocationLink"]] = relationship(back_populates="location")
    __table_args = (UniqueConstraint(type, name),)

    @classmethod
    def unique_hash(cls, name, type):
        # this is the key for the dict
        return type, name

    @classmethod
    def unique_filter(cls, name, type):
        # this is how you want to establish the uniqueness
        # the result of this filter will be the value in the dict
        return (cls.type == type) & (cls.name == name)

class House(Base):
    __tablename__ = "house"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    location_associations: Mapped[list["HouseLocationLink"]] = relationship(back_populates="house")
    locations: AssociationProxy[list[Location]] = association_proxy(
        "location_associations",
        "location",
        # you need this so you can directly add ``Location`` objects to ``House``
        creator=lambda location: HouseLocationLink(location=location),
    )

class HouseLocationLink(Base):
    __tablename__ = "houselocationlink"
    house_id: Mapped[int] = mapped_column(ForeignKey(House.id), primary_key=True)
    location_id: Mapped[int] = mapped_column(ForeignKey(Location.id), primary_key=True)
    location: Mapped[Location] = relationship(back_populates="house_associations")
    house: Mapped[House] = relationship(back_populates="location_associations")

engine = create_async_engine("sqlite+aiosqlite:///test.sqlite")

async def main():
    data = [
        {
            "name": "red",
            "locations": [
                {"type": "country", "name": "Netherlands"},
                {"type": "municipality", "name": "Amsterdam"},
            ],
        },
        {
            "name": "green",
            "locations": [
                {"type": "country", "name": "Netherlands"},
                {"type": "municipality", "name": "Amsterdam"},
            ],
        },
    ]

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session, session.begin():
        for item in data:
            house = House(
                name=item["name"],
                locations=[await Location.as_unique(session, **location) for location in item["locations"]]
            )
            session.add(house)

    async with AsyncSession(engine) as session:
        statement = select(func.count(text("*")), Location)
        assert await session.scalar(statement) == 2

        statement = select(func.count(text("*")), House)
        assert await session.scalar(statement) == 2

        statement = select(func.count(text("*")), HouseLocationLink)
        assert await session.scalar(statement) == 4


asyncio.run(main())

您可以注意到断言确实通过,没有违反唯一约束,也没有多次插入。我留下了一些内联注释,其中提到了这段代码的“关键”方面。如果多次运行此代码,您会注意到仅添加了新的 house 对象和相应的 houselocationlink,而没有添加新的 location 对象。对于每个键值对,只会进行一次查询来缓存此行为。

卓越飞翔博客
上一篇: 增加 Go 中的 gRPC 超时
下一篇: 在 go 中打印用户输入的字符串 n 次
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏