#!/usr/bin/env python # -*- coding: utf-8 -*- import argparse import asyncio import sqlite3 import os import schedule import time from datetime import datetime, timedelta import pandas as pd import logging from models import FinancialDataManager, NewsManager, AnalyticsManager from classes.moex_history import MOEXHistoricalData from news_parser import NewsParser from config import sector_tickers, sector_indices, DB_CONFIG, DATA_COLLECTION # Настройка логирования logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("data_collector.log"), logging.StreamHandler() ] ) logger = logging.getLogger("data_collector") class DataCollector: """Класс для сбора и обновления данных""" def __init__(self): """Инициализация сборщика данных""" self.moex_data = MOEXHistoricalData() self.news_parser = NewsParser() self.financial_manager = FinancialDataManager() self.news_manager = NewsManager() self.analytics_manager = AnalyticsManager() # Создаем директории для баз данных, если они не существуют os.makedirs('data', exist_ok=True) def _init_databases(self): """Инициализация баз данных""" # Инициализация базы данных для финансовых данных conn = sqlite3.connect(f"data/{DB_CONFIG['financial_data']}") cursor = conn.cursor() # Создаем таблицу для секторных индексов cursor.execute(''' CREATE TABLE IF NOT EXISTS sector_indices ( id INTEGER PRIMARY KEY AUTOINCREMENT, sector TEXT NOT NULL, date DATE NOT NULL, open REAL, high REAL, low REAL, close REAL, volume REAL, UNIQUE(sector, date) ) ''') # Создаем таблицу для данных по тикерам cursor.execute(''' CREATE TABLE IF NOT EXISTS ticker_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, ticker TEXT NOT NULL, sector TEXT NOT NULL, date DATE NOT NULL, open REAL, high REAL, low REAL, close REAL, volume REAL, UNIQUE(ticker, date) ) ''') conn.commit() conn.close() # Инициализация базы данных для новостей conn = sqlite3.connect(f"data/{DB_CONFIG['news_data']}") cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS news ( id INTEGER PRIMARY KEY AUTOINCREMENT, date DATE NOT NULL, topic TEXT, title TEXT NOT NULL, content TEXT NOT NULL, url TEXT, UNIQUE(title, date) ) ''') conn.commit() conn.close() # Инициализация базы данных для аналитики conn = sqlite3.connect(f"data/{DB_CONFIG['analytics']}") cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS analytics ( id INTEGER PRIMARY KEY AUTOINCREMENT, date DATE NOT NULL, summary TEXT NOT NULL, business_problems TEXT, solutions TEXT, sentiment REAL, UNIQUE(date) ) ''') conn.commit() conn.close() logger.info("Базы данных успешно инициализированы") async def _collect_sector_data(self, days=30): """Сбор данных по секторным индексам""" logger.info("Начинаем сбор данных по секторным индексам") end_date = datetime.now() start_date = end_date - timedelta(days=days) # Подключаемся к базе данных conn = sqlite3.connect(f"data/{DB_CONFIG['financial_data']}") # Собираем данные по каждому сектору for sector, index_code in sector_indices.items(): try: logger.info(f"Получаем данные для сектора: {sector}") # Получаем данные по индексу df = await self.moex_data.get_official_sector_index( sector, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d') ) if not df.empty: # Добавляем колонку с названием сектора df['sector'] = sector # Сохраняем данные в базу df.to_sql('sector_indices', conn, if_exists='append', index=False, dtype={ 'sector': 'TEXT', 'TRADEDATE': 'DATE', 'OPEN': 'REAL', 'HIGH': 'REAL', 'LOW': 'REAL', 'CLOSE': 'REAL', 'VOLUME': 'REAL' }) logger.info(f"Сохранено {len(df)} записей для сектора {sector}") else: logger.warning(f"Нет данных для сектора {sector}") except Exception as e: logger.error(f"Ошибка при сборе данных для сектора {sector}: {str(e)}") conn.close() logger.info("Сбор данных по секторным индексам завершен") async def _collect_ticker_data(self, days=30): """Сбор данных по отдельным тикерам""" logger.info("Начинаем сбор данных по тикерам") end_date = datetime.now() start_date = end_date - timedelta(days=days) # Подключаемся к базе данных conn = sqlite3.connect(f"data/{DB_CONFIG['financial_data']}") # Собираем данные по каждому тикеру в каждом секторе for sector, tickers in sector_tickers.items(): for ticker in tickers: try: logger.info(f"Получаем данные для тикера: {ticker} (сектор: {sector})") # Получаем данные по тикеру df = await self.moex_data.get_security_history( ticker, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d') ) if not df.empty: # Добавляем колонки с тикером и сектором df['ticker'] = ticker df['sector'] = sector # Сохраняем данные в базу df.to_sql('ticker_data', conn, if_exists='append', index=False, dtype={ 'ticker': 'TEXT', 'sector': 'TEXT', 'TRADEDATE': 'DATE', 'OPEN': 'REAL', 'HIGH': 'REAL', 'LOW': 'REAL', 'CLOSE': 'REAL', 'VOLUME': 'REAL' }) logger.info(f"Сохранено {len(df)} записей для тикера {ticker}") else: logger.warning(f"Нет данных для тикера {ticker}") except Exception as e: logger.error(f"Ошибка при сборе данных для тикера {ticker}: {str(e)}") conn.close() logger.info("Сбор данных по тикерам завершен") def _collect_news(self, days=7): """Сбор новостей""" logger.info(f"Начинаем сбор новостей за последние {days} дней") try: # Получаем новости news_df = self.news_parser.parse_news(days) if not news_df.empty: # Подключаемся к базе данных conn = sqlite3.connect(f"data/{DB_CONFIG['news_data']}") # Сохраняем новости в базу news_df.to_sql('news', conn, if_exists='append', index=False, dtype={ 'date': 'DATE', 'topic': 'TEXT', 'title': 'TEXT', 'content': 'TEXT', 'url': 'TEXT' }) conn.close() logger.info(f"Сохранено {len(news_df)} новостей") else: logger.warning("Нет новых новостей для сохранения") except Exception as e: logger.error(f"Ошибка при сборе новостей: {str(e)}") def _analyze_news(self): """Анализ новостей""" logger.info("Начинаем анализ новостей") try: # Получаем новости за последние 24 часа yesterday = datetime.now() - timedelta(days=1) news = self.news_manager.get_news_by_date(yesterday.strftime('%Y-%m-%d')) if news: # Анализируем новости analysis_result = self.analytics_manager.analyze_news(news) if analysis_result: logger.info("Анализ новостей успешно завершен") else: logger.warning("Не удалось выполнить анализ новостей") else: logger.warning("Нет новостей для анализа") except Exception as e: logger.error(f"Ошибка при анализе новостей: {str(e)}") async def collect_initial_data(self): """Сбор начальных данных при первом запуске""" logger.info("Начинаем сбор начальных данных") # Инициализируем базы данных self._init_databases() # Собираем финансовые данные за последние 90 дней await self._collect_sector_data(days=90) await self._collect_ticker_data(days=90) # Собираем новости за последние 14 дней self._collect_news(days=14) # Анализируем новости self._analyze_news() logger.info("Сбор начальных данных завершен") async def update_data(self): """Обновление данных по расписанию""" logger.info("Начинаем обновление данных") # Обновляем финансовые данные за последние 2 дня await self._collect_sector_data(days=2) await self._collect_ticker_data(days=2) # Обновляем новости за последний день self._collect_news(days=1) logger.info("Обновление данных завершено") def schedule_tasks(self): """Настройка расписания для регулярного обновления данных""" logger.info("Настраиваем расписание для обновления данных") # Обновление финансовых данных schedule.every(DATA_COLLECTION['financial_interval_hours']).hours.do( lambda: asyncio.run(self.update_data()) ) # Обновление новостей schedule.every(DATA_COLLECTION['news_interval_hours']).hours.do( lambda: self._collect_news(days=1) ) # Анализ новостей schedule.every(DATA_COLLECTION['analytics_interval_hours']).hours.do( lambda: self._analyze_news() ) logger.info("Расписание настроено") # Запускаем бесконечный цикл для выполнения задач по расписанию while True: schedule.run_pending() time.sleep(60) async def main(): """Основная функция""" parser = argparse.ArgumentParser(description='Сборщик данных для финансово-аналитической платформы') parser.add_argument('--init', action='store_true', help='Инициализировать базы данных и собрать начальные данные') parser.add_argument('--update', action='store_true', help='Обновить данные') parser.add_argument('--schedule', action='store_true', help='Запустить сбор данных по расписанию') args = parser.parse_args() collector = DataCollector() if args.init: await collector.collect_initial_data() elif args.update: await collector.update_data() elif args.schedule: collector.schedule_tasks() else: parser.print_help() if __name__ == "__main__": asyncio.run(main())