обновлена стуруктура бекенда

This commit is contained in:
Zikil 2025-02-28 21:57:31 +07:00
parent f2214e2b9e
commit bfc77be0d6
38 changed files with 927 additions and 854 deletions

BIN
.DS_Store vendored

Binary file not shown.

BIN
backend/.DS_Store vendored

Binary file not shown.

BIN
backend/app/.DS_Store vendored

Binary file not shown.

View File

@ -1,9 +1,10 @@
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
import os
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy.exc import SQLAlchemyError
from app.config import settings
from app.routers import router
@ -14,27 +15,26 @@ Base.metadata.create_all(bind=engine)
# Создаем экземпляр приложения FastAPI
app = FastAPI(
title=settings.APP_NAME,
description=settings.APP_DESCRIPTION,
version=settings.APP_VERSION
title="Dressed for Success API",
description="API для интернет-магазина одежды",
version="1.0.0"
)
# Настраиваем CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_origins=["*"], # В продакшене нужно указать конкретные домены
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Обработчик исключений
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
# В реальном приложении здесь должно быть логирование ошибок
# Обработка ошибок SQLAlchemy
@app.exception_handler(SQLAlchemyError)
async def sqlalchemy_exception_handler(request: Request, exc: SQLAlchemyError):
return JSONResponse(
status_code=500,
content={"detail": "Внутренняя ошибка сервера"}
content={"detail": "Ошибка базы данных", "error": str(exc)},
)
# Подключаем роутеры
@ -50,8 +50,4 @@ app.mount("/uploads", StaticFiles(directory=settings.UPLOAD_DIRECTORY), name="up
# Корневой маршрут
@app.get("/")
async def root():
return {
"message": "Добро пожаловать в API интернет-магазина",
"docs_url": "/docs",
"redoc_url": "/redoc"
}
return {"message": "Добро пожаловать в API интернет-магазина Dressed for Success"}

View File

@ -1,354 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Query, Body, Request
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from datetime import datetime
from app.core import get_db, get_current_user, get_current_active_user, get_current_admin_user
from app.services import (
register_user, login_user, get_user_profile, update_user_profile,
add_user_address, update_user_address, delete_user_address,
create_category, update_category, delete_category, get_category_tree,
create_product, update_product, delete_product, get_product_details,
add_product_variant, update_product_variant, delete_product_variant,
upload_product_image, update_product_image, delete_product_image,
add_to_cart, update_cart_item, remove_from_cart, clear_cart, get_cart,
create_order, get_order, update_order, cancel_order,
create_review, update_review, delete_review, approve_review, get_product_reviews,
create_page, update_page, delete_page, get_page_by_slug,
log_event, get_analytics_report
)
from app.schemas.user_schemas import (
UserCreate, UserUpdate, User, AddressCreate, AddressUpdate, Address, Token
)
from app.schemas.catalog_schemas import (
CategoryCreate, CategoryUpdate, Category,
ProductCreate, ProductUpdate, Product,
ProductVariantCreate, ProductVariantUpdate, ProductVariant,
ProductImageCreate, ProductImageUpdate, ProductImage
)
from app.schemas.order_schemas import (
CartItemCreate, CartItemUpdate, CartItem, CartItemWithProduct,
OrderCreate, OrderUpdate, Order, OrderWithDetails
)
from app.schemas.review_schemas import (
ReviewCreate, ReviewUpdate, Review, ReviewWithUser
)
from app.schemas.content_schemas import (
PageCreate, PageUpdate, Page, AnalyticsLogCreate, AnalyticsLog, AnalyticsReport
)
from app.models.user_models import User as UserModel
# Создаем основной роутер
router = APIRouter()
# Роутеры для аутентификации и пользователей
auth_router = APIRouter(prefix="/auth", tags=["Аутентификация"])
@auth_router.post("/register", response_model=Dict[str, Any])
async def register(user: UserCreate, db: Session = Depends(get_db)):
return register_user(db, user)
@auth_router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
result = login_user(db, form_data.username, form_data.password)
return result
user_router = APIRouter(prefix="/users", tags=["Пользователи"])
@user_router.get("/me", response_model=Dict[str, Any])
async def read_users_me(current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return get_user_profile(db, current_user.id)
@user_router.put("/me", response_model=Dict[str, Any])
async def update_user_me(user_data: UserUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return update_user_profile(db, current_user.id, user_data)
@user_router.post("/me/addresses", response_model=Dict[str, Any])
async def create_user_address(address: AddressCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return add_user_address(db, current_user.id, address)
@user_router.put("/me/addresses/{address_id}", response_model=Dict[str, Any])
async def update_user_address_endpoint(address_id: int, address: AddressUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return update_user_address(db, current_user.id, address_id, address)
@user_router.delete("/me/addresses/{address_id}", response_model=Dict[str, Any])
async def delete_user_address_endpoint(address_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return delete_user_address(db, current_user.id, address_id)
@user_router.get("/{user_id}", response_model=Dict[str, Any])
async def read_user(user_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return get_user_profile(db, user_id)
# Роутеры для каталога
catalog_router = APIRouter(prefix="/catalog", tags=["Каталог"])
@catalog_router.post("/categories", response_model=Dict[str, Any])
async def create_category_endpoint(category: CategoryCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return create_category(db, category)
@catalog_router.put("/categories/{category_id}", response_model=Dict[str, Any])
async def update_category_endpoint(category_id: int, category: CategoryUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return update_category(db, category_id, category)
@catalog_router.delete("/categories/{category_id}", response_model=Dict[str, Any])
async def delete_category_endpoint(category_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return delete_category(db, category_id)
@catalog_router.get("/categories", response_model=List[Dict[str, Any]])
async def get_categories_tree(db: Session = Depends(get_db)):
return get_category_tree(db)
@catalog_router.post("/products", response_model=Dict[str, Any])
async def create_product_endpoint(product: ProductCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return create_product(db, product)
@catalog_router.put("/products/{product_id}", response_model=Dict[str, Any])
async def update_product_endpoint(product_id: int, product: ProductUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return update_product(db, product_id, product)
@catalog_router.delete("/products/{product_id}", response_model=Dict[str, Any])
async def delete_product_endpoint(product_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return delete_product(db, product_id)
@catalog_router.get("/products/{product_id}", response_model=Dict[str, Any])
async def get_product_details_endpoint(product_id: int, db: Session = Depends(get_db)):
return get_product_details(db, product_id)
@catalog_router.post("/products/{product_id}/variants", response_model=Dict[str, Any])
async def add_product_variant_endpoint(product_id: int, variant: ProductVariantCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
variant.product_id = product_id
return add_product_variant(db, variant)
@catalog_router.put("/variants/{variant_id}", response_model=Dict[str, Any])
async def update_product_variant_endpoint(variant_id: int, variant: ProductVariantUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return update_product_variant(db, variant_id, variant)
@catalog_router.delete("/variants/{variant_id}", response_model=Dict[str, Any])
async def delete_product_variant_endpoint(variant_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return delete_product_variant(db, variant_id)
@catalog_router.post("/products/{product_id}/images", response_model=Dict[str, Any])
async def upload_product_image_endpoint(
product_id: int,
file: UploadFile = File(...),
is_primary: bool = Form(False),
current_user: UserModel = Depends(get_current_admin_user),
db: Session = Depends(get_db)
):
return upload_product_image(db, product_id, file, is_primary)
@catalog_router.put("/images/{image_id}", response_model=Dict[str, Any])
async def update_product_image_endpoint(image_id: int, image: ProductImageUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return update_product_image(db, image_id, image)
@catalog_router.delete("/images/{image_id}", response_model=Dict[str, Any])
async def delete_product_image_endpoint(image_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return delete_product_image(db, image_id)
@catalog_router.get("/products", response_model=List[Product])
async def get_products(
skip: int = 0,
limit: int = 100,
category_id: Optional[int] = None,
search: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
is_active: Optional[bool] = True,
db: Session = Depends(get_db)
):
from app.repositories.catalog_repo import get_products
return get_products(db, skip, limit, category_id, search, min_price, max_price, is_active)
# Роутеры для корзины и заказов
cart_router = APIRouter(prefix="/cart", tags=["Корзина"])
@cart_router.post("/items", response_model=Dict[str, Any])
async def add_to_cart_endpoint(cart_item: CartItemCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return add_to_cart(db, current_user.id, cart_item)
@cart_router.put("/items/{cart_item_id}", response_model=Dict[str, Any])
async def update_cart_item_endpoint(cart_item_id: int, cart_item: CartItemUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return update_cart_item(db, current_user.id, cart_item_id, cart_item)
@cart_router.delete("/items/{cart_item_id}", response_model=Dict[str, Any])
async def remove_from_cart_endpoint(cart_item_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return remove_from_cart(db, current_user.id, cart_item_id)
@cart_router.delete("/clear", response_model=Dict[str, Any])
async def clear_cart_endpoint(current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return clear_cart(db, current_user.id)
@cart_router.get("/", response_model=Dict[str, Any])
async def get_cart_endpoint(current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return get_cart(db, current_user.id)
order_router = APIRouter(prefix="/orders", tags=["Заказы"])
@order_router.post("/", response_model=Dict[str, Any])
async def create_order_endpoint(order: OrderCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return create_order(db, current_user.id, order)
@order_router.get("/{order_id}", response_model=Dict[str, Any])
async def get_order_endpoint(order_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return get_order(db, current_user.id, order_id, current_user.is_admin)
@order_router.put("/{order_id}", response_model=Dict[str, Any])
async def update_order_endpoint(order_id: int, order: OrderUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return update_order(db, current_user.id, order_id, order, current_user.is_admin)
@order_router.post("/{order_id}/cancel", response_model=Dict[str, Any])
async def cancel_order_endpoint(order_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return cancel_order(db, current_user.id, order_id)
@order_router.get("/", response_model=List[Order])
async def get_orders(
skip: int = 0,
limit: int = 100,
status: Optional[str] = None,
current_user: UserModel = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
if current_user.is_admin:
from app.repositories.order_repo import get_all_orders
return get_all_orders(db, skip, limit, status)
else:
from app.repositories.order_repo import get_user_orders
return get_user_orders(db, current_user.id, skip, limit)
# Роутеры для отзывов
review_router = APIRouter(prefix="/reviews", tags=["Отзывы"])
@review_router.post("/", response_model=Dict[str, Any])
async def create_review_endpoint(review: ReviewCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return create_review(db, current_user.id, review)
@review_router.put("/{review_id}", response_model=Dict[str, Any])
async def update_review_endpoint(review_id: int, review: ReviewUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return update_review(db, current_user.id, review_id, review, current_user.is_admin)
@review_router.delete("/{review_id}", response_model=Dict[str, Any])
async def delete_review_endpoint(review_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return delete_review(db, current_user.id, review_id, current_user.is_admin)
@review_router.post("/{review_id}/approve", response_model=Dict[str, Any])
async def approve_review_endpoint(review_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return approve_review(db, review_id)
@review_router.get("/products/{product_id}", response_model=Dict[str, Any])
async def get_product_reviews_endpoint(product_id: int, skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
return get_product_reviews(db, product_id, skip, limit)
# Роутеры для информационных страниц
content_router = APIRouter(prefix="/content", tags=["Контент"])
@content_router.post("/pages", response_model=Dict[str, Any])
async def create_page_endpoint(page: PageCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return create_page(db, page)
@content_router.put("/pages/{page_id}", response_model=Dict[str, Any])
async def update_page_endpoint(page_id: int, page: PageUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return update_page(db, page_id, page)
@content_router.delete("/pages/{page_id}", response_model=Dict[str, Any])
async def delete_page_endpoint(page_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return delete_page(db, page_id)
@content_router.get("/pages/{slug}", response_model=Dict[str, Any])
async def get_page_endpoint(slug: str, db: Session = Depends(get_db)):
return get_page_by_slug(db, slug)
@content_router.get("/pages", response_model=List[Page])
async def get_pages(
skip: int = 0,
limit: int = 100,
published_only: bool = True,
current_user: Optional[UserModel] = Depends(get_current_user),
db: Session = Depends(get_db)
):
# Если пользователь не админ, показываем только опубликованные страницы
is_admin = current_user and current_user.is_admin
from app.repositories.content_repo import get_pages
return get_pages(db, skip, limit, published_only=(not is_admin) and published_only)
# Роутеры для аналитики
analytics_router = APIRouter(prefix="/analytics", tags=["Аналитика"])
@analytics_router.post("/events", response_model=Dict[str, Any])
async def log_event_endpoint(log: AnalyticsLogCreate, request: Request, db: Session = Depends(get_db)):
# Добавляем IP-адрес и User-Agent, если они не указаны
if not log.ip_address:
log.ip_address = request.client.host
if not log.user_agent:
user_agent = request.headers.get("user-agent")
if user_agent:
log.user_agent = user_agent
return log_event(db, log)
@analytics_router.get("/reports", response_model=Dict[str, Any])
async def get_analytics_report_endpoint(
period: str = "day",
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
current_user: UserModel = Depends(get_current_admin_user),
db: Session = Depends(get_db)
):
return get_analytics_report(db, period, start_date, end_date)
# Включаем все роутеры в основной роутер
router.include_router(auth_router)
router.include_router(user_router)
router.include_router(catalog_router)
router.include_router(cart_router)
router.include_router(order_router)
router.include_router(review_router)
router.include_router(content_router)
router.include_router(analytics_router)

View File

@ -0,0 +1,23 @@
from fastapi import APIRouter
from app.routers.auth_router import auth_router
from app.routers.user_router import user_router
from app.routers.catalog_router import catalog_router
from app.routers.cart_router import cart_router
from app.routers.order_router import order_router
from app.routers.review_router import review_router
from app.routers.content_router import content_router
from app.routers.analytics_router import analytics_router
# Создаем основной роутер
router = APIRouter()
# Включаем все роутеры в основной роутер
router.include_router(auth_router)
router.include_router(user_router)
router.include_router(catalog_router)
router.include_router(cart_router)
router.include_router(order_router)
router.include_router(review_router)
router.include_router(content_router)
router.include_router(analytics_router)

View File

@ -0,0 +1,40 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Dict, Any, List, Optional
from datetime import datetime
from app.core import get_db, get_current_active_user, get_current_admin_user
from app import services
from app.schemas.content_schemas import AnalyticsLogCreate
from app.models.user_models import User as UserModel
# Роутер для аналитики
analytics_router = APIRouter(prefix="/analytics", tags=["Аналитика"])
@analytics_router.post("/log", response_model=Dict[str, Any])
async def log_analytics_event_endpoint(log: AnalyticsLogCreate, db: Session = Depends(get_db)):
return services.log_analytics_event(db, log)
@analytics_router.get("/logs", response_model=Dict[str, Any])
async def get_analytics_logs_endpoint(
event_type: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
skip: int = 0,
limit: int = 100,
current_user: UserModel = Depends(get_current_admin_user),
db: Session = Depends(get_db)
):
return services.get_analytics_logs(db, event_type, start_date, end_date, skip, limit)
@analytics_router.get("/report", response_model=Dict[str, Any])
async def get_analytics_report_endpoint(
report_type: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
current_user: UserModel = Depends(get_current_admin_user),
db: Session = Depends(get_db)
):
return services.get_analytics_report(db, report_type, start_date, end_date)

View File

@ -0,0 +1,21 @@
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.core import get_db
from app import services
from app.schemas.user_schemas import UserCreate, Token
# Роутер для аутентификации
auth_router = APIRouter(prefix="/auth", tags=["Аутентификация"])
@auth_router.post("/register", response_model=Dict[str, Any])
async def register(user: UserCreate, db: Session = Depends(get_db)):
return services.register_user(db, user)
@auth_router.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
result = services.login_user(db, form_data.username, form_data.password)
return result

View File

@ -0,0 +1,35 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.core import get_db, get_current_active_user
from app import services
from app.schemas.order_schemas import CartItemCreate, CartItemUpdate
from app.models.user_models import User as UserModel
# Роутер для корзины
cart_router = APIRouter(prefix="/cart", tags=["Корзина"])
@cart_router.post("/items", response_model=Dict[str, Any])
async def add_to_cart_endpoint(cart_item: CartItemCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.add_to_cart(db, current_user.id, cart_item)
@cart_router.put("/items/{cart_item_id}", response_model=Dict[str, Any])
async def update_cart_item_endpoint(cart_item_id: int, cart_item: CartItemUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.update_cart_item(db, current_user.id, cart_item_id, cart_item)
@cart_router.delete("/items/{cart_item_id}", response_model=Dict[str, Any])
async def remove_from_cart_endpoint(cart_item_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.remove_from_cart(db, current_user.id, cart_item_id)
@cart_router.delete("/clear", response_model=Dict[str, Any])
async def clear_cart_endpoint(current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.clear_cart(db, current_user.id)
@cart_router.get("/", response_model=Dict[str, Any])
async def get_cart_endpoint(current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.get_cart(db, current_user.id)

View File

@ -0,0 +1,107 @@
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from app.core import get_db, get_current_admin_user
from app import services
from app.schemas.catalog_schemas import (
CategoryCreate, CategoryUpdate, Category,
ProductCreate, ProductUpdate, Product,
ProductVariantCreate, ProductVariantUpdate, ProductVariant,
ProductImageCreate, ProductImageUpdate, ProductImage
)
from app.models.user_models import User as UserModel
from app.repositories.catalog_repo import get_products
# Роутер для каталога
catalog_router = APIRouter(prefix="/catalog", tags=["Каталог"])
@catalog_router.post("/categories", response_model=Dict[str, Any])
async def create_category_endpoint(category: CategoryCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.create_category(db, category)
@catalog_router.put("/categories/{category_id}", response_model=Dict[str, Any])
async def update_category_endpoint(category_id: int, category: CategoryUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.update_category(db, category_id, category)
@catalog_router.delete("/categories/{category_id}", response_model=Dict[str, Any])
async def delete_category_endpoint(category_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.delete_category(db, category_id)
@catalog_router.get("/categories", response_model=List[Dict[str, Any]])
async def get_categories_tree(db: Session = Depends(get_db)):
return services.get_category_tree(db)
@catalog_router.post("/products", response_model=Dict[str, Any])
async def create_product_endpoint(product: ProductCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.create_product(db, product)
@catalog_router.put("/products/{product_id}", response_model=Dict[str, Any])
async def update_product_endpoint(product_id: int, product: ProductUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.update_product(db, product_id, product)
@catalog_router.delete("/products/{product_id}", response_model=Dict[str, Any])
async def delete_product_endpoint(product_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.delete_product(db, product_id)
@catalog_router.get("/products/{product_id}", response_model=Dict[str, Any])
async def get_product_details_endpoint(product_id: int, db: Session = Depends(get_db)):
return services.get_product_details(db, product_id)
@catalog_router.post("/products/{product_id}/variants", response_model=Dict[str, Any])
async def add_product_variant_endpoint(product_id: int, variant: ProductVariantCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
variant.product_id = product_id
return services.add_product_variant(db, variant)
@catalog_router.put("/variants/{variant_id}", response_model=Dict[str, Any])
async def update_product_variant_endpoint(variant_id: int, variant: ProductVariantUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.update_product_variant(db, variant_id, variant)
@catalog_router.delete("/variants/{variant_id}", response_model=Dict[str, Any])
async def delete_product_variant_endpoint(variant_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.delete_product_variant(db, variant_id)
@catalog_router.post("/products/{product_id}/images", response_model=Dict[str, Any])
async def upload_product_image_endpoint(
product_id: int,
file: UploadFile = File(...),
is_primary: bool = Form(False),
current_user: UserModel = Depends(get_current_admin_user),
db: Session = Depends(get_db)
):
return services.upload_product_image(db, product_id, file, is_primary)
@catalog_router.put("/images/{image_id}", response_model=Dict[str, Any])
async def update_product_image_endpoint(image_id: int, image: ProductImageUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.update_product_image(db, image_id, image)
@catalog_router.delete("/images/{image_id}", response_model=Dict[str, Any])
async def delete_product_image_endpoint(image_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.delete_product_image(db, image_id)
@catalog_router.get("/products", response_model=List[Product])
async def get_products_endpoint(
skip: int = 0,
limit: int = 100,
category_id: Optional[int] = None,
search: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
is_active: Optional[bool] = True,
db: Session = Depends(get_db)
):
return get_products(db, skip, limit, category_id, search, min_price, max_price, is_active)

View File

@ -0,0 +1,40 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Dict, Any, List, Optional
from app.core import get_db, get_current_active_user, get_current_admin_user
from app import services
from app.schemas.content_schemas import PageCreate, PageUpdate
from app.models.user_models import User as UserModel
# Роутер для контента
content_router = APIRouter(prefix="/content", tags=["Контент"])
@content_router.post("/pages", response_model=Dict[str, Any])
async def create_page_endpoint(page: PageCreate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.create_page(db, page)
@content_router.put("/pages/{page_id}", response_model=Dict[str, Any])
async def update_page_endpoint(page_id: int, page: PageUpdate, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.update_page(db, page_id, page)
@content_router.delete("/pages/{page_id}", response_model=Dict[str, Any])
async def delete_page_endpoint(page_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.delete_page(db, page_id)
@content_router.get("/pages/{page_id}", response_model=Dict[str, Any])
async def get_page_endpoint(page_id: int, db: Session = Depends(get_db)):
return services.get_page(db, page_id)
@content_router.get("/pages/slug/{slug}", response_model=Dict[str, Any])
async def get_page_by_slug_endpoint(slug: str, db: Session = Depends(get_db)):
return services.get_page_by_slug(db, slug)
@content_router.get("/pages", response_model=Dict[str, Any])
async def get_pages_endpoint(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
return services.get_pages(db, skip, limit)

View File

@ -0,0 +1,45 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from app.core import get_db, get_current_active_user
from app import services
from app.schemas.order_schemas import OrderCreate, OrderUpdate, Order
from app.models.user_models import User as UserModel
from app.repositories.order_repo import get_all_orders, get_user_orders
# Роутер для заказов
order_router = APIRouter(prefix="/orders", tags=["Заказы"])
@order_router.post("/", response_model=Dict[str, Any])
async def create_order_endpoint(order: OrderCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.create_order(db, current_user.id, order)
@order_router.get("/{order_id}", response_model=Dict[str, Any])
async def get_order_endpoint(order_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.get_order(db, current_user.id, order_id, current_user.is_admin)
@order_router.put("/{order_id}", response_model=Dict[str, Any])
async def update_order_endpoint(order_id: int, order: OrderUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.update_order(db, current_user.id, order_id, order, current_user.is_admin)
@order_router.post("/{order_id}/cancel", response_model=Dict[str, Any])
async def cancel_order_endpoint(order_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.cancel_order(db, current_user.id, order_id)
@order_router.get("/", response_model=List[Order])
async def get_orders(
skip: int = 0,
limit: int = 100,
status: Optional[str] = None,
current_user: UserModel = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
if current_user.is_admin:
return get_all_orders(db, skip, limit, status)
else:
return get_user_orders(db, current_user.id, skip, limit)

View File

@ -0,0 +1,35 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.core import get_db, get_current_active_user, get_current_admin_user
from app import services
from app.schemas.review_schemas import ReviewCreate, ReviewUpdate
from app.models.user_models import User as UserModel
# Роутер для отзывов
review_router = APIRouter(prefix="/reviews", tags=["Отзывы"])
@review_router.post("/", response_model=Dict[str, Any])
async def create_review_endpoint(review: ReviewCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.create_review(db, current_user.id, review)
@review_router.put("/{review_id}", response_model=Dict[str, Any])
async def update_review_endpoint(review_id: int, review: ReviewUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.update_review(db, current_user.id, review_id, review, current_user.is_admin)
@review_router.delete("/{review_id}", response_model=Dict[str, Any])
async def delete_review_endpoint(review_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.delete_review(db, current_user.id, review_id, current_user.is_admin)
@review_router.post("/{review_id}/approve", response_model=Dict[str, Any])
async def approve_review_endpoint(review_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.approve_review(db, review_id)
@review_router.get("/products/{product_id}", response_model=Dict[str, Any])
async def get_product_reviews_endpoint(product_id: int, skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
return services.get_product_reviews(db, product_id, skip, limit)

View File

@ -0,0 +1,40 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.core import get_db, get_current_active_user, get_current_admin_user
from app import services
from app.schemas.user_schemas import UserUpdate, AddressCreate, AddressUpdate
from app.models.user_models import User as UserModel
# Роутер для пользователей
user_router = APIRouter(prefix="/users", tags=["Пользователи"])
@user_router.get("/me", response_model=Dict[str, Any])
async def read_users_me(current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.get_user_profile(db, current_user.id)
@user_router.put("/me", response_model=Dict[str, Any])
async def update_user_me(user_data: UserUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.update_user_profile(db, current_user.id, user_data)
@user_router.post("/me/addresses", response_model=Dict[str, Any])
async def create_user_address(address: AddressCreate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.add_user_address(db, current_user.id, address)
@user_router.put("/me/addresses/{address_id}", response_model=Dict[str, Any])
async def update_user_address_endpoint(address_id: int, address: AddressUpdate, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.update_user_address(db, current_user.id, address_id, address)
@user_router.delete("/me/addresses/{address_id}", response_model=Dict[str, Any])
async def delete_user_address_endpoint(address_id: int, current_user: UserModel = Depends(get_current_active_user), db: Session = Depends(get_db)):
return services.delete_user_address(db, current_user.id, address_id)
@user_router.get("/{user_id}", response_model=Dict[str, Any])
async def read_user(user_id: int, current_user: UserModel = Depends(get_current_admin_user), db: Session = Depends(get_db)):
return services.get_user_profile(db, user_id)

View File

@ -1,482 +0,0 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException, status, UploadFile
from typing import List, Optional, Dict, Any, Union
from datetime import datetime, timedelta
import os
import uuid
import shutil
from pathlib import Path
from app.config import settings
from app.core import create_access_token, get_password_hash, verify_password
from app.repositories import (
user_repo, catalog_repo, order_repo, review_repo, content_repo
)
from app.schemas.user_schemas import UserCreate, UserUpdate, AddressCreate, AddressUpdate, Token
from app.schemas.catalog_schemas import (
CategoryCreate, CategoryUpdate,
ProductCreate, ProductUpdate,
ProductVariantCreate, ProductVariantUpdate,
ProductImageCreate, ProductImageUpdate
)
from app.schemas.order_schemas import CartItemCreate, CartItemUpdate, OrderCreate, OrderUpdate
from app.schemas.review_schemas import ReviewCreate, ReviewUpdate
from app.schemas.content_schemas import PageCreate, PageUpdate, AnalyticsLogCreate
# Сервисы аутентификации и пользователей
def register_user(db: Session, user: UserCreate) -> Dict[str, Any]:
# Создаем пользователя
db_user = user_repo.create_user(db, user)
# Создаем токен доступа
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": db_user.username}, expires_delta=access_token_expires
)
return {
"user": db_user,
"access_token": access_token,
"token_type": "bearer"
}
def login_user(db: Session, username: str, password: str) -> Dict[str, Any]:
# Аутентифицируем пользователя
user = user_repo.authenticate_user(db, username, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем, что пользователь активен
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неактивный пользователь"
)
# Создаем токен доступа
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer"
}
def get_user_profile(db: Session, user_id: int) -> Dict[str, Any]:
user = user_repo.get_user(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Пользователь не найден"
)
# Получаем адреса пользователя
addresses = user_repo.get_user_addresses(db, user_id)
# Получаем заказы пользователя
orders = order_repo.get_user_orders(db, user_id)
# Получаем отзывы пользователя
reviews = review_repo.get_user_reviews(db, user_id)
return {
"user": user,
"addresses": addresses,
"orders": orders,
"reviews": reviews
}
def update_user_profile(db: Session, user_id: int, user_data: UserUpdate) -> Dict[str, Any]:
updated_user = user_repo.update_user(db, user_id, user_data)
return {"user": updated_user}
def add_user_address(db: Session, user_id: int, address: AddressCreate) -> Dict[str, Any]:
new_address = user_repo.create_address(db, address, user_id)
return {"address": new_address}
def update_user_address(db: Session, user_id: int, address_id: int, address: AddressUpdate) -> Dict[str, Any]:
updated_address = user_repo.update_address(db, address_id, address, user_id)
return {"address": updated_address}
def delete_user_address(db: Session, user_id: int, address_id: int) -> Dict[str, Any]:
success = user_repo.delete_address(db, address_id, user_id)
return {"success": success}
# Сервисы каталога
def create_category(db: Session, category: CategoryCreate) -> Dict[str, Any]:
new_category = catalog_repo.create_category(db, category)
return {"category": new_category}
def update_category(db: Session, category_id: int, category: CategoryUpdate) -> Dict[str, Any]:
updated_category = catalog_repo.update_category(db, category_id, category)
return {"category": updated_category}
def delete_category(db: Session, category_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_category(db, category_id)
return {"success": success}
def get_category_tree(db: Session) -> List[Dict[str, Any]]:
# Получаем все категории верхнего уровня
root_categories = catalog_repo.get_categories(db, parent_id=None)
result = []
for category in root_categories:
# Рекурсивно получаем подкатегории
category_dict = {
"id": category.id,
"name": category.name,
"slug": category.slug,
"description": category.description,
"is_active": category.is_active,
"subcategories": _get_subcategories(db, category.id)
}
result.append(category_dict)
return result
def _get_subcategories(db: Session, parent_id: int) -> List[Dict[str, Any]]:
subcategories = catalog_repo.get_categories(db, parent_id=parent_id)
result = []
for category in subcategories:
category_dict = {
"id": category.id,
"name": category.name,
"slug": category.slug,
"description": category.description,
"is_active": category.is_active,
"subcategories": _get_subcategories(db, category.id)
}
result.append(category_dict)
return result
def create_product(db: Session, product: ProductCreate) -> Dict[str, Any]:
new_product = catalog_repo.create_product(db, product)
return {"product": new_product}
def update_product(db: Session, product_id: int, product: ProductUpdate) -> Dict[str, Any]:
updated_product = catalog_repo.update_product(db, product_id, product)
return {"product": updated_product}
def delete_product(db: Session, product_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_product(db, product_id)
return {"success": success}
def get_product_details(db: Session, product_id: int) -> Dict[str, Any]:
product = catalog_repo.get_product(db, product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Продукт не найден"
)
# Получаем варианты продукта
variants = catalog_repo.get_product_variants(db, product_id)
# Получаем изображения продукта
images = catalog_repo.get_product_images(db, product_id)
# Получаем рейтинг продукта
rating = review_repo.get_product_rating(db, product_id)
# Получаем отзывы продукта
reviews = review_repo.get_product_reviews(db, product_id, limit=5)
return {
"product": product,
"variants": variants,
"images": images,
"rating": rating,
"reviews": reviews
}
def add_product_variant(db: Session, variant: ProductVariantCreate) -> Dict[str, Any]:
new_variant = catalog_repo.create_variant(db, variant)
return {"variant": new_variant}
def update_product_variant(db: Session, variant_id: int, variant: ProductVariantUpdate) -> Dict[str, Any]:
updated_variant = catalog_repo.update_variant(db, variant_id, variant)
return {"variant": updated_variant}
def delete_product_variant(db: Session, variant_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_variant(db, variant_id)
return {"success": success}
def upload_product_image(db: Session, product_id: int, file: UploadFile, is_primary: bool = False) -> Dict[str, Any]:
# Проверяем, что продукт существует
product = catalog_repo.get_product(db, product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Продукт не найден"
)
# Проверяем расширение файла
file_extension = file.filename.split(".")[-1].lower()
if file_extension not in settings.ALLOWED_UPLOAD_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Неподдерживаемый формат файла. Разрешены: {', '.join(settings.ALLOWED_UPLOAD_EXTENSIONS)}"
)
# Создаем директорию для загрузок, если она не существует
upload_dir = Path(settings.UPLOAD_DIRECTORY) / "products" / str(product_id)
upload_dir.mkdir(parents=True, exist_ok=True)
# Генерируем уникальное имя файла
unique_filename = f"{uuid.uuid4()}.{file_extension}"
file_path = upload_dir / unique_filename
# Сохраняем файл
with file_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Создаем запись об изображении в БД
image_data = ProductImageCreate(
product_id=product_id,
image_url=f"/uploads/products/{product_id}/{unique_filename}",
alt_text=file.filename,
is_primary=is_primary
)
new_image = catalog_repo.create_image(db, image_data)
return {"image": new_image}
def update_product_image(db: Session, image_id: int, image: ProductImageUpdate) -> Dict[str, Any]:
updated_image = catalog_repo.update_image(db, image_id, image)
return {"image": updated_image}
def delete_product_image(db: Session, image_id: int) -> Dict[str, Any]:
# Получаем информацию об изображении перед удалением
image = catalog_repo.get_image(db, image_id)
if not image:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Изображение не найдено"
)
# Удаляем запись из БД
success = catalog_repo.delete_image(db, image_id)
# Удаляем файл с диска
if success:
try:
# Получаем путь к файлу из URL
file_path = Path(settings.UPLOAD_DIRECTORY) / image.image_url.lstrip("/uploads/")
if file_path.exists():
file_path.unlink()
except Exception:
# Если не удалось удалить файл, просто логируем ошибку
# В реальном приложении здесь должно быть логирование
pass
return {"success": success}
# Сервисы корзины и заказов
def add_to_cart(db: Session, user_id: int, cart_item: CartItemCreate) -> Dict[str, Any]:
new_cart_item = order_repo.create_cart_item(db, cart_item, user_id)
# Логируем событие добавления в корзину
log_data = AnalyticsLogCreate(
user_id=user_id,
event_type="add_to_cart",
product_id=cart_item.product_id,
additional_data={"quantity": cart_item.quantity}
)
content_repo.log_analytics_event(db, log_data)
return {"cart_item": new_cart_item}
def update_cart_item(db: Session, user_id: int, cart_item_id: int, cart_item: CartItemUpdate) -> Dict[str, Any]:
updated_cart_item = order_repo.update_cart_item(db, cart_item_id, cart_item, user_id)
return {"cart_item": updated_cart_item}
def remove_from_cart(db: Session, user_id: int, cart_item_id: int) -> Dict[str, Any]:
success = order_repo.delete_cart_item(db, cart_item_id, user_id)
return {"success": success}
def clear_cart(db: Session, user_id: int) -> Dict[str, Any]:
success = order_repo.clear_cart(db, user_id)
return {"success": success}
def get_cart(db: Session, user_id: int) -> Dict[str, Any]:
cart_items = order_repo.get_cart_with_product_details(db, user_id)
# Рассчитываем общую сумму корзины
total_amount = sum(item["total_price"] for item in cart_items)
return {
"items": cart_items,
"total_amount": total_amount,
"items_count": len(cart_items)
}
def create_order(db: Session, user_id: int, order: OrderCreate) -> Dict[str, Any]:
new_order = order_repo.create_order(db, order, user_id)
# Логируем событие создания заказа
log_data = AnalyticsLogCreate(
user_id=user_id,
event_type="order_created",
additional_data={"order_id": new_order.id, "total_amount": new_order.total_amount}
)
content_repo.log_analytics_event(db, log_data)
return {"order": new_order}
def get_order(db: Session, user_id: int, order_id: int, is_admin: bool = False) -> Dict[str, Any]:
# Получаем заказ с деталями
order_details = order_repo.get_order_with_details(db, order_id)
# Проверяем права доступа
if not is_admin and order_details["user_id"] != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для просмотра этого заказа"
)
return {"order": order_details}
def update_order(db: Session, user_id: int, order_id: int, order: OrderUpdate, is_admin: bool = False) -> Dict[str, Any]:
updated_order = order_repo.update_order(db, order_id, order, is_admin)
# Проверяем права доступа
if not is_admin and updated_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для обновления этого заказа"
)
return {"order": updated_order}
def cancel_order(db: Session, user_id: int, order_id: int) -> Dict[str, Any]:
# Отменяем заказ (обычный пользователь может только отменить заказ)
order_update = OrderUpdate(status="cancelled")
updated_order = order_repo.update_order(db, order_id, order_update, is_admin=False)
# Проверяем права доступа
if updated_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для отмены этого заказа"
)
return {"order": updated_order}
# Сервисы отзывов
def create_review(db: Session, user_id: int, review: ReviewCreate) -> Dict[str, Any]:
new_review = review_repo.create_review(db, review, user_id)
return {"review": new_review}
def update_review(db: Session, user_id: int, review_id: int, review: ReviewUpdate, is_admin: bool = False) -> Dict[str, Any]:
updated_review = review_repo.update_review(db, review_id, review, user_id, is_admin)
return {"review": updated_review}
def delete_review(db: Session, user_id: int, review_id: int, is_admin: bool = False) -> Dict[str, Any]:
success = review_repo.delete_review(db, review_id, user_id, is_admin)
return {"success": success}
def approve_review(db: Session, review_id: int) -> Dict[str, Any]:
approved_review = review_repo.approve_review(db, review_id)
return {"review": approved_review}
def get_product_reviews(db: Session, product_id: int, skip: int = 0, limit: int = 10) -> Dict[str, Any]:
reviews = review_repo.get_product_reviews(db, product_id, skip, limit)
# Получаем рейтинг продукта
rating = review_repo.get_product_rating(db, product_id)
return {
"reviews": reviews,
"rating": rating,
"total": rating["total_reviews"],
"skip": skip,
"limit": limit
}
# Сервисы информационных страниц
def create_page(db: Session, page: PageCreate) -> Dict[str, Any]:
new_page = content_repo.create_page(db, page)
return {"page": new_page}
def update_page(db: Session, page_id: int, page: PageUpdate) -> Dict[str, Any]:
updated_page = content_repo.update_page(db, page_id, page)
return {"page": updated_page}
def delete_page(db: Session, page_id: int) -> Dict[str, Any]:
success = content_repo.delete_page(db, page_id)
return {"success": success}
def get_page_by_slug(db: Session, slug: str) -> Dict[str, Any]:
page = content_repo.get_page_by_slug(db, slug)
if not page:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Страница не найдена"
)
return {"page": page}
# Сервисы аналитики
def log_event(db: Session, log: AnalyticsLogCreate) -> Dict[str, Any]:
new_log = content_repo.log_analytics_event(db, log)
return {"log": new_log}
def get_analytics_report(
db: Session,
period: str = "day",
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
report = content_repo.get_analytics_report(db, period, start_date, end_date)
return {"report": report}

View File

@ -0,0 +1,25 @@
from app.services.user_service import (
register_user, login_user, get_user_profile, update_user_profile,
add_user_address, update_user_address, delete_user_address
)
from app.services.catalog_service import (
create_category, update_category, delete_category, get_category_tree,
create_product, update_product, delete_product, get_product_details,
add_product_variant, update_product_variant, delete_product_variant,
upload_product_image, update_product_image, delete_product_image
)
from app.services.order_service import (
add_to_cart, update_cart_item, remove_from_cart, clear_cart, get_cart,
create_order, get_order, update_order, cancel_order
)
from app.services.review_service import (
create_review, update_review, delete_review, approve_review, get_product_reviews
)
from app.services.content_service import (
create_page, update_page, delete_page, get_page_by_slug,
log_event, get_analytics_report
)

View File

@ -0,0 +1,203 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException, status, UploadFile
from typing import List, Dict, Any
import os
import uuid
import shutil
from pathlib import Path
from app.config import settings
from app.repositories import catalog_repo, review_repo
from app.schemas.catalog_schemas import (
CategoryCreate, CategoryUpdate,
ProductCreate, ProductUpdate,
ProductVariantCreate, ProductVariantUpdate,
ProductImageCreate, ProductImageUpdate
)
# Сервисы каталога
def create_category(db: Session, category: CategoryCreate) -> Dict[str, Any]:
new_category = catalog_repo.create_category(db, category)
return {"category": new_category}
def update_category(db: Session, category_id: int, category: CategoryUpdate) -> Dict[str, Any]:
updated_category = catalog_repo.update_category(db, category_id, category)
return {"category": updated_category}
def delete_category(db: Session, category_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_category(db, category_id)
return {"success": success}
def get_category_tree(db: Session) -> List[Dict[str, Any]]:
# Получаем все категории верхнего уровня
root_categories = catalog_repo.get_categories(db, parent_id=None)
result = []
for category in root_categories:
# Рекурсивно получаем подкатегории
category_dict = {
"id": category.id,
"name": category.name,
"slug": category.slug,
"description": category.description,
"is_active": category.is_active,
"subcategories": _get_subcategories(db, category.id)
}
result.append(category_dict)
return result
def _get_subcategories(db: Session, parent_id: int) -> List[Dict[str, Any]]:
subcategories = catalog_repo.get_categories(db, parent_id=parent_id)
result = []
for category in subcategories:
category_dict = {
"id": category.id,
"name": category.name,
"slug": category.slug,
"description": category.description,
"is_active": category.is_active,
"subcategories": _get_subcategories(db, category.id)
}
result.append(category_dict)
return result
def create_product(db: Session, product: ProductCreate) -> Dict[str, Any]:
new_product = catalog_repo.create_product(db, product)
return {"product": new_product}
def update_product(db: Session, product_id: int, product: ProductUpdate) -> Dict[str, Any]:
updated_product = catalog_repo.update_product(db, product_id, product)
return {"product": updated_product}
def delete_product(db: Session, product_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_product(db, product_id)
return {"success": success}
def get_product_details(db: Session, product_id: int) -> Dict[str, Any]:
product = catalog_repo.get_product(db, product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Продукт не найден"
)
# Получаем варианты продукта
variants = catalog_repo.get_product_variants(db, product_id)
# Получаем изображения продукта
images = catalog_repo.get_product_images(db, product_id)
# Получаем рейтинг продукта
rating = review_repo.get_product_rating(db, product_id)
# Получаем отзывы продукта
reviews = review_repo.get_product_reviews(db, product_id, limit=5)
return {
"product": product,
"variants": variants,
"images": images,
"rating": rating,
"reviews": reviews
}
def add_product_variant(db: Session, variant: ProductVariantCreate) -> Dict[str, Any]:
new_variant = catalog_repo.create_product_variant(db, variant)
return {"variant": new_variant}
def update_product_variant(db: Session, variant_id: int, variant: ProductVariantUpdate) -> Dict[str, Any]:
updated_variant = catalog_repo.update_product_variant(db, variant_id, variant)
return {"variant": updated_variant}
def delete_product_variant(db: Session, variant_id: int) -> Dict[str, Any]:
success = catalog_repo.delete_product_variant(db, variant_id)
return {"success": success}
def upload_product_image(db: Session, product_id: int, file: UploadFile, is_primary: bool = False) -> Dict[str, Any]:
# Проверяем, что продукт существует
product = catalog_repo.get_product(db, product_id)
if not product:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Продукт не найден"
)
# Проверяем расширение файла
file_extension = file.filename.split(".")[-1].lower()
if file_extension not in settings.ALLOWED_UPLOAD_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Неподдерживаемый формат файла. Разрешены: {', '.join(settings.ALLOWED_UPLOAD_EXTENSIONS)}"
)
# Создаем директорию для загрузок, если она не существует
upload_dir = Path(settings.UPLOAD_DIRECTORY) / "products" / str(product_id)
upload_dir.mkdir(parents=True, exist_ok=True)
# Генерируем уникальное имя файла
unique_filename = f"{uuid.uuid4()}.{file_extension}"
file_path = upload_dir / unique_filename
# Сохраняем файл
with file_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# Создаем запись об изображении в БД
image_data = ProductImageCreate(
product_id=product_id,
image_url=f"/uploads/products/{product_id}/{unique_filename}",
alt_text=file.filename,
is_primary=is_primary
)
new_image = catalog_repo.create_product_image(db, image_data)
return {"image": new_image}
def update_product_image(db: Session, image_id: int, is_primary: bool) -> Dict[str, Any]:
updated_image = catalog_repo.update_product_image(db, image_id, is_primary)
return {"image": updated_image}
def delete_product_image(db: Session, image_id: int) -> Dict[str, Any]:
# Получаем информацию об изображении перед удалением
image = catalog_repo.get_product_image(db, image_id)
if not image:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Изображение не найдено"
)
# Удаляем запись из БД
success = catalog_repo.delete_product_image(db, image_id)
# Удаляем файл с диска
if success:
try:
# Получаем путь к файлу из URL
file_path = Path(settings.UPLOAD_DIRECTORY) / image.image_url.lstrip("/uploads/")
if file_path.exists():
file_path.unlink()
except Exception:
# Если не удалось удалить файл, просто логируем ошибку
# В реальном приложении здесь должно быть логирование
pass
return {"success": success}

View File

@ -0,0 +1,50 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from typing import Dict, Any, Optional
from datetime import datetime
from app.repositories import content_repo
from app.schemas.content_schemas import PageCreate, PageUpdate, AnalyticsLogCreate
# Сервисы информационных страниц
def create_page(db: Session, page: PageCreate) -> Dict[str, Any]:
new_page = content_repo.create_page(db, page)
return {"page": new_page}
def update_page(db: Session, page_id: int, page: PageUpdate) -> Dict[str, Any]:
updated_page = content_repo.update_page(db, page_id, page)
return {"page": updated_page}
def delete_page(db: Session, page_id: int) -> Dict[str, Any]:
success = content_repo.delete_page(db, page_id)
return {"success": success}
def get_page_by_slug(db: Session, slug: str) -> Dict[str, Any]:
page = content_repo.get_page_by_slug(db, slug)
if not page:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Страница не найдена"
)
return {"page": page}
# Сервисы аналитики
def log_event(db: Session, log: AnalyticsLogCreate) -> Dict[str, Any]:
new_log = content_repo.log_analytics_event(db, log)
return {"log": new_log}
def get_analytics_report(
db: Session,
period: str = "day",
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
report = content_repo.get_analytics_report(db, period, start_date, end_date)
return {"report": report}

View File

@ -0,0 +1,107 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from typing import Dict, Any
from app.repositories import order_repo, content_repo
from app.schemas.order_schemas import CartItemCreate, CartItemUpdate, OrderCreate, OrderUpdate
from app.schemas.content_schemas import AnalyticsLogCreate
# Сервисы корзины и заказов
def add_to_cart(db: Session, user_id: int, cart_item: CartItemCreate) -> Dict[str, Any]:
new_cart_item = order_repo.create_cart_item(db, cart_item, user_id)
# Логируем событие добавления в корзину
log_data = AnalyticsLogCreate(
user_id=user_id,
event_type="add_to_cart",
product_id=new_cart_item.variant.product_id,
additional_data={"quantity": cart_item.quantity}
)
content_repo.log_analytics_event(db, log_data)
return {"cart_item": new_cart_item}
def update_cart_item(db: Session, user_id: int, cart_item_id: int, cart_item: CartItemUpdate) -> Dict[str, Any]:
updated_cart_item = order_repo.update_cart_item(db, cart_item_id, cart_item, user_id)
return {"cart_item": updated_cart_item}
def remove_from_cart(db: Session, user_id: int, cart_item_id: int) -> Dict[str, Any]:
success = order_repo.delete_cart_item(db, cart_item_id, user_id)
return {"success": success}
def clear_cart(db: Session, user_id: int) -> Dict[str, Any]:
success = order_repo.clear_cart(db, user_id)
return {"success": success}
def get_cart(db: Session, user_id: int) -> Dict[str, Any]:
cart_items = order_repo.get_cart_with_product_details(db, user_id)
# Рассчитываем общую сумму корзины
total_amount = sum(item["total_price"] for item in cart_items)
return {
"items": cart_items,
"total_amount": total_amount,
"items_count": len(cart_items)
}
def create_order(db: Session, user_id: int, order: OrderCreate) -> Dict[str, Any]:
new_order = order_repo.create_order(db, order, user_id)
# Логируем событие создания заказа
log_data = AnalyticsLogCreate(
user_id=user_id,
event_type="order_created",
additional_data={"order_id": new_order.id, "total_amount": new_order.total_amount}
)
content_repo.log_analytics_event(db, log_data)
return {"order": new_order}
def get_order(db: Session, user_id: int, order_id: int, is_admin: bool = False) -> Dict[str, Any]:
# Получаем заказ с деталями
order_details = order_repo.get_order_with_details(db, order_id)
# Проверяем права доступа
if not is_admin and order_details["user_id"] != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для просмотра этого заказа"
)
return {"order": order_details}
def update_order(db: Session, user_id: int, order_id: int, order: OrderUpdate, is_admin: bool = False) -> Dict[str, Any]:
updated_order = order_repo.update_order(db, order_id, order, is_admin)
# Проверяем права доступа
if not is_admin and updated_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для обновления этого заказа"
)
return {"order": updated_order}
def cancel_order(db: Session, user_id: int, order_id: int) -> Dict[str, Any]:
# Отменяем заказ (обычный пользователь может только отменить заказ)
order_update = OrderUpdate(status="cancelled")
updated_order = order_repo.update_order(db, order_id, order_update, is_admin=False)
# Проверяем права доступа
if updated_order.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав для отмены этого заказа"
)
return {"order": updated_order}

View File

@ -0,0 +1,41 @@
from sqlalchemy.orm import Session
from typing import Dict, Any
from app.repositories import review_repo
from app.schemas.review_schemas import ReviewCreate, ReviewUpdate
# Сервисы отзывов
def create_review(db: Session, user_id: int, review: ReviewCreate) -> Dict[str, Any]:
new_review = review_repo.create_review(db, review, user_id)
return {"review": new_review}
def update_review(db: Session, user_id: int, review_id: int, review: ReviewUpdate, is_admin: bool = False) -> Dict[str, Any]:
updated_review = review_repo.update_review(db, review_id, review, user_id, is_admin)
return {"review": updated_review}
def delete_review(db: Session, user_id: int, review_id: int, is_admin: bool = False) -> Dict[str, Any]:
success = review_repo.delete_review(db, review_id, user_id, is_admin)
return {"success": success}
def approve_review(db: Session, review_id: int) -> Dict[str, Any]:
approved_review = review_repo.approve_review(db, review_id)
return {"review": approved_review}
def get_product_reviews(db: Session, product_id: int, skip: int = 0, limit: int = 10) -> Dict[str, Any]:
reviews = review_repo.get_product_reviews(db, product_id, skip, limit)
# Получаем рейтинг продукта
rating = review_repo.get_product_rating(db, product_id)
return {
"reviews": reviews,
"rating": rating,
"total": rating["total_reviews"],
"skip": skip,
"limit": limit
}

View File

@ -0,0 +1,101 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from typing import Dict, Any
from datetime import timedelta
from app.config import settings
from app.core import create_access_token
from app.repositories import user_repo, order_repo, review_repo
from app.schemas.user_schemas import UserCreate, UserUpdate, AddressCreate, AddressUpdate
# Сервисы аутентификации и пользователей
def register_user(db: Session, user: UserCreate) -> Dict[str, Any]:
# Создаем пользователя
db_user = user_repo.create_user(db, user)
# Создаем токен доступа
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": db_user.email}, expires_delta=access_token_expires
)
return {
"user": db_user,
"access_token": access_token,
"token_type": "bearer"
}
def login_user(db: Session, email: str, password: str) -> Dict[str, Any]:
# Аутентифицируем пользователя
user = user_repo.authenticate_user(db, email, password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный email или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверяем, что пользователь активен
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неактивный пользователь"
)
# Создаем токен доступа
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer"
}
def get_user_profile(db: Session, user_id: int) -> Dict[str, Any]:
user = user_repo.get_user(db, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Пользователь не найден"
)
# Получаем адреса пользователя
addresses = user_repo.get_user_addresses(db, user_id)
# Получаем заказы пользователя
orders = order_repo.get_user_orders(db, user_id)
# Получаем отзывы пользователя
reviews = review_repo.get_user_reviews(db, user_id)
return {
"user": user,
"addresses": addresses,
"orders": orders,
"reviews": reviews
}
def update_user_profile(db: Session, user_id: int, user_data: UserUpdate) -> Dict[str, Any]:
updated_user = user_repo.update_user(db, user_id, user_data)
return {"user": updated_user}
def add_user_address(db: Session, user_id: int, address: AddressCreate) -> Dict[str, Any]:
new_address = user_repo.create_address(db, address, user_id)
return {"address": new_address}
def update_user_address(db: Session, user_id: int, address_id: int, address: AddressUpdate) -> Dict[str, Any]:
updated_address = user_repo.update_address(db, address_id, address, user_id)
return {"address": updated_address}
def delete_user_address(db: Session, user_id: int, address_id: int) -> Dict[str, Any]:
success = user_repo.delete_address(db, address_id, user_id)
return {"success": success}