import sqlite3 from datetime import datetime, timedelta import pandas as pd import logging from typing import List, Dict, Any, Optional import os # Для работы с OpenAI API from openai import OpenAI # Импортируем конфигурацию from config import sector_indices, sector_tickers, DB_CONFIG, OPENAI_CONFIG # Настройка логирования logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("models") class FinancialDataManager: """Класс для управления финансовыми данными""" def __init__(self): """Инициализация менеджера финансовых данных""" self.db_path = f"data/{DB_CONFIG['financial_data']}" def _get_connection(self): """ Создает и возвращает соединение с базой данных. Returns: sqlite3.Connection: Объект соединения с базой данных """ return sqlite3.connect(self.db_path) def get_sector_data(self, sector: str, start_date: str, end_date: str) -> List[Dict[str, Any]]: """ Получение данных по сектору за указанный период Args: sector: Название сектора start_date: Начальная дата в формате 'YYYY-MM-DD' end_date: Конечная дата в формате 'YYYY-MM-DD' Returns: List[Dict[str, Any]]: Данные по сектору """ query = f""" SELECT * FROM sector_indices WHERE sector = ? AND date BETWEEN ? AND ? ORDER BY date """ try: with self._get_connection() as conn: df = pd.read_sql_query(query, conn, params=(sector, start_date, end_date)) if not df.empty: # Преобразуем даты в строки для JSON df['date'] = df['date'].astype(str) return df.to_dict(orient='records') return [] except Exception as e: logger.error(f"Ошибка при получении данных сектора {sector}: {str(e)}") return [] def get_ticker_data(self, ticker: str, start_date: str, end_date: str) -> List[Dict[str, Any]]: """ Получение данных по тикеру за указанный период Args: ticker: Тикер ценной бумаги start_date: Начальная дата в формате 'YYYY-MM-DD' end_date: Конечная дата в формате 'YYYY-MM-DD' Returns: List[Dict[str, Any]]: Данные по тикеру """ query = f""" SELECT * FROM ticker_data WHERE ticker = ? AND date BETWEEN ? AND ? ORDER BY date """ try: with self._get_connection() as conn: df = pd.read_sql_query(query, conn, params=(ticker, start_date, end_date)) if not df.empty: # Преобразуем даты в строки для JSON df['date'] = df['date'].astype(str) return df.to_dict(orient='records') return [] except Exception as e: logger.error(f"Ошибка при получении данных тикера {ticker}: {str(e)}") return [] def get_all_sectors(self) -> List[str]: """ Получение списка всех доступных секторов Returns: List[str]: Список секторов """ return list(sector_indices.keys()) def get_all_tickers(self) -> Dict[str, List[str]]: """ Получение списка всех доступных тикеров, сгруппированных по секторам Returns: Dict[str, List[str]]: Словарь с тикерами по секторам """ return sector_tickers class NewsManager: """Класс для управления новостями""" def __init__(self): """Инициализация менеджера новостей""" self.db_path = f"data/{DB_CONFIG['news_data']}" def _get_connection(self): """ Создает и возвращает соединение с базой данных. Returns: sqlite3.Connection: Объект соединения с базой данных """ return sqlite3.connect(self.db_path) def get_news_by_date(self, start_date: str, end_date: str = None) -> List[Dict[str, Any]]: """ Получение новостей за указанный период Args: start_date: Начальная дата в формате 'YYYY-MM-DD' end_date: Конечная дата в формате 'YYYY-MM-DD' (опционально) Returns: List[Dict[str, Any]]: Данные новостей """ if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") query = """ SELECT * FROM news WHERE date BETWEEN ? AND ? ORDER BY date DESC """ try: with self._get_connection() as conn: df = pd.read_sql_query(query, conn, params=(start_date, end_date)) if not df.empty: # Преобразуем даты в строки для JSON df['date'] = df['date'].astype(str) return df.to_dict(orient='records') return [] except Exception as e: logger.error(f"Ошибка при получении новостей: {str(e)}") return [] def get_news_by_topic(self, topic: str, start_date: str, end_date: str = None) -> List[Dict[str, Any]]: """ Получение новостей по теме за указанный период Args: topic: Тема новостей start_date: Начальная дата в формате 'YYYY-MM-DD' end_date: Конечная дата в формате 'YYYY-MM-DD' (опционально) Returns: List[Dict[str, Any]]: Данные новостей """ if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") query = """ SELECT * FROM news WHERE topic = ? AND date BETWEEN ? AND ? ORDER BY date DESC """ try: with self._get_connection() as conn: df = pd.read_sql_query(query, conn, params=(topic, start_date, end_date)) if not df.empty: # Преобразуем даты в строки для JSON df['date'] = df['date'].astype(str) return df.to_dict(orient='records') return [] except Exception as e: logger.error(f"Ошибка при получении новостей по теме {topic}: {str(e)}") return [] def get_topics(self) -> List[str]: """ Получение списка всех доступных тем новостей Returns: List[str]: Список тем """ query = "SELECT DISTINCT topic FROM news" try: with self._get_connection() as conn: df = pd.read_sql_query(query, conn) return df['topic'].tolist() except Exception as e: logger.error(f"Ошибка при получении списка тем новостей: {str(e)}") return [] class AnalyticsManager: """Класс для управления аналитикой""" def __init__(self): """Инициализация менеджера аналитики""" self.db_path = f"data/{DB_CONFIG['analytics']}" self.news_manager = NewsManager() def _get_connection(self): """ Создает и возвращает соединение с базой данных. Returns: sqlite3.Connection: Объект соединения с базой данных """ return sqlite3.connect(self.db_path) def analyze_news(self, news: List[Dict[str, Any]]) -> bool: """ Анализирует новости и сохраняет результаты в базу данных Args: news: Список новостей для анализа Returns: bool: Успешность операции """ if not news: logger.warning("Нет новостей для анализа") return False # Собираем контент новостей для анализа news_content = [item['content'] for item in news if 'content' in item] if not news_content: logger.warning("Нет содержимого новостей для анализа") return False # Подготавливаем промпт для анализа prompt = """ Проанализируйте следующие новости и предоставьте: 1. Краткую сводку основных событий 2. Выявленные бизнес-проблемы 3. Возможные решения для этих проблем 4. Общую оценку настроения новостей (от -1 до 1, где -1 - крайне негативное, 1 - крайне позитивное) Формат ответа: { "summary": "Краткая сводка основных событий...", "business_problems": "Проблема 1... Проблема 2...", "solutions": "Решение 1... Решение 2...", "sentiment": 0.5 } """ try: # Создаем клиент для OpenAI API client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "")) # Преобразуем массив новостей в строку news_text = "\n\n".join(news_content) # Отправляем запрос к API completion = client.chat.completions.create( model=OPENAI_CONFIG['model'], messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": news_text} ], temperature=OPENAI_CONFIG['temperature'], max_tokens=OPENAI_CONFIG['max_tokens'], response_format={"type": "json_object"} ) # Получаем результат анализа analysis_result = completion.choices[0].message.content # Парсим JSON-ответ import json result = json.loads(analysis_result) # Создаем запись в базе данных today = datetime.now().date().strftime("%Y-%m-%d") with self._get_connection() as conn: # Проверяем, есть ли уже анализ за сегодня cursor = conn.cursor() cursor.execute("SELECT id FROM analytics WHERE date = ?", (today,)) existing = cursor.fetchone() if existing: # Обновляем существующий анализ cursor.execute( """ UPDATE analytics SET summary = ?, business_problems = ?, solutions = ?, sentiment = ? WHERE date = ? """, ( result.get("summary", ""), result.get("business_problems", ""), result.get("solutions", ""), result.get("sentiment", 0), today ) ) logger.info(f"Анализ за {today} обновлен") else: # Создаем новый анализ cursor.execute( """ INSERT INTO analytics (date, summary, business_problems, solutions, sentiment) VALUES (?, ?, ?, ?, ?) """, ( today, result.get("summary", ""), result.get("business_problems", ""), result.get("solutions", ""), result.get("sentiment", 0) ) ) logger.info(f"Анализ за {today} сохранен") conn.commit() return True except Exception as e: logger.error(f"Ошибка при анализе новостей: {str(e)}") return False def get_analytics_by_date(self, start_date: str, end_date: str = None) -> List[Dict[str, Any]]: """ Получение аналитики за указанный период Args: start_date: Начальная дата в формате 'YYYY-MM-DD' end_date: Конечная дата в формате 'YYYY-MM-DD' (опционально) Returns: List[Dict[str, Any]]: Данные аналитики """ if not end_date: end_date = datetime.now().strftime("%Y-%m-%d") query = """ SELECT * FROM analytics WHERE date BETWEEN ? AND ? ORDER BY date DESC """ try: with self._get_connection() as conn: df = pd.read_sql_query(query, conn, params=(start_date, end_date)) if not df.empty: # Преобразуем даты в строки для JSON df['date'] = df['date'].astype(str) return df.to_dict(orient='records') return [] except Exception as e: logger.error(f"Ошибка при получении аналитики: {str(e)}") return []