globalsy_mvp/models.py
belikovme 391757d581 Рефакторинг конфигурации и структуры проекта
- Обновлен config.py с оптимизированными словарями секторов и индексов
- Удалены устаревшие классы exchange.py и moex_class.py
- Модернизирован moex_history.py с улучшенной логикой получения данных
- Обновлен requirements.txt с современными зависимостями для финансовой платформы
- Упрощен open_router.ipynb с фокусом на экономических темах
2025-03-12 17:01:25 +07:00

383 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
from datetime import datetime, timedelta
import pandas as pd
import logging
from typing import List, Dict, Any, Optional
import os
# Для работы с OpenAI API
from openai import OpenAI
# Импортируем конфигурацию
from config import sector_indices, sector_tickers, DB_CONFIG, OPENAI_CONFIG
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("models")
class FinancialDataManager:
"""Класс для управления финансовыми данными"""
def __init__(self):
"""Инициализация менеджера финансовых данных"""
self.db_path = f"data/{DB_CONFIG['financial_data']}"
def _get_connection(self):
"""
Создает и возвращает соединение с базой данных.
Returns:
sqlite3.Connection: Объект соединения с базой данных
"""
return sqlite3.connect(self.db_path)
def get_sector_data(self, sector: str, start_date: str, end_date: str) -> List[Dict[str, Any]]:
"""
Получение данных по сектору за указанный период
Args:
sector: Название сектора
start_date: Начальная дата в формате 'YYYY-MM-DD'
end_date: Конечная дата в формате 'YYYY-MM-DD'
Returns:
List[Dict[str, Any]]: Данные по сектору
"""
query = f"""
SELECT * FROM sector_indices
WHERE sector = ? AND date BETWEEN ? AND ?
ORDER BY date
"""
try:
with self._get_connection() as conn:
df = pd.read_sql_query(query, conn, params=(sector, start_date, end_date))
if not df.empty:
# Преобразуем даты в строки для JSON
df['date'] = df['date'].astype(str)
return df.to_dict(orient='records')
return []
except Exception as e:
logger.error(f"Ошибка при получении данных сектора {sector}: {str(e)}")
return []
def get_ticker_data(self, ticker: str, start_date: str, end_date: str) -> List[Dict[str, Any]]:
"""
Получение данных по тикеру за указанный период
Args:
ticker: Тикер ценной бумаги
start_date: Начальная дата в формате 'YYYY-MM-DD'
end_date: Конечная дата в формате 'YYYY-MM-DD'
Returns:
List[Dict[str, Any]]: Данные по тикеру
"""
query = f"""
SELECT * FROM ticker_data
WHERE ticker = ? AND date BETWEEN ? AND ?
ORDER BY date
"""
try:
with self._get_connection() as conn:
df = pd.read_sql_query(query, conn, params=(ticker, start_date, end_date))
if not df.empty:
# Преобразуем даты в строки для JSON
df['date'] = df['date'].astype(str)
return df.to_dict(orient='records')
return []
except Exception as e:
logger.error(f"Ошибка при получении данных тикера {ticker}: {str(e)}")
return []
def get_all_sectors(self) -> List[str]:
"""
Получение списка всех доступных секторов
Returns:
List[str]: Список секторов
"""
return list(sector_indices.keys())
def get_all_tickers(self) -> Dict[str, List[str]]:
"""
Получение списка всех доступных тикеров, сгруппированных по секторам
Returns:
Dict[str, List[str]]: Словарь с тикерами по секторам
"""
return sector_tickers
class NewsManager:
"""Класс для управления новостями"""
def __init__(self):
"""Инициализация менеджера новостей"""
self.db_path = f"data/{DB_CONFIG['news_data']}"
def _get_connection(self):
"""
Создает и возвращает соединение с базой данных.
Returns:
sqlite3.Connection: Объект соединения с базой данных
"""
return sqlite3.connect(self.db_path)
def get_news_by_date(self, start_date: str, end_date: str = None) -> List[Dict[str, Any]]:
"""
Получение новостей за указанный период
Args:
start_date: Начальная дата в формате 'YYYY-MM-DD'
end_date: Конечная дата в формате 'YYYY-MM-DD' (опционально)
Returns:
List[Dict[str, Any]]: Данные новостей
"""
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
query = """
SELECT * FROM news
WHERE date BETWEEN ? AND ?
ORDER BY date DESC
"""
try:
with self._get_connection() as conn:
df = pd.read_sql_query(query, conn, params=(start_date, end_date))
if not df.empty:
# Преобразуем даты в строки для JSON
df['date'] = df['date'].astype(str)
return df.to_dict(orient='records')
return []
except Exception as e:
logger.error(f"Ошибка при получении новостей: {str(e)}")
return []
def get_news_by_topic(self, topic: str, start_date: str, end_date: str = None) -> List[Dict[str, Any]]:
"""
Получение новостей по теме за указанный период
Args:
topic: Тема новостей
start_date: Начальная дата в формате 'YYYY-MM-DD'
end_date: Конечная дата в формате 'YYYY-MM-DD' (опционально)
Returns:
List[Dict[str, Any]]: Данные новостей
"""
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
query = """
SELECT * FROM news
WHERE topic = ? AND date BETWEEN ? AND ?
ORDER BY date DESC
"""
try:
with self._get_connection() as conn:
df = pd.read_sql_query(query, conn, params=(topic, start_date, end_date))
if not df.empty:
# Преобразуем даты в строки для JSON
df['date'] = df['date'].astype(str)
return df.to_dict(orient='records')
return []
except Exception as e:
logger.error(f"Ошибка при получении новостей по теме {topic}: {str(e)}")
return []
def get_topics(self) -> List[str]:
"""
Получение списка всех доступных тем новостей
Returns:
List[str]: Список тем
"""
query = "SELECT DISTINCT topic FROM news"
try:
with self._get_connection() as conn:
df = pd.read_sql_query(query, conn)
return df['topic'].tolist()
except Exception as e:
logger.error(f"Ошибка при получении списка тем новостей: {str(e)}")
return []
class AnalyticsManager:
"""Класс для управления аналитикой"""
def __init__(self):
"""Инициализация менеджера аналитики"""
self.db_path = f"data/{DB_CONFIG['analytics']}"
self.news_manager = NewsManager()
def _get_connection(self):
"""
Создает и возвращает соединение с базой данных.
Returns:
sqlite3.Connection: Объект соединения с базой данных
"""
return sqlite3.connect(self.db_path)
def analyze_news(self, news: List[Dict[str, Any]]) -> bool:
"""
Анализирует новости и сохраняет результаты в базу данных
Args:
news: Список новостей для анализа
Returns:
bool: Успешность операции
"""
if not news:
logger.warning("Нет новостей для анализа")
return False
# Собираем контент новостей для анализа
news_content = [item['content'] for item in news if 'content' in item]
if not news_content:
logger.warning("Нет содержимого новостей для анализа")
return False
# Подготавливаем промпт для анализа
prompt = """
Проанализируйте следующие новости и предоставьте:
1. Краткую сводку основных событий
2. Выявленные бизнес-проблемы
3. Возможные решения для этих проблем
4. Общую оценку настроения новостей (от -1 до 1, где -1 - крайне негативное, 1 - крайне позитивное)
Формат ответа:
{
"summary": "Краткая сводка основных событий...",
"business_problems": "Проблема 1... Проблема 2...",
"solutions": "Решение 1... Решение 2...",
"sentiment": 0.5
}
"""
try:
# Создаем клиент для OpenAI API
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", ""))
# Преобразуем массив новостей в строку
news_text = "\n\n".join(news_content)
# Отправляем запрос к API
completion = client.chat.completions.create(
model=OPENAI_CONFIG['model'],
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": news_text}
],
temperature=OPENAI_CONFIG['temperature'],
max_tokens=OPENAI_CONFIG['max_tokens'],
response_format={"type": "json_object"}
)
# Получаем результат анализа
analysis_result = completion.choices[0].message.content
# Парсим JSON-ответ
import json
result = json.loads(analysis_result)
# Создаем запись в базе данных
today = datetime.now().date().strftime("%Y-%m-%d")
with self._get_connection() as conn:
# Проверяем, есть ли уже анализ за сегодня
cursor = conn.cursor()
cursor.execute("SELECT id FROM analytics WHERE date = ?", (today,))
existing = cursor.fetchone()
if existing:
# Обновляем существующий анализ
cursor.execute(
"""
UPDATE analytics
SET summary = ?, business_problems = ?, solutions = ?, sentiment = ?
WHERE date = ?
""",
(
result.get("summary", ""),
result.get("business_problems", ""),
result.get("solutions", ""),
result.get("sentiment", 0),
today
)
)
logger.info(f"Анализ за {today} обновлен")
else:
# Создаем новый анализ
cursor.execute(
"""
INSERT INTO analytics (date, summary, business_problems, solutions, sentiment)
VALUES (?, ?, ?, ?, ?)
""",
(
today,
result.get("summary", ""),
result.get("business_problems", ""),
result.get("solutions", ""),
result.get("sentiment", 0)
)
)
logger.info(f"Анализ за {today} сохранен")
conn.commit()
return True
except Exception as e:
logger.error(f"Ошибка при анализе новостей: {str(e)}")
return False
def get_analytics_by_date(self, start_date: str, end_date: str = None) -> List[Dict[str, Any]]:
"""
Получение аналитики за указанный период
Args:
start_date: Начальная дата в формате 'YYYY-MM-DD'
end_date: Конечная дата в формате 'YYYY-MM-DD' (опционально)
Returns:
List[Dict[str, Any]]: Данные аналитики
"""
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
query = """
SELECT * FROM analytics
WHERE date BETWEEN ? AND ?
ORDER BY date DESC
"""
try:
with self._get_connection() as conn:
df = pd.read_sql_query(query, conn, params=(start_date, end_date))
if not df.empty:
# Преобразуем даты в строки для JSON
df['date'] = df['date'].astype(str)
return df.to_dict(orient='records')
return []
except Exception as e:
logger.error(f"Ошибка при получении аналитики: {str(e)}")
return []