globalsy_mvp/data_collector.py
belikovme 391757d581 Рефакторинг конфигурации и структуры проекта
- Обновлен config.py с оптимизированными словарями секторов и индексов
- Удалены устаревшие классы exchange.py и moex_class.py
- Модернизирован moex_history.py с улучшенной логикой получения данных
- Обновлен requirements.txt с современными зависимостями для финансовой платформы
- Упрощен open_router.ipynb с фокусом на экономических темах
2025-03-12 17:01:25 +07:00

356 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import asyncio
import sqlite3
import os
import schedule
import time
from datetime import datetime, timedelta
import pandas as pd
import logging
from models import FinancialDataManager, NewsManager, AnalyticsManager
from classes.moex_history import MOEXHistoricalData
from news_parser import NewsParser
from config import sector_tickers, sector_indices, DB_CONFIG, DATA_COLLECTION
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("data_collector.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("data_collector")
class DataCollector:
"""Класс для сбора и обновления данных"""
def __init__(self):
"""Инициализация сборщика данных"""
self.moex_data = MOEXHistoricalData()
self.news_parser = NewsParser()
self.financial_manager = FinancialDataManager()
self.news_manager = NewsManager()
self.analytics_manager = AnalyticsManager()
# Создаем директории для баз данных, если они не существуют
os.makedirs('data', exist_ok=True)
def _init_databases(self):
"""Инициализация баз данных"""
# Инициализация базы данных для финансовых данных
conn = sqlite3.connect(f"data/{DB_CONFIG['financial_data']}")
cursor = conn.cursor()
# Создаем таблицу для секторных индексов
cursor.execute('''
CREATE TABLE IF NOT EXISTS sector_indices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sector TEXT NOT NULL,
date DATE NOT NULL,
open REAL,
high REAL,
low REAL,
close REAL,
volume REAL,
UNIQUE(sector, date)
)
''')
# Создаем таблицу для данных по тикерам
cursor.execute('''
CREATE TABLE IF NOT EXISTS ticker_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
sector TEXT NOT NULL,
date DATE NOT NULL,
open REAL,
high REAL,
low REAL,
close REAL,
volume REAL,
UNIQUE(ticker, date)
)
''')
conn.commit()
conn.close()
# Инициализация базы данных для новостей
conn = sqlite3.connect(f"data/{DB_CONFIG['news_data']}")
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date DATE NOT NULL,
topic TEXT,
title TEXT NOT NULL,
content TEXT NOT NULL,
url TEXT,
UNIQUE(title, date)
)
''')
conn.commit()
conn.close()
# Инициализация базы данных для аналитики
conn = sqlite3.connect(f"data/{DB_CONFIG['analytics']}")
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS analytics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date DATE NOT NULL,
summary TEXT NOT NULL,
business_problems TEXT,
solutions TEXT,
sentiment REAL,
UNIQUE(date)
)
''')
conn.commit()
conn.close()
logger.info("Базы данных успешно инициализированы")
async def _collect_sector_data(self, days=30):
"""Сбор данных по секторным индексам"""
logger.info("Начинаем сбор данных по секторным индексам")
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# Подключаемся к базе данных
conn = sqlite3.connect(f"data/{DB_CONFIG['financial_data']}")
# Собираем данные по каждому сектору
for sector, index_code in sector_indices.items():
try:
logger.info(f"Получаем данные для сектора: {sector}")
# Получаем данные по индексу
df = await self.moex_data.get_official_sector_index(
sector,
start_date.strftime('%Y-%m-%d'),
end_date.strftime('%Y-%m-%d')
)
if not df.empty:
# Добавляем колонку с названием сектора
df['sector'] = sector
# Сохраняем данные в базу
df.to_sql('sector_indices', conn, if_exists='append', index=False,
dtype={
'sector': 'TEXT',
'TRADEDATE': 'DATE',
'OPEN': 'REAL',
'HIGH': 'REAL',
'LOW': 'REAL',
'CLOSE': 'REAL',
'VOLUME': 'REAL'
})
logger.info(f"Сохранено {len(df)} записей для сектора {sector}")
else:
logger.warning(f"Нет данных для сектора {sector}")
except Exception as e:
logger.error(f"Ошибка при сборе данных для сектора {sector}: {str(e)}")
conn.close()
logger.info("Сбор данных по секторным индексам завершен")
async def _collect_ticker_data(self, days=30):
"""Сбор данных по отдельным тикерам"""
logger.info("Начинаем сбор данных по тикерам")
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# Подключаемся к базе данных
conn = sqlite3.connect(f"data/{DB_CONFIG['financial_data']}")
# Собираем данные по каждому тикеру в каждом секторе
for sector, tickers in sector_tickers.items():
for ticker in tickers:
try:
logger.info(f"Получаем данные для тикера: {ticker} (сектор: {sector})")
# Получаем данные по тикеру
df = await self.moex_data.get_security_history(
ticker,
start_date.strftime('%Y-%m-%d'),
end_date.strftime('%Y-%m-%d')
)
if not df.empty:
# Добавляем колонки с тикером и сектором
df['ticker'] = ticker
df['sector'] = sector
# Сохраняем данные в базу
df.to_sql('ticker_data', conn, if_exists='append', index=False,
dtype={
'ticker': 'TEXT',
'sector': 'TEXT',
'TRADEDATE': 'DATE',
'OPEN': 'REAL',
'HIGH': 'REAL',
'LOW': 'REAL',
'CLOSE': 'REAL',
'VOLUME': 'REAL'
})
logger.info(f"Сохранено {len(df)} записей для тикера {ticker}")
else:
logger.warning(f"Нет данных для тикера {ticker}")
except Exception as e:
logger.error(f"Ошибка при сборе данных для тикера {ticker}: {str(e)}")
conn.close()
logger.info("Сбор данных по тикерам завершен")
def _collect_news(self, days=7):
"""Сбор новостей"""
logger.info(f"Начинаем сбор новостей за последние {days} дней")
try:
# Получаем новости
news_df = self.news_parser.parse_news(days)
if not news_df.empty:
# Подключаемся к базе данных
conn = sqlite3.connect(f"data/{DB_CONFIG['news_data']}")
# Сохраняем новости в базу
news_df.to_sql('news', conn, if_exists='append', index=False,
dtype={
'date': 'DATE',
'topic': 'TEXT',
'title': 'TEXT',
'content': 'TEXT',
'url': 'TEXT'
})
conn.close()
logger.info(f"Сохранено {len(news_df)} новостей")
else:
logger.warning("Нет новых новостей для сохранения")
except Exception as e:
logger.error(f"Ошибка при сборе новостей: {str(e)}")
def _analyze_news(self):
"""Анализ новостей"""
logger.info("Начинаем анализ новостей")
try:
# Получаем новости за последние 24 часа
yesterday = datetime.now() - timedelta(days=1)
news = self.news_manager.get_news_by_date(yesterday.strftime('%Y-%m-%d'))
if news:
# Анализируем новости
analysis_result = self.analytics_manager.analyze_news(news)
if analysis_result:
logger.info("Анализ новостей успешно завершен")
else:
logger.warning("Не удалось выполнить анализ новостей")
else:
logger.warning("Нет новостей для анализа")
except Exception as e:
logger.error(f"Ошибка при анализе новостей: {str(e)}")
async def collect_initial_data(self):
"""Сбор начальных данных при первом запуске"""
logger.info("Начинаем сбор начальных данных")
# Инициализируем базы данных
self._init_databases()
# Собираем финансовые данные за последние 90 дней
await self._collect_sector_data(days=90)
await self._collect_ticker_data(days=90)
# Собираем новости за последние 14 дней
self._collect_news(days=14)
# Анализируем новости
self._analyze_news()
logger.info("Сбор начальных данных завершен")
async def update_data(self):
"""Обновление данных по расписанию"""
logger.info("Начинаем обновление данных")
# Обновляем финансовые данные за последние 2 дня
await self._collect_sector_data(days=2)
await self._collect_ticker_data(days=2)
# Обновляем новости за последний день
self._collect_news(days=1)
logger.info("Обновление данных завершено")
def schedule_tasks(self):
"""Настройка расписания для регулярного обновления данных"""
logger.info("Настраиваем расписание для обновления данных")
# Обновление финансовых данных
schedule.every(DATA_COLLECTION['financial_interval_hours']).hours.do(
lambda: asyncio.run(self.update_data())
)
# Обновление новостей
schedule.every(DATA_COLLECTION['news_interval_hours']).hours.do(
lambda: self._collect_news(days=1)
)
# Анализ новостей
schedule.every(DATA_COLLECTION['analytics_interval_hours']).hours.do(
lambda: self._analyze_news()
)
logger.info("Расписание настроено")
# Запускаем бесконечный цикл для выполнения задач по расписанию
while True:
schedule.run_pending()
time.sleep(60)
async def main():
"""Основная функция"""
parser = argparse.ArgumentParser(description='Сборщик данных для финансово-аналитической платформы')
parser.add_argument('--init', action='store_true', help='Инициализировать базы данных и собрать начальные данные')
parser.add_argument('--update', action='store_true', help='Обновить данные')
parser.add_argument('--schedule', action='store_true', help='Запустить сбор данных по расписанию')
args = parser.parse_args()
collector = DataCollector()
if args.init:
await collector.collect_initial_data()
elif args.update:
await collector.update_data()
elif args.schedule:
collector.schedule_tasks()
else:
parser.print_help()
if __name__ == "__main__":
asyncio.run(main())