In [1]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from datetime import datetime
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, Tuple
import warnings
warnings.filterwarnings('ignore')
# Set style
sns.set_style('whitegrid')
plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = (15, 7.7)
plt.rcParams['font.size'] = 11
# Configuration
CONFIG = {
'ticker': '^GSPC',
'start_date': '2000-01-01',
'lookforward_window': 20,
'test_size': 100,
'lag_features': [1, 20],
'random_seed': 5678
}
np.random.seed(CONFIG['random_seed'])
def download_stock_data(ticker: str, start_date: str) -> pd.DataFrame:
try:
print(f"Downloading data for {ticker}...")
stock = yf.Ticker(ticker)
hist = stock.history(period="max")
if hist.empty:
raise ValueError(f"No data retrieved for {ticker}")
df = hist[hist.index >= start_date].copy()
print(f"Downloaded {len(df)} records from {df.index.min().date()} to {df.index.max().date()}")
return df
except Exception as e:
print(f"Error downloading data: {str(e)}")
raise
def calculate_forward_max_return(df: pd.DataFrame,
price_col: str = 'Close',
window: int = 20) -> pd.DataFrame:
df = df.copy()
# Fixed: Correct forward-looking calculation using list comprehension
future_max = []
for i in range(len(df)):
if i + window >= len(df):
future_max.append(np.nan)
else:
future_max.append(df[price_col].iloc[i+1:i+window+1].max())
df['future_max'] = future_max
df['target'] = 100 * (df['future_max'] - df[price_col]) / df[price_col]
df = df.dropna(subset=['target'])
print(f"Calculated forward {window}-day max return")
print(f"Target stats: mean={df['target'].mean():.2f}%, std={df['target'].std():.2f}%")
print(f"Range: [{df['target'].min():.2f}%, {df['target'].max():.2f}%]")
return df
def create_lagged_features(df: pd.DataFrame,
target_col: str = 'target',
lags: list = [1, 20]) -> pd.DataFrame:
df = df.copy()
for lag in lags:
df[f'lag{lag}'] = df[target_col].shift(lag)
df = df.dropna()
print(f"Created lagged features: {[f'lag{l}' for l in lags]}")
print(f"Final dataset size: {len(df)} observations")
return df
def split_train_test(df: pd.DataFrame, test_size: int) -> Tuple[pd.DataFrame, pd.DataFrame]:
train_df = df.iloc[:-test_size].copy()
test_df = df.iloc[-test_size:].copy()
print(f"Data Split:")
print(f"Training: {len(train_df)} samples ({train_df.index.min().date()} to {train_df.index.max().date()})")
print(f"Testing: {len(test_df)} samples ({test_df.index.min().date()} to {test_df.index.max().date()})")
return train_df, test_df
def fit_markov_models(train_df: pd.DataFrame,
target_col: str = 'target') -> Dict:
models = {}
print("\n" + "="*70)
print("FITTING MARKOV SWITCHING MODELS")
print("="*70)
# Model 1: 2-regime, switching intercept only
print("\n[1/5] Fitting 2-regime model (switching intercept)...")
try:
model_2reg = sm.tsa.MarkovRegression(
train_df[target_col],
k_regimes=2
)
result_2reg = model_2reg.fit(disp=False)
models['2_regime'] = {
'model': model_2reg,
'result': result_2reg,
'description': '2 regimes, switching intercept'
}
print(f"AIC: {result_2reg.aic:.2f}, BIC: {result_2reg.bic:.2f}")
except Exception as e:
print(f"Failed: {str(e)}")
# Model 2: 5-regime, switching intercept only
print("\n[2/5] Fitting 5-regime model (switching intercept)...")
try:
model_5reg = sm.tsa.MarkovRegression(
train_df[target_col],
k_regimes=5
)
result_5reg = model_5reg.fit(disp=False)
models['5_regime'] = {
'model': model_5reg,
'result': result_5reg,
'description': '5 regimes, switching intercept'
}
print(f"AIC: {result_5reg.aic:.2f}, BIC: {result_5reg.bic:.2f}")
except Exception as e:
print(f"Failed: {str(e)}")
# Model 3: 3-regime with lag1 and lag20 exogenous variables
print("\n[3/5] Fitting 3-regime model with lag features...")
try:
model_3reg_exog = sm.tsa.MarkovRegression(
train_df[target_col],
k_regimes=3,
exog=train_df[['lag1', 'lag20']]
)
result_3reg_exog = model_3reg_exog.fit(disp=False)
models['3_regime_exog'] = {
'model': model_3reg_exog,
'result': result_3reg_exog,
'description': '3 regimes, switching intercept & exog (lag1, lag20)'
}
print(f"AIC: {result_3reg_exog.aic:.2f}, BIC: {result_3reg_exog.bic:.2f}")
except Exception as e:
print(f"Failed: {str(e)}")
# Model 4: 3-regime with lag20 and switching variance
print("\n[4/5] Fitting 3-regime model with switching variance...")
try:
model_3reg_var = sm.tsa.MarkovRegression(
train_df[target_col],
k_regimes=3,
trend='c',
switching_variance=True,
exog=train_df[['lag20']]
)
result_3reg_var = model_3reg_var.fit(search_reps=50, method='bfgs', disp=False)
models['3_regime_var'] = {
'model': model_3reg_var,
'result': result_3reg_var,
'description': '3 regimes, switching variance, lag20'
}
print(f"AIC: {result_3reg_var.aic:.2f}, BIC: {result_3reg_var.bic:.2f}")
except Exception as e:
print(f"Failed: {str(e)}")
# Model 5: Best model with both lags and switching variance
print("\n[5/5] Fitting comprehensive 3-regime model...")
try:
model_best = sm.tsa.MarkovRegression(
train_df[target_col],
k_regimes=3,
trend='c',
switching_variance=True,
exog=train_df[['lag1', 'lag20']]
)
result_best = model_best.fit(search_reps=50, method='bfgs', disp=False)
models['3_regime_full'] = {
'model': model_best,
'result': result_best,
'description': '3 regimes, switching variance, lag1 & lag20'
}
print(f"AIC: {result_best.aic:.2f}, BIC: {result_best.bic:.2f}")
except Exception as e:
print(f"Failed: {str(e)}")
return models
def compare_models(models: Dict) -> pd.DataFrame:
comparison_data = []
for name, model_dict in models.items():
result = model_dict['result']
comparison_data.append({
'Model': name,
'Description': model_dict['description'],
'K_Regimes': result.k_regimes,
'N_Params': len(result.params),
'Log_Likelihood': result.llf,
'AIC': result.aic,
'BIC': result.bic,
'HQIC': result.hqic if hasattr(result, 'hqic') else np.nan
})
df_comparison = pd.DataFrame(comparison_data)
df_comparison = df_comparison.sort_values('BIC')
print("\n" + "="*70)
print("MODEL COMPARISON")
print("="*70)
print(df_comparison.to_string(index=False))
print("\nLower AIC/BIC indicates better model fit")
return df_comparison
def analyze_regime_characteristics(result, model_name: str):
print(f"\n{'='*70}")
print(f"REGIME ANALYSIS: {model_name}")
print(f"{'='*70}")
k_regimes = result.k_regimes
print("\nExpected Regime Durations:")
durations = result.expected_durations
for i, duration in enumerate(durations):
print(f"Regime {i}: {duration:.2f} days ({duration/252:.2f} years)")
print("\nRegime Mean Returns:")
for i in range(k_regimes):
param_name = f'regime[{i}].const' if f'regime[{i}].const' in result.params.index else f'const'
if param_name in result.params.index:
mean_val = result.params[param_name]
print(f"Regime {i}: {mean_val:.3f}%")
print("\nTransition Probability Matrix:")
transition_matrix = result.regime_transition.reshape(k_regimes, k_regimes)
df_trans = pd.DataFrame(
transition_matrix,
columns=[f"To_{i}" for i in range(k_regimes)],
index=[f"From_{i}" for i in range(k_regimes)]
)
print(df_trans.round(4).to_string())
if hasattr(result, 'switching_variance') and result.switching_variance:
print("\nRegime-Specific Variances:")
for i in range(k_regimes):
var_param = f'regime[{i}].sigma2'
if var_param in result.params.index:
var_val = result.params[var_param]
print(f"Regime {i}: σ² = {var_val:.4f} (σ = {np.sqrt(var_val):.4f})")
def plot_regime_probabilities(result, target_series: pd.Series,
model_name: str, figsize=(15, 10)):
k_regimes = result.k_regimes
probs = result.smoothed_marginal_probabilities
fig, axes = plt.subplots(k_regimes + 1, 1, figsize=figsize, sharex=True)
axes[0].plot(target_series.index, target_series.values,
color='black', linewidth=0.5, alpha=0.7)
axes[0].set_ylabel('Target (%)', fontsize=10, fontweight='bold')
axes[0].set_title(f'{model_name}: Target Variable and Regime Probabilities',
fontsize=12, fontweight='bold')
axes[0].grid(True, alpha=0.3)
axes[0].axhline(y=0, color='red', linestyle='--', linewidth=0.5, alpha=0.5)
colors = plt.cm.Set3(np.linspace(0, 1, k_regimes))
for i in range(k_regimes):
axes[i+1].fill_between(probs.index, 0, probs.iloc[:, i],
alpha=0.6, color=colors[i], label=f'Regime {i}')
axes[i+1].set_ylabel(f'P(Regime {i})', fontsize=9, fontweight='bold')
axes[i+1].set_ylim([0, 1])
axes[i+1].grid(True, alpha=0.3)
axes[i+1].legend(loc='upper right', fontsize=8)
axes[-1].set_xlabel('Date', fontsize=10, fontweight='bold')
plt.tight_layout()
return fig
def plot_regime_overlay(result, target_series: pd.Series,
model_name: str, figsize=(15, 6)):
probs = result.smoothed_marginal_probabilities
regime_classification = probs.idxmax(axis=1)
fig, ax = plt.subplots(figsize=figsize)
colors = plt.cm.Set3(np.linspace(0, 1, result.k_regimes))
for regime_idx in range(result.k_regimes):
mask = regime_classification == regime_idx
ax.scatter(target_series.index[mask], target_series.values[mask],
c=[colors[regime_idx]], s=10, alpha=0.6,
label=f'Regime {regime_idx}')
ax.axhline(y=0, color='red', linestyle='--', linewidth=1, alpha=0.5)
ax.set_xlabel('Date', fontsize=11, fontweight='bold')
ax.set_ylabel('20-Day Forward Max Return (%)', fontsize=11, fontweight='bold')
ax.set_title(f'{model_name}: Regime Classification Overlay',
fontsize=13, fontweight='bold')
ax.legend(loc='best', fontsize=9)
ax.grid(True, alpha=0.3)
plt.tight_layout()
return fig
def analyze_regime_periods(result, df: pd.DataFrame, target_col: str = 'target'):
probs = result.smoothed_marginal_probabilities
regime_classification = probs.idxmax(axis=1)
print(f"\n{'='*70}")
print("REGIME PERIOD STATISTICS")
print(f"{'='*70}")
for regime_idx in range(result.k_regimes):
mask = regime_classification == regime_idx
regime_data = df.loc[mask, target_col]
print(f"\nRegime {regime_idx}:")
print(f"Frequency: {mask.sum()} days ({100*mask.sum()/len(df):.1f}%)")
print(f"Mean return: {regime_data.mean():.3f}%")
print(f"Std dev: {regime_data.std():.3f}%")
print(f"Min: {regime_data.min():.3f}%")
print(f"Max: {regime_data.max():.3f}%")
regime_series = (regime_classification == regime_idx).astype(int)
regime_groups = (regime_series != regime_series.shift()).cumsum()
longest_period = regime_series.groupby(regime_groups).sum().max()
print(f"Longest period: {longest_period} consecutive days")
In [2]:
print("\n" + "="*70)
print("MARKOV SWITCHING DYNAMIC REGRESSION ANALYSIS")
print("="*70)
df = download_stock_data(CONFIG['ticker'], CONFIG['start_date'])
df = calculate_forward_max_return(
df,
price_col='Close',
window=CONFIG['lookforward_window']
)
df = create_lagged_features(df, lags=CONFIG['lag_features'])
train_df, test_df = split_train_test(df, CONFIG['test_size'])
models = fit_markov_models(train_df)
comparison_df = compare_models(models)
best_model_name = comparison_df.iloc[0]['Model']
best_model = models[best_model_name]
print(f"\n{'='*70}")
print(f"BEST MODEL: {best_model_name}")
print(f"{'='*70}")
print(best_model['result'].summary())
analyze_regime_characteristics(best_model['result'], best_model['description'])
analyze_regime_periods(best_model['result'], train_df)
print("\nGenerating visualizations...")
fig1 = plot_regime_probabilities(
best_model['result'],
train_df['target'],
best_model['description']
)
plt.savefig('markov_regime_probabilities.png', dpi=300, bbox_inches='tight')
print("Saved: markov_regime_probabilities.png")
fig2 = plot_regime_overlay(
best_model['result'],
train_df['target'],
best_model['description']
)
plt.savefig('markov_regime_overlay.png', dpi=300, bbox_inches='tight')
print("Saved: markov_regime_overlay.png")
plt.show()
======================================================================
MARKOV SWITCHING DYNAMIC REGRESSION ANALYSIS
======================================================================
Downloading data for ^GSPC...
Downloaded 6546 records from 2000-01-03 to 2026-01-12
Calculated forward 20-day max return
Target stats: mean=2.92%, std=2.73%
Range: [-5.27%, 28.48%]
Created lagged features: ['lag1', 'lag20']
Final dataset size: 6506 observations
Data Split:
Training: 6406 samples (2000-02-01 to 2025-07-22)
Testing: 100 samples (2025-07-23 to 2025-12-11)
======================================================================
FITTING MARKOV SWITCHING MODELS
======================================================================
[1/5] Fitting 2-regime model (switching intercept)...
AIC: 27673.02, BIC: 27706.85
[2/5] Fitting 5-regime model (switching intercept)...
AIC: 22769.37, BIC: 22945.26
[3/5] Fitting 3-regime model with lag features...
AIC: 20512.60, BIC: 20620.84
[4/5] Fitting 3-regime model with switching variance...
AIC: 23573.55, BIC: 23675.02
[5/5] Fitting comprehensive 3-regime model...
AIC: 18311.89, BIC: 18433.66
======================================================================
MODEL COMPARISON
======================================================================
Model Description K_Regimes N_Params Log_Likelihood AIC BIC HQIC
3_regime_full 3 regimes, switching variance, lag1 & lag20 3 18 -9137.947100 18311.894201 18433.664027 18354.041756
3_regime_exog 3 regimes, switching intercept & exog (lag1, lag20) 3 16 -10240.298946 20512.597891 20620.837737 20550.062385
5_regime 5 regimes, switching intercept 5 26 -11358.683365 22769.366730 22945.256478 22830.246531
3_regime_var 3 regimes, switching variance, lag20 3 15 -11771.772696 23573.545393 23675.020248 23608.668355
2_regime 2 regimes, switching intercept 2 5 -13831.511662 27673.023324 27706.848275 27684.730978
Lower AIC/BIC indicates better model fit
======================================================================
BEST MODEL: 3_regime_full
======================================================================
Markov Switching Model Results
==============================================================================
Dep. Variable: target No. Observations: 6406
Model: MarkovRegression Log Likelihood -9137.947
Date: Tue, 13 Jan 2026 AIC 18311.894
Time: 08:01:47 BIC 18433.664
Sample: 0 HQIC 18354.042
- 6406
Covariance Type: approx
Regime 0 parameters
==============================================================================
coef std err z P>|z| [0.025 0.975]
------------------------------------------------------------------------------
const 0.1733 0.027 6.438 0.000 0.121 0.226
x1 0.8986 0.008 111.822 0.000 0.883 0.914
x2 -0.0019 0.006 -0.290 0.772 -0.014 0.011
sigma2 0.3700 0.017 21.965 0.000 0.337 0.403
Regime 1 parameters
==============================================================================
coef std err z P>|z| [0.025 0.975]
------------------------------------------------------------------------------
const 1.6765 0.462 3.629 0.000 0.771 2.582
x1 0.7863 0.041 19.089 0.000 0.706 0.867
x2 -0.0150 0.039 -0.387 0.699 -0.091 0.061
sigma2 12.9299 1.883 6.868 0.000 9.240 16.620
Regime 2 parameters
==============================================================================
coef std err z P>|z| [0.025 0.975]
------------------------------------------------------------------------------
const 0.5306 0.053 9.986 0.000 0.426 0.635
x1 0.8607 0.011 78.397 0.000 0.839 0.882
x2 -0.0106 0.010 -1.098 0.272 -0.030 0.008
sigma2 1.8866 0.165 11.414 0.000 1.563 2.211
Regime transition parameters
==============================================================================
coef std err z P>|z| [0.025 0.975]
------------------------------------------------------------------------------
p[0->0] 0.9753 0.004 252.383 0.000 0.968 0.983
p[1->0] 0.0003 0.034 0.009 0.992 -0.067 0.068
p[2->0] 0.0315 0.005 6.662 0.000 0.022 0.041
p[0->1] 1.933e-06 0.003 0.001 0.999 -0.005 0.005
p[1->1] 0.9379 0.027 35.374 0.000 0.886 0.990
p[2->1] 0.0083 0.003 3.074 0.002 0.003 0.014
==============================================================================
Warnings:
[1] Covariance matrix calculated using numerical (complex-step) differentiation.
======================================================================
REGIME ANALYSIS: 3 regimes, switching variance, lag1 & lag20
======================================================================
Expected Regime Durations:
Regime 0: 40.55 days (0.16 years)
Regime 1: 16.11 days (0.06 years)
Regime 2: 25.15 days (0.10 years)
Regime Mean Returns:
Transition Probability Matrix:
To_0 To_1 To_2
From_0 0.9753 0.0003 0.0315
From_1 0.0000 0.9379 0.0083
From_2 0.0247 0.0617 0.9602
======================================================================
REGIME PERIOD STATISTICS
======================================================================
Regime 0:
Frequency: 3428 days (53.5%)
Mean return: 2.248%
Std dev: 1.572%
Min: -1.706%
Max: 9.420%
Longest period: 343 consecutive days
Regime 1:
Frequency: 320 days (5.0%)
Mean return: 6.948%
Std dev: 5.925%
Min: -5.268%
Max: 28.478%
Longest period: 74 consecutive days
Regime 2:
Frequency: 2658 days (41.5%)
Mean return: 3.322%
Std dev: 2.837%
Min: -3.513%
Max: 14.941%
Longest period: 131 consecutive days
Generating visualizations...
Saved: markov_regime_probabilities.png
Saved: markov_regime_overlay.png