dressed_for_succes_store/backend/app/services.py

482 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sqlalchemy.orm import Session
from fastapi import HTTPException, status, UploadFile
from typing import List, Optional, Dict, Any, Union
from datetime import datetime, timedelta
import os
import uuid
import shutil
from pathlib import Path
from app.config import settings
from app.core import create_access_token, get_password_hash, verify_password
from app.repositories import (
user_repo, catalog_repo, order_repo, review_repo, content_repo
)
from app.schemas.user_schemas import UserCreate, UserUpdate, AddressCreate, AddressUpdate, Token
from app.schemas.catalog_schemas import (
CategoryCreate, CategoryUpdate,
ProductCreate, ProductUpdate,
ProductVariantCreate, ProductVariantUpdate,
ProductImageCreate, ProductImageUpdate
)
from app.schemas.order_schemas import CartItemCreate, CartItemUpdate, OrderCreate, OrderUpdate
from app.schemas.review_schemas import ReviewCreate, ReviewUpdate
from app.schemas.content_schemas import PageCreate, PageUpdate, AnalyticsLogCreate
# Сервисы аутентификации и пользователей
def register_user(db: Session, user: UserCreate) -> Dict[str, Any]:
# Создаем пользователя
db_user = user_repo.create_user(db, user)
# Создаем токен доступа
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": db_user.username}, expires_delta=access_token_expires
)
return {
"user": db_user,
"access_token": access_token,
"token_type": "bearer"
}
def login_user(db: Session, username: str, password: str) -> Dict[str, Any]:
# Аутентифицируем пользователя
user = user_repo.authenticate_user(db, username, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем, что пользователь активен
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неактивный пользователь"
)
# Создаем токен доступа
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer"
}
def get_user_profile(db: Session, user_id: int) -> Dict[str, Any]:
user = user_repo.get_user(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Пользователь не найден"
)
# Получаем адреса пользователя
addresses = user_repo.get_user_addresses(db, user_id)
# Получаем заказы пользователя
orders = order_repo.get_user_orders(db, user_id)
# Получаем отзывы пользователя
reviews = review_repo.get_user_reviews(db, user_id)
return {
"user": user,
"addresses": addresses,
"orders": orders,
"reviews": reviews
}
def update_user_profile(db: Session, user_id: int, user_data: UserUpdate) -> Dict[str, Any]:
updated_user = user_repo.update_user(db, user_id, user_data)
return {"user": updated_user}
def add_user_address(db: Session, user_id: int, address: AddressCreate) -> Dict[str, Any]:
new_address = user_repo.create_address(db, address, user_id)
return {"address": new_address}
def update_user_address(db: Session, user_id: int, address_id: int, address: AddressUpdate) -> Dict[str, Any]:
updated_address = user_repo.update_address(db, address_id, address, user_id)
return {"address": updated_address}
def delete_user_address(db: Session, user_id: int, address_id: int) -> Dict[str, Any]:
success = user_repo.delete_address(db, address_id, user_id)
return {"success": success}
# Сервисы каталога
def create_category(db: Session, category: CategoryCreate) -> Dict[str, Any]:
new_category = catalog_repo.create_category(db, category)
return {"category": new_category}
def update_category(db: Session, category_id: int, category: CategoryUpdate) -> Dict[str, Any]:
updated_category = catalog_repo.update_category(db, category_id, category)
return {"category": updated_category}
def delete_category(db: Session, category_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_category(db, category_id)
return {"success": success}
def get_category_tree(db: Session) -> List[Dict[str, Any]]:
# Получаем все категории верхнего уровня
root_categories = catalog_repo.get_categories(db, parent_id=None)
result = []
for category in root_categories:
# Рекурсивно получаем подкатегории
category_dict = {
"id": category.id,
"name": category.name,
"slug": category.slug,
"description": category.description,
"is_active": category.is_active,
"subcategories": _get_subcategories(db, category.id)
}
result.append(category_dict)
return result
def _get_subcategories(db: Session, parent_id: int) -> List[Dict[str, Any]]:
subcategories = catalog_repo.get_categories(db, parent_id=parent_id)
result = []
for category in subcategories:
category_dict = {
"id": category.id,
"name": category.name,
"slug": category.slug,
"description": category.description,
"is_active": category.is_active,
"subcategories": _get_subcategories(db, category.id)
}
result.append(category_dict)
return result
def create_product(db: Session, product: ProductCreate) -> Dict[str, Any]:
new_product = catalog_repo.create_product(db, product)
return {"product": new_product}
def update_product(db: Session, product_id: int, product: ProductUpdate) -> Dict[str, Any]:
updated_product = catalog_repo.update_product(db, product_id, product)
return {"product": updated_product}
def delete_product(db: Session, product_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_product(db, product_id)
return {"success": success}
def get_product_details(db: Session, product_id: int) -> Dict[str, Any]:
product = catalog_repo.get_product(db, product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Продукт не найден"
)
# Получаем варианты продукта
variants = catalog_repo.get_product_variants(db, product_id)
# Получаем изображения продукта
images = catalog_repo.get_product_images(db, product_id)
# Получаем рейтинг продукта
rating = review_repo.get_product_rating(db, product_id)
# Получаем отзывы продукта
reviews = review_repo.get_product_reviews(db, product_id, limit=5)
return {
"product": product,
"variants": variants,
"images": images,
"rating": rating,
"reviews": reviews
}
def add_product_variant(db: Session, variant: ProductVariantCreate) -> Dict[str, Any]:
new_variant = catalog_repo.create_variant(db, variant)
return {"variant": new_variant}
def update_product_variant(db: Session, variant_id: int, variant: ProductVariantUpdate) -> Dict[str, Any]:
updated_variant = catalog_repo.update_variant(db, variant_id, variant)
return {"variant": updated_variant}
def delete_product_variant(db: Session, variant_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_variant(db, variant_id)
return {"success": success}
def upload_product_image(db: Session, product_id: int, file: UploadFile, is_primary: bool = False) -> Dict[str, Any]:
# Проверяем, что продукт существует
product = catalog_repo.get_product(db, product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Продукт не найден"
)
# Проверяем расширение файла
file_extension = file.filename.split(".")[-1].lower()
if file_extension not in settings.ALLOWED_UPLOAD_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Неподдерживаемый формат файла. Разрешены: {', '.join(settings.ALLOWED_UPLOAD_EXTENSIONS)}"
)
# Создаем директорию для загрузок, если она не существует
upload_dir = Path(settings.UPLOAD_DIRECTORY) / "products" / str(product_id)
upload_dir.mkdir(parents=True, exist_ok=True)
# Генерируем уникальное имя файла
unique_filename = f"{uuid.uuid4()}.{file_extension}"
file_path = upload_dir / unique_filename
# Сохраняем файл
with file_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Создаем запись об изображении в БД
image_data = ProductImageCreate(
product_id=product_id,
image_url=f"/uploads/products/{product_id}/{unique_filename}",
alt_text=file.filename,
is_primary=is_primary
)
new_image = catalog_repo.create_image(db, image_data)
return {"image": new_image}
def update_product_image(db: Session, image_id: int, image: ProductImageUpdate) -> Dict[str, Any]:
updated_image = catalog_repo.update_image(db, image_id, image)
return {"image": updated_image}
def delete_product_image(db: Session, image_id: int) -> Dict[str, Any]:
# Получаем информацию об изображении перед удалением
image = catalog_repo.get_image(db, image_id)
if not image:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Изображение не найдено"
)
# Удаляем запись из БД
success = catalog_repo.delete_image(db, image_id)
# Удаляем файл с диска
if success:
try:
# Получаем путь к файлу из URL
file_path = Path(settings.UPLOAD_DIRECTORY) / image.image_url.lstrip("/uploads/")
if file_path.exists():
file_path.unlink()
except Exception:
# Если не удалось удалить файл, просто логируем ошибку
# В реальном приложении здесь должно быть логирование
pass
return {"success": success}
# Сервисы корзины и заказов
def add_to_cart(db: Session, user_id: int, cart_item: CartItemCreate) -> Dict[str, Any]:
new_cart_item = order_repo.create_cart_item(db, cart_item, user_id)
# Логируем событие добавления в корзину
log_data = AnalyticsLogCreate(
user_id=user_id,
event_type="add_to_cart",
product_id=cart_item.product_id,
additional_data={"quantity": cart_item.quantity}
)
content_repo.log_analytics_event(db, log_data)
return {"cart_item": new_cart_item}
def update_cart_item(db: Session, user_id: int, cart_item_id: int, cart_item: CartItemUpdate) -> Dict[str, Any]:
updated_cart_item = order_repo.update_cart_item(db, cart_item_id, cart_item, user_id)
return {"cart_item": updated_cart_item}
def remove_from_cart(db: Session, user_id: int, cart_item_id: int) -> Dict[str, Any]:
success = order_repo.delete_cart_item(db, cart_item_id, user_id)
return {"success": success}
def clear_cart(db: Session, user_id: int) -> Dict[str, Any]:
success = order_repo.clear_cart(db, user_id)
return {"success": success}
def get_cart(db: Session, user_id: int) -> Dict[str, Any]:
cart_items = order_repo.get_cart_with_product_details(db, user_id)
# Рассчитываем общую сумму корзины
total_amount = sum(item["total_price"] for item in cart_items)
return {
"items": cart_items,
"total_amount": total_amount,
"items_count": len(cart_items)
}
def create_order(db: Session, user_id: int, order: OrderCreate) -> Dict[str, Any]:
new_order = order_repo.create_order(db, order, user_id)
# Логируем событие создания заказа
log_data = AnalyticsLogCreate(
user_id=user_id,
event_type="order_created",
additional_data={"order_id": new_order.id, "total_amount": new_order.total_amount}
)
content_repo.log_analytics_event(db, log_data)
return {"order": new_order}
def get_order(db: Session, user_id: int, order_id: int, is_admin: bool = False) -> Dict[str, Any]:
# Получаем заказ с деталями
order_details = order_repo.get_order_with_details(db, order_id)
# Проверяем права доступа
if not is_admin and order_details["user_id"] != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для просмотра этого заказа"
)
return {"order": order_details}
def update_order(db: Session, user_id: int, order_id: int, order: OrderUpdate, is_admin: bool = False) -> Dict[str, Any]:
updated_order = order_repo.update_order(db, order_id, order, is_admin)
# Проверяем права доступа
if not is_admin and updated_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для обновления этого заказа"
)
return {"order": updated_order}
def cancel_order(db: Session, user_id: int, order_id: int) -> Dict[str, Any]:
# Отменяем заказ (обычный пользователь может только отменить заказ)
order_update = OrderUpdate(status="cancelled")
updated_order = order_repo.update_order(db, order_id, order_update, is_admin=False)
# Проверяем права доступа
if updated_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для отмены этого заказа"
)
return {"order": updated_order}
# Сервисы отзывов
def create_review(db: Session, user_id: int, review: ReviewCreate) -> Dict[str, Any]:
new_review = review_repo.create_review(db, review, user_id)
return {"review": new_review}
def update_review(db: Session, user_id: int, review_id: int, review: ReviewUpdate, is_admin: bool = False) -> Dict[str, Any]:
updated_review = review_repo.update_review(db, review_id, review, user_id, is_admin)
return {"review": updated_review}
def delete_review(db: Session, user_id: int, review_id: int, is_admin: bool = False) -> Dict[str, Any]:
success = review_repo.delete_review(db, review_id, user_id, is_admin)
return {"success": success}
def approve_review(db: Session, review_id: int) -> Dict[str, Any]:
approved_review = review_repo.approve_review(db, review_id)
return {"review": approved_review}
def get_product_reviews(db: Session, product_id: int, skip: int = 0, limit: int = 10) -> Dict[str, Any]:
reviews = review_repo.get_product_reviews(db, product_id, skip, limit)
# Получаем рейтинг продукта
rating = review_repo.get_product_rating(db, product_id)
return {
"reviews": reviews,
"rating": rating,
"total": rating["total_reviews"],
"skip": skip,
"limit": limit
}
# Сервисы информационных страниц
def create_page(db: Session, page: PageCreate) -> Dict[str, Any]:
new_page = content_repo.create_page(db, page)
return {"page": new_page}
def update_page(db: Session, page_id: int, page: PageUpdate) -> Dict[str, Any]:
updated_page = content_repo.update_page(db, page_id, page)
return {"page": updated_page}
def delete_page(db: Session, page_id: int) -> Dict[str, Any]:
success = content_repo.delete_page(db, page_id)
return {"success": success}
def get_page_by_slug(db: Session, slug: str) -> Dict[str, Any]:
page = content_repo.get_page_by_slug(db, slug)
if not page:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Страница не найдена"
)
return {"page": page}
# Сервисы аналитики
def log_event(db: Session, log: AnalyticsLogCreate) -> Dict[str, Any]:
new_log = content_repo.log_analytics_event(db, log)
return {"log": new_log}
def get_analytics_report(
db: Session,
period: str = "day",
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
report = content_repo.get_analytics_report(db, period, start_date, end_date)
return {"report": report}