import pandas as pd import requests from datetime import datetime, timedelta from typing import List, Dict, Optional import asyncio import aiohttp import numpy as np class MOEXHistoricalData: def __init__(self): self.base_url = "https://iss.moex.com/iss" # Словарь соответствия секторов и их индексов на MOEX self.sector_indices = { 'metals_mining': 'MOEXMM', # Индекс Металлов и добычи 'oil_gas': 'MOEXOG', # Индекс Нефти и газа 'chemicals': 'MOEXCH', # Индекс Химии и нефтехимии 'electric_utilities': 'MOEXEU', # Индекс Электроэнергетики 'telecom': 'MOEXTL', # Индекс Телекоммуникаций 'finance': 'MOEXFN', # Индекс Финансов 'consumer': 'MOEXCN', # Индекс Потребительского сектора 'transport': 'MOEXTN' # Индекс Транспорта } async def get_security_history( self, ticker: str, start_date: str, end_date: str, engine: str = "stock", market: str = "shares", board: str = "TQBR" ) -> pd.DataFrame: """ Получение исторических данных по отдельному тикеру Args: ticker: Тикер акции start_date: Начальная дата в формате YYYY-MM-DD end_date: Конечная дата в формате YYYY-MM-DD engine: Торговый движок (по умолчанию stock) market: Рынок (по умолчанию shares) board: Режим торгов (по умолчанию TQBR) Returns: DataFrame с историческими данными """ url = f"{self.base_url}/history/engines/{engine}/markets/{market}/boards/{board}/securities/{ticker}.json" all_data = [] start = 0 async with aiohttp.ClientSession() as session: while True: params = { "from": start_date, "till": end_date, "start": start, "limit": 100 } async with session.get(url, params=params) as response: data = await response.json() # Получаем данные истории history_data = data['history'] if not history_data['data']: break # Добавляем данные в общий список all_data.extend(history_data['data']) start += 100 if len(history_data['data']) < 100: break # Создаем DataFrame df = pd.DataFrame(all_data, columns=history_data['columns']) # Конвертируем даты и числовые значения df['TRADEDATE'] = pd.to_datetime(df['TRADEDATE']) numeric_columns = ['OPEN', 'HIGH', 'LOW', 'CLOSE', 'VALUE', 'VOLUME'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric) return df async def get_sector_history( self, sector_tickers: List[str], start_date: str, end_date: str, engine: str = "stock", market: str = "shares", board: str = "TQBR" ) -> Dict[str, pd.DataFrame]: """ Получение исторических данных по всем тикерам сектора Args: sector_tickers: Список тикеров сектора start_date: Начальная дата в формате YYYY-MM-DD end_date: Конечная дата в формате YYYY-MM-DD engine: Торговый движок (по умолчанию stock) market: Рынок (по умолчанию shares) board: Режим торгов (по умолчанию TQBR) Returns: Словарь {тикер: DataFrame с историческими данными} """ tasks = [] for ticker in sector_tickers: task = self.get_security_history( ticker=ticker, start_date=start_date, end_date=end_date, engine=engine, market=market, board=board ) tasks.append(task) results = await asyncio.gather(*tasks) return dict(zip(sector_tickers, results)) async def get_official_sector_index( self, sector: str, start_date: str, end_date: str ) -> pd.DataFrame: """ Получение официального отраслевого индекса с MOEX Args: sector: Название сектора (ключ из словаря sector_indices) start_date: Начальная дата в формате YYYY-MM-DD end_date: Конечная дата в формате YYYY-MM-DD Returns: DataFrame с данными индекса """ if sector not in self.sector_indices: raise ValueError(f"Неизвестный сектор: {sector}. Доступные секторы: {list(self.sector_indices.keys())}") index_ticker = self.sector_indices[sector] url = f"{self.base_url}/history/engines/stock/markets/index/securities/{index_ticker}.json" all_data = [] start = 0 async with aiohttp.ClientSession() as session: while True: params = { "from": start_date, "till": end_date, "start": start, "limit": 100 } async with session.get(url, params=params) as response: data = await response.json() history_data = data['history'] if not history_data['data']: break all_data.extend(history_data['data']) start += 100 if len(history_data['data']) < 100: break df = pd.DataFrame(all_data, columns=history_data['columns']) # Конвертируем даты и числовые значения df['TRADEDATE'] = pd.to_datetime(df['TRADEDATE']) numeric_columns = ['OPEN', 'HIGH', 'LOW', 'CLOSE', 'VALUE', 'VOLUME'] df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric) return df def calculate_sector_index( self, sector_data: Dict[str, pd.DataFrame], weights: Optional[Dict[str, float]] = None ) -> pd.DataFrame: """ Расчет индекса сектора на основе исторических данных входящих в него компаний Args: sector_data: Словарь с историческими данными по тикерам {тикер: DataFrame} weights: Словарь с весами компаний {тикер: вес}. Если None, веса будут равными Returns: DataFrame с рассчитанным индексом сектора """ # Если веса не указаны, используем равные веса if weights is None: weights = {ticker: 1/len(sector_data) for ticker in sector_data.keys()} # Создаем DataFrame с датами и ценами закрытия для каждого тикера prices_df = pd.DataFrame() for ticker, df in sector_data.items(): prices_df[ticker] = df.set_index('TRADEDATE')['CLOSE'] # Рассчитываем относительное изменение цен returns_df = prices_df.pct_change() # Рассчитываем взвешенную доходность индекса weighted_returns = pd.DataFrame() for ticker in returns_df.columns: weighted_returns[ticker] = returns_df[ticker] * weights[ticker] index_returns = weighted_returns.sum(axis=1) # Рассчитываем значения индекса index_values = (1 + index_returns).cumprod() * 1000 # Начальное значение 1000 # Создаем итоговый DataFrame result_df = pd.DataFrame({ 'INDEX_VALUE': index_values, 'INDEX_RETURN': index_returns }) return result_df