import datetime from sqlalchemy.orm import Session from pydantic import BaseModel from typing import List, Optional, Tuple, Dict from . import schema from sqlalchemy import func, or_ from sqlalchemy.orm import aliased from sqlalchemy.orm import Query from collections import defaultdict # # 创建一个别名的Address # AddressAlias = aliased(Address) # # 创建一个子查询 # subquery = session.query( # func.count(AddressAlias.id).label('address_count'), # AddressAlias.user_id # ).group_by(AddressAlias.user_id).subquery() # # 使用子查询和连接查询 # users = session.query(User, subquery.c.address_count).\ # outerjoin(subquery, User.id == subquery.c.user_id).\ # order_by(User.id).all() # for user, address_count in users: # print(f'User {user.name} has {address_count} addresses.') # region User class UserBase(BaseModel): email: str is_active: bool class UserCreate(UserBase): pass class UserUpdate(UserBase): hashed_password: Optional[str] # This is optional because you may not always want to update the password class User(UserBase): id: str def create_user(db: Session, email: str, password: str): db_user = schema.User(email=email, is_active=True) db_user.set_password(password) db.add(db_user) db.commit() db.refresh(db_user) return db_user def get_user(db: Session, user_id: str) -> Optional[schema.User]: return db.query(schema.User).filter(schema.User.id == user_id).first() def get_user_by_email(db: Session, email: str) -> Optional[schema.User]: return db.query(schema.User).filter(schema.User.email == email).first() def get_users(db: Session, skip: int = 0, limit: int = 10) -> List[schema.User]: return db.query(schema.User).offset(skip).limit(limit).all() def get_all_users(db: Session) -> List[schema.User]: return db.query(schema.User).all() def update_user(db: Session, user_id: str, user: UserUpdate): db_user = db.query(schema.User).filter(schema.User.id == user_id).first() if db_user: for key, value in user.dict(exclude_unset=True).items(): setattr(db_user, key, value) db.commit() db.refresh(db_user) return db_user def delete_user(db: Session, user_id: str): db_user = db.query(schema.User).filter(schema.User.id == user_id).first() if db_user: db.delete(db_user) db.commit() return db_user # endregion # region Item class ItemBase(BaseModel): title: str description: str class ItemCreate(ItemBase): pass class ItemUpdate(ItemBase): pass class Item(ItemBase): id: str owner_id: str def create_item(db: Session, item: ItemCreate, owner_id: str): db_item = schema.Item(**item.dict(), owner_id=owner_id) db.add(db_item) db.commit() db.refresh(db_item) return db_item def get_item(db: Session, item_id: str): return db.query(schema.Item).filter(schema.Item.id == item_id).first() def get_items(db: Session, skip: int = 0, limit: int = 10): return db.query(schema.Item).offset(skip).limit(limit).all() def update_item(db: Session, item_id: str, item: ItemUpdate): db_item = db.query(schema.Item).filter(schema.Item.id == item_id).first() if db_item: for key, value in item.dict(exclude_unset=True).items(): setattr(db_item, key, value) db.commit() db.refresh(db_item) return db_item def delete_item(db: Session, item_id: str): db_item = db.query(schema.Item).filter(schema.Item.id == item_id).first() if db_item: db.delete(db_item) db.commit() return db_item # endregion # region UserBook class UserBookBase(BaseModel): owner_id: str book_id: str title: str random: bool batch_size: int memorizing_batch: str = "" class UserBookCreate(UserBookBase): pass class UserBookUpdate(UserBookBase): pass class UserBook(UserBookBase): id: str def create_user_book(db: Session, user_book: UserBookCreate): db_user_book = schema.UserBook(**user_book.dict()) db.add(db_user_book) db.commit() db.refresh(db_user_book) return db_user_book def get_user_book(db: Session, user_book_id: str) -> schema.UserBook: return db.query(schema.UserBook).filter(schema.UserBook.id == user_book_id).first() def get_user_books_by_owner_id(db: Session, owner_id: str) -> List[schema.UserBook]: return db.query(schema.UserBook).filter(schema.UserBook.owner_id == owner_id).all() def get_user_books(db: Session, skip: int = 0, limit: int = 10) -> List[schema.UserBook]: return db.query(schema.UserBook).offset(skip).limit(limit).all() def update_user_book(db: Session, user_book_id: str, user_book: UserBookUpdate): db_user_book = db.query(schema.UserBook).filter(schema.UserBook.id == user_book_id).first() if db_user_book: for key, value in user_book.dict(exclude_unset=True).items(): setattr(db_user_book, key, value) db.commit() db.refresh(db_user_book) return db_user_book def update_user_book_memorizing_batch(db: Session, user_book_id: str, memorizing_batch: str): db_user_book = db.query(schema.UserBook).filter(schema.UserBook.id == user_book_id).first() if db_user_book: db_user_book.memorizing_batch = memorizing_batch db.commit() db.refresh(db_user_book) return db_user_book def delete_user_book(db: Session, user_book_id: str): db_user_book = db.query(schema.UserBook).filter(schema.UserBook.id == user_book_id).first() if db_user_book: db.delete(db_user_book) db.commit() return db_user_book # endregion # region UserMemoryBatch class UserMemoryBatchBase(BaseModel): user_book_id: str story: str translated_story: str batch_type: str = "新词" class UserMemoryBatchCreate(UserMemoryBatchBase): pass class UserMemoryBatchUpdate(UserMemoryBatchBase): pass class UserMemoryBatch(UserMemoryBatchBase): id: str def create_user_memory_batch(db: Session, memory_batch: UserMemoryBatchCreate): db_memory_batch = schema.UserMemoryBatch(**memory_batch.dict()) db.add(db_memory_batch) db.commit() db.refresh(db_memory_batch) return db_memory_batch def get_user_memory_batch(db: Session, memory_batch_id: str): return db.query(schema.UserMemoryBatch).filter(schema.UserMemoryBatch.id == memory_batch_id).first() def get_user_memory_batchs(db: Session, skip: int = 0, limit: int = 10): return db.query(schema.UserMemoryBatch).offset(skip).limit(limit).all() def get_user_memory_batches_by_user_book_id(db: Session, user_book_id: str) -> List[schema.UserMemoryBatch]: return db.query(schema.UserMemoryBatch).filter( schema.UserMemoryBatch.user_book_id == user_book_id ).order_by(schema.UserMemoryBatch.create_time).all() def get_new_user_memory_batches_by_user_book_id(db: Session, user_book_id: str) -> List[schema.UserMemoryBatch]: return db.query(schema.UserMemoryBatch).filter( schema.UserMemoryBatch.user_book_id == user_book_id, schema.UserMemoryBatch.batch_type == "新词", ).order_by(schema.UserMemoryBatch.create_time).all() def actions_infomation(db: Session, action_query: Query[schema.UserMemoryBatchAction]): distinct_actions = action_query.distinct().subquery() batches = db.query(schema.UserMemoryBatch).join(distinct_actions, distinct_actions.c.batch_id == schema.UserMemoryBatch.id).all() batch_id_to_batch = {batch.id: batch for batch in batches} batch_id_to_words = {batch.id: get_words_in_batch(db, batch.id) for batch in batches} return batches, batch_id_to_batch, batch_id_to_words def get_user_memory_batch_history(db: Session, user_book_id: str): action_query = db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.action == "end").join( schema.UserMemoryBatch, schema.UserMemoryBatch.id == schema.UserMemoryBatchAction.batch_id ).filter(schema.UserMemoryBatch.user_book_id == user_book_id) actions = action_query.order_by(schema.UserMemoryBatchAction.create_time).all() distinct_actions = action_query.distinct().subquery() batches = db.query(schema.UserMemoryBatch).join(distinct_actions, distinct_actions.c.batch_id == schema.UserMemoryBatch.id).all() batch_id_to_batch = {batch.id: batch for batch in batches} batch_id_to_words = {batch.id: get_words_in_batch(db, batch.id) for batch in batches} batch_id_to_actions = {batch.id: get_user_memory_actions_in_batch(db, batch.id) for batch in batches} return actions, batch_id_to_batch, batch_id_to_words, batch_id_to_actions def get_user_memory_batch_history_in_minutes(db: Session, user_book_id: str, minutes: int): action_query = db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.action == "end").join( schema.UserMemoryBatch, schema.UserMemoryBatch.id == schema.UserMemoryBatchAction.batch_id ).filter( schema.UserMemoryBatch.user_book_id == user_book_id, schema.UserMemoryBatchAction.create_time > datetime.datetime.now() - datetime.timedelta(minutes=minutes), ).limit(20) actions = action_query.order_by(schema.UserMemoryBatchAction.create_time).all() distinct_actions = action_query.distinct().subquery() batches = db.query(schema.UserMemoryBatch).join(distinct_actions, distinct_actions.c.batch_id == schema.UserMemoryBatch.id).all() batch_id_to_batch = {batch.id: batch for batch in batches} batch_id_to_words = {batch.id: get_words_in_batch(db, batch.id) for batch in batches} # batch_actions = db.query(schema.UserMemoryBatchAction).join(distinct_actions, distinct_actions.c.batch_id == schema.UserMemoryBatchAction.batch_id).all() # return actions, batch_id_to_batch, batch_id_to_words, batch_actions return actions, batch_id_to_batch, batch_id_to_words def get_user_memory_word_history_in_minutes(db: Session, user_book_id: str, minutes: int): action_query = db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.action == "end").join( schema.UserMemoryBatch, schema.UserMemoryBatch.id == schema.UserMemoryBatchAction.batch_id ).filter( schema.UserMemoryBatch.user_book_id == user_book_id, # schema.UserMemoryBatchAction.create_time > datetime.datetime.now() - datetime.timedelta(minutes=minutes), ).limit(200) distinct_actions = action_query.distinct().subquery() batches = db.query(schema.UserMemoryBatch).join(distinct_actions, distinct_actions.c.batch_id == schema.UserMemoryBatch.id).all() words = [get_words_in_batch(db, batch.id) for batch in batches] words = sum(words, []) return words def update_user_memory_batch(db: Session, memory_batch_id: str, memory_batch: UserMemoryBatchUpdate): db_memory_batch = db.query(schema.UserMemoryBatch).filter(schema.UserMemoryBatch.id == memory_batch_id).first() if db_memory_batch: for key, value in memory_batch.dict(exclude_unset=True).items(): setattr(db_memory_batch, key, value) db.commit() db.refresh(db_memory_batch) return db_memory_batch def delete_user_memory_batch(db: Session, memory_batch_id: str): db_memory_batch = db.query(schema.UserMemoryBatch).filter(schema.UserMemoryBatch.id == memory_batch_id).first() if db_memory_batch: db.delete(db_memory_batch) db.commit() return db_memory_batch # endregion # region UserMemoryBatchAction class UserMemoryBatchActionBase(BaseModel): batch_id: str action: str class UserMemoryBatchActionCreate(UserMemoryBatchActionBase): pass class UserMemoryBatchActionUpdate(UserMemoryBatchActionBase): pass class UserMemoryBatchAction(UserMemoryBatchActionBase): id: str create_time: str update_time: str def create_user_memory_batch_action(db: Session, memory_batch_action: UserMemoryBatchActionCreate): db_memory_batch_action = schema.UserMemoryBatchAction(**memory_batch_action.dict()) db.add(db_memory_batch_action) db.commit() db.refresh(db_memory_batch_action) return db_memory_batch_action def get_user_memory_batch_action(db: Session, memory_batch_action_id: str): return db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.id == memory_batch_action_id).first() def get_user_memory_batch_actions(db: Session, skip: int = 0, limit: int = 10) -> List[schema.UserMemoryBatchAction]: return db.query(schema.UserMemoryBatchAction).offset(skip).limit(limit).all() def get_user_memory_batch_actions_by_user_memory_batch_id(db: Session, user_memory_batch_id: str) -> List[schema.UserMemoryBatchAction]: return db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.batch_id == user_memory_batch_id).all() def get_actions_at_each_batch(db: Session, memory_batch_ids: List[str]) -> List[schema.UserMemoryBatchAction]: return db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.batch_id.in_(memory_batch_ids)).all() def get_finished_actions_at_each_batch(db: Session, memory_batch_ids: List[str]) -> List[schema.UserMemoryBatchAction]: return db.query(schema.UserMemoryBatchAction).filter( schema.UserMemoryBatchAction.batch_id.in_(memory_batch_ids), schema.UserMemoryBatchAction.action == "end", ).order_by(schema.UserMemoryBatchAction.create_time).all() def update_user_memory_batch_action(db: Session, memory_batch_action_id: str, memory_batch_action: UserMemoryBatchActionUpdate): db_memory_batch_action = db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.id == memory_batch_action_id).first() if db_memory_batch_action: for key, value in memory_batch_action.dict(exclude_unset=True).items(): setattr(db_memory_batch_action, key, value) db.commit() db.refresh(db_memory_batch_action) return db_memory_batch_action def delete_user_memory_batch_action(db: Session, memory_batch_action_id: str): db_memory_batch_action = db.query(schema.UserMemoryBatchAction).filter(schema.UserMemoryBatchAction.id == memory_batch_action_id).first() if db_memory_batch_action: db.delete(db_memory_batch_action) db.commit() return db_memory_batch_action # endregion # region UserMemoryBatchGenerationHistory class UserMemoryBatchGenerationHistoryBase(BaseModel): batch_id: str story: str translated_story: str class UserMemoryBatchGenerationHistoryCreate(UserMemoryBatchGenerationHistoryBase): pass class UserMemoryBatchGenerationHistoryUpdate(UserMemoryBatchGenerationHistoryBase): pass class UserMemoryBatchGenerationHistory(UserMemoryBatchGenerationHistoryBase): id: str create_time: str update_time: str def create_user_memory_batch_generation_history(db: Session, memory_batch_generation_history: UserMemoryBatchGenerationHistoryCreate): db_memory_batch_generation_history = schema.UserMemoryBatchGenerationHistory(**memory_batch_generation_history.dict()) db.add(db_memory_batch_generation_history) db.commit() db.refresh(db_memory_batch_generation_history) return db_memory_batch_generation_history def get_user_memory_batch_generation_history(db: Session, memory_batch_generation_history_id: str): return db.query(schema.UserMemoryBatchGenerationHistory).filter(schema.UserMemoryBatchGenerationHistory.id == memory_batch_generation_history_id).first() def get_user_memory_batch_generation_historys(db: Session, skip: int = 0, limit: int = 10) -> List[schema.UserMemoryBatchGenerationHistory]: return db.query(schema.UserMemoryBatchGenerationHistory).offset(skip).limit(limit).all() def get_user_memory_batch_generation_historys_by_user_memory_batch_id(db: Session, user_memory_batch_id: str) -> List[schema.UserMemoryBatchGenerationHistory]: return db.query(schema.UserMemoryBatchGenerationHistory).filter(schema.UserMemoryBatchGenerationHistory.batch_id == user_memory_batch_id).all() def get_generation_historys_at_each_batch(db: Session, memory_batch_ids: List[str]) -> List[schema.UserMemoryBatchGenerationHistory]: return db.query(schema.UserMemoryBatchGenerationHistory).filter(schema.UserMemoryBatchGenerationHistory.batch_id.in_(memory_batch_ids)).all() def get_generation_hostorys_by_user_book_id(db: Session, user_book_id: str) -> Dict[str, Tuple[List[schema.Word], List[schema.UserMemoryBatchGenerationHistory]]]: batches = get_user_memory_batches_by_user_book_id(db, user_book_id) batch_ids = [batch.id for batch in batches] batch_id_to_words_and_history = {} for batch_id in batch_ids: historys = get_user_memory_batch_generation_historys_by_user_memory_batch_id(db, batch_id) if len(historys) == 0: continue words = get_words_in_batch(db, batch_id) batch_id_to_words_and_history[batch_id] = (words, historys) return batch_id_to_words_and_history def update_user_memory_batch_generation_history(db: Session, memory_batch_generation_history_id: str, memory_batch_generation_history: UserMemoryBatchGenerationHistoryUpdate): db_memory_batch_generation_history = db.query(schema.UserMemoryBatchGenerationHistory).filter(schema.UserMemoryBatchGenerationHistory.id == memory_batch_generation_history_id).first() if db_memory_batch_generation_history: for key, value in memory_batch_generation_history.dict(exclude_unset=True).items(): setattr(db_memory_batch_generation_history, key, value) db.commit() db.refresh(db_memory_batch_generation_history) return db_memory_batch_generation_history def delete_user_memory_batch_generation_history(db: Session, memory_batch_generation_history_id: str): db_memory_batch_generation_history = db.query(schema.UserMemoryBatchGenerationHistory).filter(schema.UserMemoryBatchGenerationHistory.id == memory_batch_generation_history_id).first() if db_memory_batch_generation_history: db.delete(db_memory_batch_generation_history) db.commit() return db_memory_batch_generation_history # endregion # region UserMemoryWord class UserMemoryWordBase(BaseModel): batch_id: str word_id: str class UserMemoryWordCreate(UserMemoryWordBase): pass class UserMemoryWordUpdate(UserMemoryWordBase): pass class UserMemoryWord(UserMemoryWordBase): id: str def create_user_memory_word(db: Session, memory_word: UserMemoryWordCreate): db_memory_word = schema.UserMemoryWord(**memory_word.dict()) db.add(db_memory_word) db.commit() db.refresh(db_memory_word) return db_memory_word def get_user_memory_word(db: Session, memory_word_id: str): return db.query(schema.UserMemoryWord).filter(schema.UserMemoryWord.id == memory_word_id).first() def get_user_memory_words(db: Session, skip: int = 0, limit: int = 10) -> List[schema.UserMemoryWord]: return db.query(schema.UserMemoryWord).offset(skip).limit(limit).all() def get_user_memory_words_by_batch_id(db: Session, batch_id: str) -> List[schema.UserMemoryWord]: return db.query(schema.UserMemoryWord).filter(schema.UserMemoryWord.batch_id == batch_id).all() def update_user_memory_word(db: Session, memory_word_id: str, memory_word: UserMemoryWordUpdate): db_memory_word = db.query(schema.UserMemoryWord).filter(schema.UserMemoryWord.id == memory_word_id).first() if db_memory_word: for key, value in memory_word.dict(exclude_unset=True).items(): setattr(db_memory_word, key, value) db.commit() db.refresh(db_memory_word) return db_memory_word def delete_user_memory_word(db: Session, memory_word_id: str): db_memory_word = db.query(schema.UserMemoryWord).filter(schema.UserMemoryWord.id == memory_word_id).first() if db_memory_word: db.delete(db_memory_word) db.commit() return db_memory_word # endregion # region UserMemoryAction class UserMemoryActionBase(BaseModel): batch_id: str word_id: str action: str class UserMemoryActionCreate(UserMemoryActionBase): pass class UserMemoryActionUpdate(UserMemoryActionBase): pass class UserMemoryAction(UserMemoryActionBase): id: str create_time: str update_time: str def create_user_memory_action(db: Session, memory_action: UserMemoryActionCreate): db_memory_action = schema.UserMemoryAction(**memory_action.dict()) db.add(db_memory_action) db.commit() db.refresh(db_memory_action) return db_memory_action def get_user_memory_action(db: Session, memory_action_id: str) -> schema.UserMemoryAction: return db.query(schema.UserMemoryAction).filter(schema.UserMemoryAction.id == memory_action_id).first() def get_user_memory_actions(db: Session, skip: int = 0, limit: int = 10)-> List[schema.UserMemoryAction]: return db.query(schema.UserMemoryAction).offset(skip).limit(limit).all() def get_user_memory_actions_by_word_id(db: Session, word_id: str)-> List[schema.UserMemoryAction]: return db.query(schema.UserMemoryAction).filter(schema.UserMemoryAction.word_id == word_id).all() def get_actions_at_each_word(db: Session, word_ids: List[str]) -> List[schema.UserMemoryAction]: return db.query(schema.UserMemoryAction).filter(schema.UserMemoryAction.word_id.in_(word_ids)).all() def get_user_memory_actions_in_batch(db: Session, batch_id: str) -> List[schema.UserMemoryAction]: return db.query(schema.UserMemoryAction).filter(schema.UserMemoryAction.batch_id == batch_id).all() def update_user_memory_action(db: Session, memory_action_id: str, memory_action: UserMemoryActionUpdate): db_memory_action = db.query(schema.UserMemoryAction).filter(schema.UserMemoryAction.id == memory_action_id).first() if db_memory_action: for key, value in memory_action.dict(exclude_unset=True).items(): setattr(db_memory_action, key, value) db.commit() db.refresh(db_memory_action) return db_memory_action def delete_user_memory_action(db: Session, memory_action_id: str): db_memory_action = db.query(schema.UserMemoryAction).filter(schema.UserMemoryAction.id == memory_action_id).first() if db_memory_action: db.delete(db_memory_action) db.commit() return db_memory_action # endregion # region Book class BookBase(BaseModel): bk_order: float = 0 bk_name: str bk_item_num: int bk_author: str = "" bk_comment: str = "" bk_organization: str = "" bk_publisher: str = "" bk_version: str = "" permission: str = "private" creator: str class BookCreate(BookBase): pass class BookUpdate(BookBase): pass class Book(BookBase): bk_id: str def create_book(db: Session, book: BookCreate): db_book = schema.Book(**book.dict()) db.add(db_book) db.commit() db.refresh(db_book) return db_book def get_book(db: Session, book_id: str): return db.query(schema.Book).filter(schema.Book.bk_id == book_id).first() def get_book_by_name(db: Session, book_name: str): return db.query(schema.Book).filter(schema.Book.bk_name == book_name).first() def get_books(db: Session, skip: int = 0, limit: int = 10): return db.query(schema.Book).offset(skip).limit(limit).all() def get_all_books(db: Session): return db.query(schema.Book).all() def get_all_books_for_user(db: Session, user_id: str): return db.query(schema.Book).filter( or_(schema.Book.creator == user_id, schema.Book.permission == "public") ).order_by(schema.Book.permission, schema.Book.create_time).all() def get_book_count(db: Session): return db.query(schema.Book).count() def update_book(db: Session, book_id: str, book: BookUpdate): db_book = db.query(schema.Book).filter(schema.Book.bk_id == book_id).first() if db_book: for key, value in book.dict(exclude_unset=True).items(): setattr(db_book, key, value) db.commit() db.refresh(db_book) return db_book def delete_book(db: Session, book_id: str): db_book = db.query(schema.Book).filter(schema.Book.bk_id == book_id).first() if db_book: db.delete(db_book) db.commit() return db_book # endregion # region Unit class UnitBase(BaseModel): bv_book_id: str bv_voc_id: str bv_flag: int = 1 bv_tag: str = "" bv_order: int = 1 class UnitCreate(UnitBase): pass class UnitUpdate(UnitBase): pass class Unit(UnitBase): pass def create_unit(db: Session, unit: UnitCreate): db_unit = schema.Unit(**unit.dict()) db.add(db_unit) db.commit() db.refresh(db_unit) return db_unit def get_unit(db: Session, unit_id: str): return db.query(schema.Unit).filter(schema.Unit.bv_id == unit_id).first() def get_units(db: Session, skip: int = 0, limit: int = 10): return db.query(schema.Unit).offset(skip).limit(limit).all() def update_unit(db: Session, unit_id: str, unit: UnitUpdate): db_unit = db.query(schema.Unit).filter(schema.Unit.bv_id == unit_id).first() if db_unit: for key, value in unit.dict(exclude_unset=True).items(): setattr(db_unit, key, value) db.commit() db.refresh(db_unit) return db_unit def delete_unit(db: Session, unit_id: str): db_unit = db.query(schema.Unit).filter(schema.Unit.bv_id == unit_id).first() if db_unit: db.delete(db_unit) db.commit() return db_unit # endregion # region Word class WordBase(BaseModel): vc_id: str vc_vocabulary: str vc_phonetic_uk: str vc_phonetic_us: str vc_frequency: float vc_difficulty: float vc_acknowledge_rate: float class WordCreate(WordBase): pass class WordUpdate(WordBase): pass class Word(WordBase): pass def create_word(db: Session, word: WordCreate, unit_id: str): db_word = schema.Word(**word.dict(), vc_id=unit_id) db.add(db_word) db.commit() db.refresh(db_word) return db_word def get_word(db: Session, word_id: str): return db.query(schema.Word).filter(schema.Word.vc_id == word_id).first() def get_words(db: Session, skip: int = 0, limit: int = 10) -> List[schema.Word]: return db.query(schema.Word).offset(skip).limit(limit).all() def get_words_by_vocabulary(db: Session, vocabulary: List[str]) -> List[schema.Word]: return db.query(schema.Word).filter(schema.Word.vc_vocabulary.in_(vocabulary)).all() def get_words_by_ids(db: Session, ids: List[str]) -> List[schema.Word]: return db.query(schema.Word).filter(schema.Word.vc_id.in_(ids)).all() def get_words_in_batch(db: Session, batch_id: str) -> List[schema.Word]: return db.query(schema.Word).join(schema.UserMemoryWord, schema.UserMemoryWord.word_id == schema.Word.vc_id).filter(schema.UserMemoryWord.batch_id == batch_id).all() def get_words_at_each_batch(db: Session, batch_ids: List[str]) -> List[schema.Word]: return db.query(schema.Word).join(schema.UserMemoryWord, schema.UserMemoryWord.word_id == schema.Word.vc_id).filter(schema.UserMemoryWord.batch_id.in_(batch_ids)).all() def get_words_in_user_book(db: Session, user_book_id: str) -> List[schema.Word]: batches = get_new_user_memory_batches_by_user_book_id(db, user_book_id) batch_ids = [batch.id for batch in batches] return get_words_at_each_batch(db, batch_ids) def update_word(db: Session, word_id: str, word: WordUpdate): db_word = db.query(schema.Word).filter(schema.Word.vc_id == word_id).first() if db_word: for key, value in word.dict(exclude_unset=True).items(): setattr(db_word, key, value) db.commit() db.refresh(db_word) return db_word def delete_word(db: Session, word_id: str): db_word = db.query(schema.Word).filter(schema.Word.vc_id == word_id).first() if db_word: db.delete(db_word) db.commit() return db_word # endregion