OptETF Deploy
Translate remaining radar axis annotations + frontier subtitle to English (main-system recommendation charts)
b3fb7c4 | import requests | |
| from bs4 import BeautifulSoup | |
| import pandas as pd | |
| import os | |
| import numpy as np | |
| import time | |
| import random | |
| import json | |
| from yahooquery import Ticker | |
| import parameters | |
| from datetime import datetime, timedelta | |
| from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer, logging as hf_logging | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from scipy.optimize import linprog | |
| from scipy.optimize import minimize | |
| import logging | |
| import sys | |
| from pathlib import Path | |
| from sentiment_engine.store import get_sentiment_frame_asof, load_daily_sentiment | |
| # 使用 hf_logging 設定 transformers 的日誌層級 | |
| hf_logging.set_verbosity_error() | |
| # 2. 隱藏載入模型時的進度條 (tqdm) | |
| hf_logging.disable_progress_bar() | |
| # --- 配置 Standard Logging --- | |
| VERBOSE = parameters.VERBOSE # 從參數設定讀取 Verbose 模式開關 | |
| # 1. 初始化 logger | |
| log = logging.getLogger("AI_RoboAdvisor") | |
| # 2. 移除可能預設存在的 handlers 防止重複列印 | |
| log.handlers = [] | |
| # 3. 根據 VERBOSE 參數動態決定日誌等級 | |
| # logging.INFO: 顯示所有 print(普通資訊) | |
| # logging.WARNING: 只顯示警告跟錯誤 | |
| log_level = logging.INFO if VERBOSE else logging.WARNING | |
| log.setLevel(log_level) | |
| # 4. 定義輸出到終端機的 Handler (這就是你的噤聲功能開關) | |
| stream_handler = logging.StreamHandler(sys.stdout) | |
| stream_handler.setLevel(log_level) # Handler 也同步等級 | |
| # 5. 定義日誌顯示格式 (可以讓偵錯時資訊更豐富,正式展示時噤聲) | |
| # 展示模式格式 (乾淨): | |
| formatter_clean = logging.Formatter('%(message)s') | |
| # 偵錯模式格式 (帶有時間與函式名稱): | |
| formatter_debug = logging.Formatter('[%(asctime)s] [%(levelname)s] (%(funcName)s) %(message)s', '%H:%M:%S') | |
| # 根據 Verbose 模式選擇格式 | |
| if VERBOSE: | |
| stream_handler.setFormatter(formatter_debug) | |
| else: | |
| stream_handler.setFormatter(formatter_clean) | |
| # 6. 將 Handler 加入 logger | |
| log.addHandler(stream_handler) | |
| # 6b. 集中式檔案日誌:每次執行在 logs/ 寫一個時間戳 log 檔(比終端更完整,固定記 INFO 以上)。 | |
| # best-effort —— 即使建立失敗也不影響主流程;終端的噤聲行為(VERBOSE)不變。 | |
| try: | |
| from datetime import datetime as _dt_log | |
| _logs_dir = getattr(parameters, "LOGS_DIR", "logs") | |
| os.makedirs(_logs_dir, exist_ok=True) | |
| _log_file = os.path.join(_logs_dir, f"run_{_dt_log.now().strftime('%Y%m%d_%H%M%S')}.log") | |
| _file_handler = logging.FileHandler(_log_file, encoding="utf-8") | |
| _file_handler.setLevel(logging.INFO) | |
| _file_handler.setFormatter(formatter_debug) | |
| log.addHandler(_file_handler) | |
| # 放寬 logger 等級到 INFO,讓 INFO 能進到檔案;stream_handler 仍維持 log_level → 終端噤聲不變。 | |
| if log.level > logging.INFO: | |
| log.setLevel(logging.INFO) | |
| except Exception as _e_log: | |
| log.warning(f"⚠️ 無法建立集中式 log 檔(已略過,不影響執行):{_e_log}") | |
| log.warning("✅ 金融理財引擎日誌系統配置完成。") | |
| # ========================================== | |
| # data_loader 智慧快取引擎 | |
| # ========================================== | |
| DB_PATH = "csv/historical_close_price_db.csv" | |
| # 全域網路抓取開關:False = 快取-only 模式(完全不對外抓資料,含缺資料補抓與近期補齊)。 | |
| # 由呼叫端(如網頁版 /api/run 的「擷取/更新市場資料」勾選)設定;預設 True 維持原行為。 | |
| ALLOW_NETWORK_FETCH = True | |
| def _history_to_close_matrix(history_df): | |
| """將 yahooquery 回傳的 history 資料整理成 Date x Ticker 的 raw close 價格矩陣。""" | |
| if isinstance(history_df, dict) or history_df is None or history_df.empty: | |
| return pd.DataFrame() | |
| if "close" not in history_df.columns: | |
| return pd.DataFrame() | |
| price_df = history_df["close"].unstack(level=0) | |
| # yahooquery 近期可能回傳 tz-aware 日期;與 tz-naive 混用會觸發 | |
| # "Cannot mix tz-aware with tz-naive values"。統一轉 UTC→去時區→正規化到日界, | |
| # 確保索引一律為 tz-naive 且能與日期索引的快取資料庫對齊。 | |
| idx = pd.to_datetime(price_df.index, utc=True, errors="coerce") | |
| try: | |
| idx = idx.tz_localize(None) | |
| except (TypeError, AttributeError): | |
| pass | |
| price_df.index = idx.normalize() | |
| # normalize 後可能出現同一天重複列(yahooquery 偶爾回傳當日盤中即時列+收盤列)。 | |
| # 同日保留最後一筆,確保索引唯一,避免後續 reindex/union 報 | |
| # "Reindexing only valid with uniquely valued Index objects"。 | |
| price_df = price_df[~price_df.index.duplicated(keep="last")] | |
| price_df = price_df.sort_index() | |
| price_df.columns = [str(col).strip() for col in price_df.columns] | |
| return price_df | |
| def _fetch_close_matrix_yq(tickers, period="3y", max_retries=3): | |
| """集中抓取 raw close,讓新增 ETF 與近期補資料共用同一套防錯邏輯。""" | |
| clean_tickers = sorted({str(ticker).strip() for ticker in tickers if pd.notna(ticker) and str(ticker).strip()}) | |
| if not clean_tickers: | |
| return pd.DataFrame() | |
| for attempt in range(max_retries): | |
| try: | |
| yq_tickers = Ticker(clean_tickers, asynchronous=True) | |
| temp_df = yq_tickers.history(period=period, interval="1d") | |
| price_df = _history_to_close_matrix(temp_df) | |
| if not price_df.empty: | |
| return price_df | |
| log.warning(f"價格資料抓取為空,重試中 ({attempt + 1}/{max_retries})...") | |
| time.sleep(3) | |
| except Exception as e: | |
| log.warning(f"價格資料抓取失敗 ({e}),重試中 ({attempt + 1}/{max_retries})...") | |
| time.sleep(5) | |
| return pd.DataFrame() | |
| def _refresh_recent_historical_prices(db_df, tickers, max_retries=3): | |
| """每次主程式使用價格矩陣時,都用最近 10 天資料補齊已存在 ETF 的最新可得交易日。""" | |
| if db_df.empty: | |
| return db_df | |
| # 更新快取時要刷新整個已快取 universe,而不是只刷新本次呼叫的 subset。 | |
| # 否則小型測試呼叫會新增只有少數 ETF 有值的日期列,造成資料庫看起來大量缺漏。 | |
| refresh_tickers = sorted(set(db_df.columns).union(ticker for ticker in tickers if ticker in db_df.columns)) | |
| if not refresh_tickers: | |
| return db_df | |
| recent_df = _fetch_close_matrix_yq(refresh_tickers, period="10d", max_retries=max_retries) | |
| if recent_df.empty: | |
| log.warning("近期價格更新失敗,沿用既有 historical_close_price_db.csv。") | |
| return db_df | |
| min_row_coverage = max(1, int(len(refresh_tickers) * 0.80)) | |
| recent_df = recent_df.dropna(thresh=min_row_coverage) | |
| if recent_df.empty: | |
| log.warning("近期價格更新覆蓋率過低,避免寫入大量缺漏列。") | |
| return db_df | |
| db_df = db_df.sort_index() | |
| all_dates = db_df.index.union(recent_df.index) | |
| all_cols = db_df.columns.union(recent_df.columns) | |
| db_df = db_df.reindex(index=all_dates, columns=all_cols) | |
| # 只覆蓋最近 10 天實際抓到的格子,保留舊有三年歷史資料。 | |
| db_df.loc[recent_df.index, recent_df.columns] = recent_df | |
| db_df = db_df.sort_index() | |
| db_df.to_csv(DB_PATH, index_label="date") | |
| log.info(f"✅ historical_close_price_db.csv 已補到最新可得日期:{db_df.index.max().date()}") | |
| return db_df | |
| def get_or_fetch_historical_prices(tickers, max_retries=3): | |
| """ | |
| 智慧快取引擎: | |
| 1. 讀取本地歷史資料庫。 | |
| 2. 比對請求的 tickers,找出未建立快取的標的。 | |
| 3. 只針對缺少的標的發送 API 請求。 | |
| 4. 將新資料合併至本地資料庫並存檔。 | |
| 5. 回傳此次請求所需的價格矩陣。 | |
| """ | |
| log.info(f"\n啟動本地快取引擎 (請求總數: {len(tickers)} 檔)...") | |
| # 確保資料夾存在 | |
| os.makedirs("csv", exist_ok=True) | |
| # 1. 讀取本地資料庫 | |
| if os.path.exists(DB_PATH): | |
| db_df = pd.read_csv(DB_PATH, index_col='date', parse_dates=True) | |
| # 防護:快取若含重複日期列,會使後續 join/reindex 報錯,先去重(同日保留最後一筆)。 | |
| if db_df.index.has_duplicates: | |
| db_df = db_df[~db_df.index.duplicated(keep="last")] | |
| existing_tickers = db_df.columns.tolist() | |
| else: | |
| db_df = pd.DataFrame() | |
| existing_tickers = [] | |
| # 2. 找出缺少的 ETF (Set Difference) | |
| tickers = [str(ticker).strip() for ticker in tickers if pd.notna(ticker) and str(ticker).strip()] | |
| missing_tickers = sorted(set(tickers) - set(existing_tickers)) | |
| # 3. 抓取缺少的資料 | |
| if missing_tickers and not ALLOW_NETWORK_FETCH: | |
| log.warning(f"⚙️ 快取-only 模式(未勾選更新資料):跳過 {len(missing_tickers)} 檔缺資料 ETF 的抓取,僅使用既有快取。") | |
| if missing_tickers and ALLOW_NETWORK_FETCH: | |
| log.info(f"🔍 發現 {len(missing_tickers)} 檔 ETF 尚未建立快取,準備從 API 抓取...") | |
| new_data_df = pd.DataFrame() | |
| for attempt in range(max_retries): | |
| try: | |
| yq_tickers = Ticker(missing_tickers, asynchronous=True) | |
| temp_df = yq_tickers.history(period="3y", interval="1d") | |
| if isinstance(temp_df, dict) or temp_df.empty: | |
| log.warning(f"⚠️ 獲取資料為空,等待 3 秒後重試 (第 {attempt+1}/{max_retries} 次)...") | |
| time.sleep(3) | |
| continue | |
| # 轉換為 Date x Ticker 矩陣;使用 close 才能讓報酬代表資本利得。 | |
| # 共用 _history_to_close_matrix 的 tz-naive 正規化,避免 tz-aware/naive 混用錯誤。 | |
| new_data_df = _history_to_close_matrix(temp_df) | |
| log.info("✅ 缺失資料抓取成功!") | |
| break # 成功則跳出重試迴圈 | |
| except Exception as e: | |
| log.warning(f"⚠️ 發生錯誤 ({e}),等待 5 秒後重試...") | |
| time.sleep(5) | |
| # 4. 合併並存回資料庫 | |
| if not new_data_df.empty: | |
| if db_df.empty: | |
| db_df = new_data_df | |
| else: | |
| # 使用 outer join 合併新舊資料,以日期為基準對齊 | |
| db_df = db_df.join(new_data_df, how='outer') | |
| # 存檔更新資料庫 | |
| db_df.to_csv(DB_PATH, index_label="date") | |
| log.info(f"💾 已將新資料寫入本地快取資料庫: {DB_PATH}") | |
| else: | |
| log.warning("❌ 無法獲取缺失的 ETF 資料,將僅使用本地現有資料。") | |
| elif not missing_tickers: | |
| log.info("⚡ 所有請求的 ETF 皆已在本地快取中,直接載入!") | |
| # 5. 萃取並回傳本次請求所需的資料矩陣 | |
| # 確保只取有成功存在於資料庫的 tickers | |
| # 對已存在的 ETF 也補最近交易日;避免快取已有 ticker 時永遠不更新價格。 | |
| # 快取-only 模式(未勾選更新資料)下完全跳過此網路補齊,直接沿用既有快取。 | |
| if ALLOW_NETWORK_FETCH: | |
| db_df = _refresh_recent_historical_prices(db_df, tickers, max_retries=max_retries) | |
| available_tickers = [t for t in tickers if t in db_df.columns] | |
| if not available_tickers: | |
| log.error("❌ 嚴重錯誤:完全沒有可用的價格資料。") | |
| sys.exit(1) | |
| return pd.DataFrame() | |
| # 切片出需要的欄位,並去除全部為 NaN 的無效日期 | |
| final_price_matrix = db_df[available_tickers].dropna(how='all') | |
| return final_price_matrix | |
| def prepare_aligned_returns(price_matrix, tickers, min_history=126): | |
| """ | |
| Build a leakage-free return panel by using only overlapping history. | |
| This avoids backward-filling future data into earlier missing periods. | |
| """ | |
| available_tickers = [ticker for ticker in tickers if ticker in price_matrix.columns] | |
| if not available_tickers: | |
| return pd.DataFrame() | |
| aligned_prices = price_matrix[available_tickers].sort_index().ffill() | |
| returns_matrix = aligned_prices.pct_change(fill_method=None).dropna(axis=1, how='all') | |
| if returns_matrix.empty: | |
| return pd.DataFrame() | |
| first_valid_dates = [returns_matrix[col].first_valid_index() for col in returns_matrix.columns] | |
| first_valid_dates = [dt for dt in first_valid_dates if dt is not None] | |
| if not first_valid_dates: | |
| return pd.DataFrame() | |
| common_start = max(first_valid_dates) | |
| returns_matrix = returns_matrix.loc[common_start:].dropna(how='any') | |
| if min_history and len(returns_matrix) < min_history: | |
| log.warning(f"?? 重疊報酬樣本偏短,目前只有 {len(returns_matrix)} 筆日資料。") | |
| return returns_matrix | |
| def calc_geometric_annual_returns(returns_matrix, periods_per_year=252): | |
| """ | |
| 從每日報酬矩陣計算各 ETF 的幾何年化報酬率。 | |
| 這裡不用 mean() * 252,是為了讓 MPT 圖表的報酬率口徑與候選名單的 CAGR 保持一致。 | |
| """ | |
| if returns_matrix is None or returns_matrix.empty: | |
| return pd.Series(dtype=float) | |
| years = len(returns_matrix) / periods_per_year | |
| if years <= 0: | |
| return pd.Series(0.0, index=returns_matrix.columns) | |
| compounded_returns = (1 + returns_matrix).prod() | |
| return compounded_returns.pow(1 / years) - 1 | |
| def compute_cov_annual(returns_matrix, periods_per_year=252): | |
| """年化共變異數矩陣。 | |
| parameters.USE_LEDOIT_WOLF_COV=True 時用 Ledoit-Wolf 收縮估計,降低 mean-variance | |
| 最佳化對樣本共變異數雜訊的放大;資料不足或失敗時退回樣本共變異數。 | |
| """ | |
| if returns_matrix is None or returns_matrix.empty: | |
| return None | |
| clean = returns_matrix.dropna(how="any") | |
| use_lw = getattr(parameters, "USE_LEDOIT_WOLF_COV", False) | |
| if use_lw and clean.shape[0] > clean.shape[1] and clean.shape[1] >= 2: | |
| try: | |
| from sklearn.covariance import LedoitWolf | |
| return LedoitWolf().fit(clean.values).covariance_ * periods_per_year | |
| except Exception as exc: | |
| log.warning(f"Ledoit-Wolf 收縮失敗,改用樣本共變異數:{exc}") | |
| base = clean if not clean.empty else returns_matrix | |
| return base.cov().values * periods_per_year | |
| def shrink_mean_returns(mu_vector, intensity=None): | |
| """把各資產的樣本平均報酬向「橫斷面總平均」收縮,降低均值估計雜訊。 | |
| μ_shrunk = (1-δ)·μ_sample + δ·μ_grand | |
| δ=0 等於不收縮(純樣本平均);δ=1 等於全部用總平均(所有資產同一個期望報酬)。 | |
| parameters.USE_MEAN_SHRINKAGE=False 時直接回傳原始樣本平均。 | |
| """ | |
| mu = np.asarray(mu_vector, dtype=float) | |
| if mu.size == 0 or not getattr(parameters, "USE_MEAN_SHRINKAGE", False): | |
| return mu | |
| delta = intensity if intensity is not None else getattr(parameters, "MEAN_SHRINKAGE_INTENSITY", 0.5) | |
| delta = float(np.clip(delta, 0.0, 1.0)) | |
| grand = float(np.nanmean(mu)) | |
| return (1.0 - delta) * mu + delta * grand | |
| def derive_params_from_weights(global_weights): | |
| """偏好→參數映射 g(w):把 9 維全局權重轉成最佳化器參數(τ / vol_budget / 核心類型)。 | |
| 用權重向量的連續函數(非 profile 名稱查表),讓 AHP 任意權重與手設 profile 走同一條 | |
| 路、自動校準(見 03 §1.05、05 §4.57)。 | |
| 彙總指標: | |
| R = w[Return_CAGR] + w[Return_Div] 報酬總渴望(含股息,當 τ 強度乘數) | |
| T_growth = CAGR / (CAGR + w[Risk_Vol] + w[Risk_MaxDD] + 1e-9) 資本利得(成長)渴望 vs 風險 | |
| 說明:核心類型 = 「要不要買成長/市場風險」,故只由「資本利得渴望」決定,不含股息 | |
| (否則收入型會被股息撐高而誤分到 beta 核心)。股息偏好改由 τ 傾斜處理(見 03 §1.05 決議)。 | |
| 映射(連續、單調): | |
| core_mode = minvar (T_growth<低門檻) / market (中間) / beta (T_growth≥高門檻) | |
| vol_budget = clip(VOL_BUDGET_BASE + VOL_BUDGET_SLOPE·T_growth, MIN, MAX) | |
| tau = TAU_MAP_COEF·(1−T_growth)·R_hat (R_hat = R 截到 [0,1]) | |
| → 高成長渴望(beta 核心)自動低 τ(beta 取代傾斜); | |
| 收入型(低 T_growth、高 R)自動拿到較高 τ 做股息傾斜。 | |
| 註:τ 量級刻意保守平滑、只編碼方向,精確量級交給 walk-forward 驗(不硬擬合單一路徑 sweep 甜蜜點)。 | |
| 註:backtest_engine 直接 import 本函式,確保兩系統數值完全一致。 | |
| """ | |
| gw = global_weights or {} | |
| def _w(k): | |
| try: | |
| return float(gw.get(k, 0.0) or 0.0) | |
| except (TypeError, ValueError): | |
| return 0.0 | |
| cagr = _w("Return_CAGR") | |
| R = cagr + _w("Return_Div") | |
| denom = cagr + _w("Risk_Vol") + _w("Risk_MaxDD") + 1e-9 | |
| T_growth = cagr / denom | |
| t_low = float(getattr(parameters, "CORE_MODE_T_LOW", 0.40)) | |
| t_high = float(getattr(parameters, "CORE_MODE_T_HIGH", 0.65)) | |
| if T_growth < t_low: | |
| core_mode = "minvar" | |
| elif T_growth < t_high: | |
| core_mode = "market" | |
| else: | |
| core_mode = "beta" | |
| vb = (float(getattr(parameters, "VOL_BUDGET_BASE", 0.10)) | |
| + float(getattr(parameters, "VOL_BUDGET_SLOPE", 0.40)) * T_growth) | |
| vol_budget = float(np.clip(vb, float(getattr(parameters, "VOL_BUDGET_MIN", 0.10)), | |
| float(getattr(parameters, "VOL_BUDGET_MAX", 0.45)))) | |
| # risk_fraction(無量綱):C2 用它在「候選池可行波動範圍 [v_min,v_max]」內定位風險預算。 | |
| override = getattr(parameters, "RISK_FRACTION_OVERRIDE", None) | |
| if override is not None: | |
| risk_fraction = float(np.clip(float(override), 0.0, 1.0)) | |
| else: | |
| risk_fraction = float(np.clip(T_growth, float(getattr(parameters, "RISK_FRACTION_MIN", 0.05)), | |
| float(getattr(parameters, "RISK_FRACTION_MAX", 0.95)))) | |
| r_hat = float(np.clip(R, 0.0, 1.0)) | |
| tau = float(getattr(parameters, "TAU_MAP_COEF", 0.30)) * (1.0 - T_growth) * r_hat | |
| return {"R": R, "T_growth": T_growth, "core_mode": core_mode, | |
| "vol_budget": vol_budget, "risk_fraction": risk_fraction, "tau": tau} | |
| def compute_feasible_vol_budget(cov_matrix, max_weight, risk_fraction): | |
| """把無量綱 risk_fraction∈[0,1] 轉成「相對候選池可行波動範圍」的絕對波動預算。 | |
| v_min = 最小變異組合波動(min wᵀΣw s.t. Σw=1, 0≤w≤cap) | |
| v_max = 最大變異組合波動(max wᵀΣw 同約束) | |
| vol_budget = v_min + risk_fraction·(v_max − v_min),並夾在 ≥ v_min(恆可行)。 | |
| 解決兩個問題:(1) 絕對 vol_budget 可能低於 v_min → 無解;(2) 絕對值可能脫離當期池子(過高/過低)。 | |
| cov_matrix 須為年化共變異數,回傳 (vol_budget, v_min, v_max) 皆為年化波動;失敗回 (None,None,None)。 | |
| backtest_engine 直接 import 本函式,確保兩系統數值完全一致。 | |
| """ | |
| cov = np.asarray(cov_matrix, dtype=float) | |
| if cov.ndim != 2 or cov.shape[0] == 0: | |
| return None, None, None | |
| n = cov.shape[0] | |
| cap = float(max_weight) | |
| w0 = np.array([1.0 / n] * n) | |
| bnds = tuple((0.0, cap) for _ in range(n)) | |
| cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0},) | |
| def pvar(w): | |
| return float(np.dot(w.T, np.dot(cov, w))) | |
| # v_min:最小變異是「凸最小化」,SLSQP 可靠。 | |
| try: | |
| r_min = minimize(pvar, w0, method='SLSQP', bounds=bnds, constraints=cons, options={'maxiter': 500, 'ftol': 1e-10}) | |
| v_min = np.sqrt(max(r_min.fun, 0.0)) if r_min.success else np.sqrt(max(pvar(w0), 0.0)) | |
| except Exception: | |
| v_min = np.sqrt(max(pvar(w0), 0.0)) | |
| # v_max:最大化 wᵀΣw 是「凸最大化」,最優在頂點,單點 SLSQP 易卡局部 → 低估。 | |
| # 用確定性多起點(等權 + 依個別變異數貪婪填滿到 cap 的多個頂點)+ SLSQP 精修,取最大。 | |
| def greedy_fill(order): | |
| w = np.zeros(n) | |
| remaining = 1.0 | |
| for idx in order: | |
| wi = min(cap, remaining) | |
| w[idx] = wi | |
| remaining -= wi | |
| if remaining <= 1e-12: | |
| break | |
| if remaining > 1e-9: # cap·n < 1 理論上不會發生(已保證可投資檔數足夠) | |
| w += remaining / n | |
| return w | |
| var_order = list(np.argsort(-np.diag(cov))) # 個別變異數由大到小 | |
| starts = [w0, greedy_fill(var_order)] | |
| # 額外頂點:把最高變異的前 k 檔各填到 cap(k 由 cap 決定),探索不同集中組合 | |
| k_needed = int(np.ceil(1.0 / cap)) | |
| for shift in range(1, min(3, max(1, n - k_needed)) + 1): | |
| starts.append(greedy_fill(var_order[shift:] + var_order[:shift])) | |
| best_var = pvar(w0) | |
| for s in starts: | |
| best_var = max(best_var, pvar(s)) # 頂點本身就是候選(不一定要 SLSQP 收斂) | |
| try: | |
| r = minimize(lambda w: -pvar(w), s, method='SLSQP', bounds=bnds, constraints=cons, | |
| options={'maxiter': 500, 'ftol': 1e-10}) | |
| if r.success: | |
| best_var = max(best_var, pvar(r.x)) | |
| except Exception: | |
| pass | |
| v_max = np.sqrt(max(best_var, 0.0)) | |
| if v_max < v_min: # 數值保險 | |
| v_max = v_min | |
| frac = float(np.clip(risk_fraction, 0.0, 1.0)) | |
| budget = v_min + frac * max(v_max - v_min, 0.0) | |
| budget = max(budget, v_min * (1.0 + 1e-3)) | |
| return float(budget), float(v_min), float(v_max) | |
| # ========================================== | |
| # 品質約束系統化框架(設計見 07):門檻=可行範圍內的偏好加權位置,恆可行、偏好驅動、池子相對。 | |
| # backtest_engine 直接 import 這些 helper,確保兩系統數值完全一致。 | |
| # ========================================== | |
| def quality_tightness_map(global_weights, full_ratio=None): | |
| """由 9 維權重算各品質約束的 tightness∈[0,1]。 | |
| 中性點 = 該使用者品質 5-slot(Cost,Div_Score,Liq_Volume,Liq_AUM,FinBERT)平均權重; | |
| fully tight at FULL_RATIO×平均。流動性為 2-slot,以 per-slot 公平比較(除以 2)。""" | |
| gw = global_weights or {} | |
| def _w(k): | |
| try: | |
| return float(gw.get(k, 0.0) or 0.0) | |
| except (TypeError, ValueError): | |
| return 0.0 | |
| w_cost, w_div = _w("Cost_ExpRatio"), _w("Div_Score") | |
| w_vol, w_aum, w_sent = _w("Liq_Volume"), _w("Liq_AUM"), _w("FinBERT_score") | |
| neutral = float(np.mean([w_cost, w_div, w_vol, w_aum, w_sent])) | |
| fr = float(full_ratio if full_ratio is not None else getattr(parameters, "QUALITY_TIGHTNESS_FULL_RATIO", 2.0)) | |
| def ts(ratio): | |
| if neutral <= 0 or fr <= 1.0: | |
| return 0.0 | |
| return float(np.clip((ratio - 1.0) / (fr - 1.0), 0.0, 1.0)) | |
| inv = (1.0 / neutral) if neutral > 0 else 0.0 | |
| return { | |
| "cost": ts(w_cost * inv), | |
| "hhi": ts(w_div * inv), | |
| "liq": ts((w_vol + w_aum) / (2.0 * neutral)) if neutral > 0 else 0.0, | |
| "sent": ts(w_sent * inv), | |
| "w_vol": w_vol, "w_aum": w_aum, "neutral": neutral, | |
| } | |
| def feasible_linear_range(x_vec, max_weight): | |
| """線性投組值 f(w)=w·x 在 Σw=1,0≤wᵢ≤cap 下的 (min,max),貪婪封閉解(最優在頂點)。""" | |
| x = np.asarray(x_vec, dtype=float) | |
| n = x.size | |
| if n == 0: | |
| return None, None | |
| cap = float(max_weight) | |
| def fill(order): | |
| w = np.zeros(n) | |
| rem = 1.0 | |
| for i in order: | |
| wi = min(cap, rem) | |
| w[i] = wi | |
| rem -= wi | |
| if rem <= 1e-12: | |
| break | |
| if rem > 1e-9: | |
| w += rem / n | |
| return float(np.dot(w, x)) | |
| return fill(list(np.argsort(x))), fill(list(np.argsort(-x))) | |
| def quality_threshold(v_min, v_max, tightness, lower_is_better): | |
| """把 tightness∈[0,1] 轉成可行範圍內的門檻。lower_is_better=True → ≤門檻;False → ≥門檻。""" | |
| t = float(np.clip(tightness, 0.0, 1.0)) | |
| lo, hi = float(v_min), float(v_max) | |
| if hi < lo: | |
| lo, hi = hi, lo | |
| return (lo + (1.0 - t) * (hi - lo)) if lower_is_better else (hi - (1.0 - t) * (hi - lo)) | |
| def feasible_hhi_range(sector_matrix, max_weight): | |
| """投組產業 HHI=Σ_k(w·S_k)²=wᵀ(SSᵀ)w 的 (min,max)。min 解凸 QP;max 凸最大化用貪婪+多起點。""" | |
| S = np.asarray(sector_matrix, dtype=float) | |
| if S.ndim != 2 or S.shape[0] == 0: | |
| return None, None | |
| M = S @ S.T | |
| n = M.shape[0] | |
| cap = float(max_weight) | |
| w0 = np.ones(n) / n | |
| bnds = tuple((0.0, cap) for _ in range(n)) | |
| cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0},) | |
| def hhi(w): | |
| return float(np.dot(w, np.dot(M, w))) | |
| try: | |
| rmin = minimize(hhi, w0, method='SLSQP', bounds=bnds, constraints=cons, options={'maxiter': 500, 'ftol': 1e-10}) | |
| h_min = hhi(rmin.x) if rmin.success else hhi(w0) | |
| except Exception: | |
| h_min = hhi(w0) | |
| def fill(order): | |
| w = np.zeros(n) | |
| rem = 1.0 | |
| for i in order: | |
| wi = min(cap, rem) | |
| w[i] = wi | |
| rem -= wi | |
| if rem <= 1e-12: | |
| break | |
| if rem > 1e-9: | |
| w += rem / n | |
| return w | |
| best = hhi(w0) | |
| for s in [w0, fill(list(np.argsort(-np.diag(M))))]: | |
| best = max(best, hhi(s)) | |
| try: | |
| r = minimize(lambda w: -hhi(w), s, method='SLSQP', bounds=bnds, constraints=cons, options={'maxiter': 500, 'ftol': 1e-10}) | |
| if r.success: | |
| best = max(best, hhi(r.x)) | |
| except Exception: | |
| pass | |
| return float(h_min), float(max(best, h_min)) | |
| def liq_composite_vector(norm_vol, norm_aum, w_vol, w_aum): | |
| """合併流動性特徵:依使用者 Vol:AUM 權重比例加權正規化流動性(退化時各 0.5)。""" | |
| nv = np.asarray(norm_vol, dtype=float) | |
| na = np.asarray(norm_aum, dtype=float) | |
| den = float(w_vol) + float(w_aum) | |
| if den <= 0: | |
| return 0.5 * nv + 0.5 * na | |
| return (float(w_vol) * nv + float(w_aum) * na) / den | |
| def build_quality_constraints(global_weights, max_weight, cost_vec=None, sector_matrix=None, | |
| norm_liq_vol=None, norm_liq_aum=None, sent_vec=None): | |
| """組出品質約束(SLSQP ineq 清單)。逐維度受 QC_ENABLE_* 旗標控制(OAT)。 | |
| 成本/HHI 越低越好(≤門檻);流動性/情緒越高越好(≥門檻)。門檻皆在可行範圍內 → 恆可行。 | |
| functions.py 與 backtest_engine 共用本函式,確保兩系統一致。""" | |
| cons = [] | |
| if not getattr(parameters, "USE_QUALITY_CONSTRAINTS", True): | |
| return cons | |
| tmap = quality_tightness_map(global_weights) | |
| cap = float(max_weight) | |
| # 成本(越低越好) | |
| if getattr(parameters, "QC_ENABLE_COST", False) and tmap["cost"] > 0 and cost_vec is not None: | |
| x = np.asarray(cost_vec, dtype=float) | |
| vmin, vmax = feasible_linear_range(x, cap) | |
| if vmin is not None: | |
| thr = quality_threshold(vmin, vmax, tmap["cost"], lower_is_better=True) | |
| cons.append({'type': 'ineq', 'fun': lambda w, xx=x, t=thr: t - np.dot(w, xx)}) | |
| # 分散 HHI(越低越好) | |
| if getattr(parameters, "QC_ENABLE_HHI", False) and tmap["hhi"] > 0 and sector_matrix is not None and len(sector_matrix) > 0: | |
| hmin, hmax = feasible_hhi_range(sector_matrix, cap) | |
| if hmin is not None: | |
| thr = quality_threshold(hmin, hmax, tmap["hhi"], lower_is_better=True) | |
| cons.append({'type': 'ineq', 'fun': lambda w, sm=sector_matrix, t=thr: t - np.sum(np.dot(w, sm) ** 2)}) | |
| # 合併流動性(越高越好) | |
| if getattr(parameters, "QC_ENABLE_LIQ", False) and tmap["liq"] > 0 and norm_liq_vol is not None and norm_liq_aum is not None: | |
| lc = liq_composite_vector(norm_liq_vol, norm_liq_aum, tmap["w_vol"], tmap["w_aum"]) | |
| vmin, vmax = feasible_linear_range(lc, cap) | |
| if vmin is not None: | |
| thr = quality_threshold(vmin, vmax, tmap["liq"], lower_is_better=False) | |
| cons.append({'type': 'ineq', 'fun': lambda w, xx=lc, t=thr: np.dot(w, xx) - t}) | |
| # 情緒(越高越好) | |
| if getattr(parameters, "QC_ENABLE_SENT", False) and tmap["sent"] > 0 and sent_vec is not None: | |
| x = np.asarray(sent_vec, dtype=float) | |
| vmin, vmax = feasible_linear_range(x, cap) | |
| if vmin is not None: | |
| thr = quality_threshold(vmin, vmax, tmap["sent"], lower_is_better=False) | |
| cons.append({'type': 'ineq', 'fun': lambda w, xx=x, t=thr: np.dot(w, xx) - t}) | |
| return cons | |
| def compute_benchmark_cov_vector(etf_returns, benchmark_returns, periods_per_year=252): | |
| """回傳 (c, var_bench):U-C2 的 market / beta 核心共用原料,只需每日報酬,不需成分權重。 | |
| c[i] = Cov(rᵢ, r_bench) · periods_per_year (年化;各 ETF 對基準的共變異) | |
| var_bench = Var(r_bench) · periods_per_year (年化基準變異數) | |
| beta[i] = c[i] / var_bench (對基準回歸的 beta) | |
| market 核心 min ½wᵀΣw − wᵀc 等價於 min Var(r_p − r_bench)(對基準報酬流追蹤誤差²)。 | |
| c 依 etf_returns.columns 的順序回傳,與最佳化器的標的順序一致。 | |
| 對齊兩者共同交易日;資料不足回傳 (None, None)。 | |
| backtest_engine 直接 import 本函式,確保兩系統數值完全一致。 | |
| """ | |
| if etf_returns is None or getattr(etf_returns, "empty", True) or benchmark_returns is None: | |
| return None, None | |
| bench = pd.Series(benchmark_returns).dropna() | |
| if bench.empty: | |
| return None, None | |
| joined = etf_returns.join(bench.rename("__bench__"), how="inner").dropna(how="any") | |
| if joined.shape[0] < 2: | |
| return None, None | |
| b = joined["__bench__"].values | |
| var_bench = float(np.var(b, ddof=1)) * periods_per_year | |
| cols = list(etf_returns.columns) | |
| c = np.array([float(np.cov(joined[col].values, b, ddof=1)[0, 1]) for col in cols]) * periods_per_year | |
| return c, var_bench | |
| def get_benchmark_returns_aligned(returns_matrix, benchmark_ticker): | |
| """(主系統用)抓基準(VT)每日報酬並對齊到 returns_matrix 的交易日,回傳 Series;失敗回 None。 | |
| 回測引擎已有完整價格庫(含基準),由 caller 直接從 lookback 價格算基準報酬傳入,不走本函式。 | |
| """ | |
| try: | |
| bench_prices = get_or_fetch_historical_prices([benchmark_ticker]) | |
| if bench_prices is None or benchmark_ticker not in getattr(bench_prices, "columns", []): | |
| log.warning(f" [Arm C2] 取不到基準 {benchmark_ticker} 價格。") | |
| return None | |
| bench_ret = bench_prices[benchmark_ticker].pct_change(fill_method=None) | |
| return bench_ret.reindex(returns_matrix.index).dropna() | |
| except Exception as exc: | |
| log.warning(f" [Arm C2] 基準報酬對齊失敗:{exc}") | |
| return None | |
| def calc_buy_and_hold_portfolio_path(returns_matrix, weights): | |
| """ | |
| 依照主程式績效解讀邏輯建立投組路徑: | |
| 在 lookback 第一個可投資日用建議權重買入,後續不每日再平衡,直接持有到最後一天。 | |
| 回傳 buy-and-hold 的每日報酬與 NAV 路徑,供績效圖與深度健檢報告共用。 | |
| """ | |
| if returns_matrix is None or returns_matrix.empty: | |
| return pd.Series(dtype=float), pd.Series(dtype=float) | |
| returns = returns_matrix.fillna(0.0).copy() | |
| weights_array = np.array(weights, dtype=float) | |
| if len(weights_array) != len(returns.columns): | |
| raise ValueError("weights length must match returns_matrix columns.") | |
| weight_sum = weights_array.sum() | |
| if weight_sum <= 0: | |
| return pd.Series(dtype=float), pd.Series(dtype=float) | |
| weights_array = weights_array / weight_sum | |
| # 各 ETF 從第一天起各自複利成長,再用初始配置權重加總,就是不再平衡的投組 NAV。 | |
| asset_growth = (1 + returns).cumprod() | |
| nav = asset_growth.dot(weights_array) | |
| if isinstance(returns.index, pd.DatetimeIndex) and len(returns.index) > 0: | |
| initial_index = returns.index[0] - pd.Timedelta(days=1) | |
| else: | |
| initial_index = -1 | |
| nav = pd.concat([pd.Series([1.0], index=[initial_index]), nav]) | |
| nav = nav.sort_index() | |
| daily_returns = nav.pct_change(fill_method=None).dropna() | |
| return daily_returns, nav | |
| def calculate_individual_maxdd_bounds(returns_matrix, lower_quantile=0.01, upper_quantile=0.95): | |
| """ | |
| 用同一個 lookback 期間內各 ETF 自身的真實 MaxDD 分布建立分數尺度。 | |
| 這讓 true portfolio MaxDD score 和當期候選池風險環境一致,同時保留固定計算口徑。 | |
| """ | |
| if returns_matrix is None or returns_matrix.empty: | |
| return 0.0, 1.0 | |
| returns_values = np.nan_to_num(returns_matrix.values, nan=0.0) | |
| asset_nav = np.cumprod(1.0 + returns_values, axis=0) | |
| running_peak = np.maximum.accumulate(asset_nav, axis=0) | |
| drawdown = asset_nav / np.where(running_peak == 0, np.nan, running_peak) - 1.0 | |
| individual_maxdd = np.abs(np.nanmin(drawdown, axis=0)) | |
| individual_maxdd = individual_maxdd[np.isfinite(individual_maxdd)] | |
| if len(individual_maxdd) == 0: | |
| return 0.0, 1.0 | |
| lower_bound = float(np.quantile(individual_maxdd, lower_quantile)) | |
| upper_bound = float(np.quantile(individual_maxdd, upper_quantile)) | |
| if upper_bound <= lower_bound: | |
| upper_bound = lower_bound + 1e-9 | |
| return lower_bound, upper_bound | |
| def calculate_true_maxdd_score(weights, returns_values, lower_bound, upper_bound): | |
| """ | |
| 計算投組真實 buy-and-hold 最大回撤分數。 | |
| 分數越高代表回撤越小;若上下界失效,回傳 None 讓呼叫端使用舊 proxy fallback。 | |
| """ | |
| if upper_bound <= lower_bound: | |
| return None | |
| weights_array = np.asarray(weights, dtype=float) | |
| returns_array = np.nan_to_num(np.asarray(returns_values, dtype=float), nan=0.0) | |
| if returns_array.size == 0 or weights_array.size == 0: | |
| return None | |
| portfolio_returns = returns_array @ weights_array | |
| nav = np.cumprod(1.0 + portfolio_returns) | |
| if len(nav) == 0: | |
| return None | |
| running_peak = np.maximum.accumulate(nav) | |
| drawdown = nav / np.where(running_peak == 0, np.nan, running_peak) - 1.0 | |
| maxdd_abs = abs(float(np.nanmin(drawdown))) | |
| return float(1.0 - np.clip((maxdd_abs - lower_bound) / (upper_bound - lower_bound), 0.0, 1.0)) | |
| # ========================================== | |
| # stage0_0_get_sorted_ETFsorted | |
| # ========================================== | |
| def parse_aum(aum_str: str) -> float: | |
| """ | |
| 將帶有 M, B, K 後綴的 AUM 字串轉換為實際數值, | |
| 若遇到 N/A 或無法解析的格式則回傳 0.0。 | |
| """ | |
| # 處理空值或無效值 | |
| if not aum_str or aum_str in ["N/A", "-", "n/a"]: | |
| return 0.0 | |
| # 移除千分位逗號並去除頭尾空白 | |
| aum_str = aum_str.replace(",", "").strip() | |
| multiplier = 1.0 | |
| # 判斷並處理單位後綴 | |
| if aum_str.endswith("B"): | |
| multiplier = 1_000_000_000.0 | |
| aum_str = aum_str[:-1] | |
| elif aum_str.endswith("M"): | |
| multiplier = 1_000_000.0 | |
| aum_str = aum_str[:-1] | |
| elif aum_str.endswith("K"): | |
| multiplier = 1_000.0 | |
| aum_str = aum_str[:-1] | |
| try: | |
| # 轉換為浮點數並乘上對應的倍數 | |
| return float(aum_str) * multiplier | |
| except ValueError: | |
| return 0.0 | |
| def get_all_etfs() -> list[str]: | |
| """ | |
| 抓取所有 ETF 的資訊,並儲存完整的資料。 | |
| 具備 7 天快取過期檢查機制。 | |
| """ | |
| csv_file = "csv/all_etfs.csv" | |
| # 檢查檔案是否存在與是否過期 | |
| if os.path.exists(csv_file): | |
| # 取得檔案最後修改時間 | |
| file_mtime = datetime.fromtimestamp(os.path.getmtime(csv_file)) | |
| now = datetime.now() | |
| age = now - file_mtime | |
| if age <= timedelta(days=7): | |
| log.info(f"已找到 {csv_file} (上次更新:{file_mtime.strftime('%Y-%m-%d')},距今 {age.days} 天),直接讀取快取資料。") | |
| df = pd.read_csv(csv_file) | |
| return df["ticker"].tolist() | |
| else: | |
| log.info(f"檔案 {csv_file} 已過期 (距今 {age.days} 天,超過 7 天),準備重新抓取...") | |
| else: | |
| log.info(f"找不到 {csv_file},準備執行首次抓取...") | |
| base_url = "https://stockanalysis.com/etf/" | |
| headers = { | |
| "User-Agent": "Mozilla/5.0" | |
| } | |
| all_results = [] | |
| page = 1 | |
| while True: | |
| log.info(f"正在請求第 {page} 頁...") | |
| resp = requests.get(f"{base_url}?page={page}", headers=headers, timeout=15) | |
| resp.raise_for_status() | |
| soup = BeautifulSoup(resp.text, "html.parser") | |
| table = soup.find("table") | |
| if not table: | |
| log.info("未找到任何資料,停止抓取。") | |
| break | |
| # 解析資料行 | |
| rows = table.find("tbody").find_all("tr") | |
| if not rows: | |
| log.info("已無更多資料可抓取,停止。") | |
| break | |
| for row in rows: | |
| cols = row.find_all("td") | |
| if len(cols) < 2: | |
| continue | |
| ticker = cols[0].get_text(strip=True) | |
| name = cols[1].get_text(strip=True) | |
| asset_class = cols[2].get_text(strip=True) | |
| aum = cols[3].get_text(strip=True) if len(cols) > 3 else "N/A" # 假設 AUM 在第 4 欄 | |
| all_results.append({ | |
| "ticker": ticker, | |
| "name": name, | |
| "asset_class": asset_class, | |
| "assets_aum": aum, | |
| }) | |
| log.info(f"第 {page} 頁資料抓取成功,共計資料行數:{len(rows)}") | |
| if len(rows) < 500: # 假設每頁最多 500 筆資料,少於 500 筆表示最後一頁 | |
| break | |
| page += 1 | |
| # 儲存結果 | |
| all_results.sort(key=lambda x: parse_aum(x["assets_aum"]), reverse=True) | |
| df = pd.DataFrame(all_results) | |
| # 確保資料夾存在再存檔 | |
| os.makedirs(os.path.dirname(csv_file), exist_ok=True) | |
| df.to_csv(csv_file, index=False) | |
| log.info(f"所有 ETF 資料已抓取完畢,更新儲存至 {csv_file}") | |
| return df["ticker"].tolist() | |
| # ========================================== | |
| # stage0_1_ETF_data_input | |
| # ========================================== | |
| # 模組 1:從本地 CSV 獲取 Top N 清單 | |
| def get_target_tickers_from_csv(file_path, top_n): | |
| if not os.path.exists(file_path): | |
| log.error(f"❌ 找不到 {file_path}") | |
| sys.exit(1) | |
| return [] | |
| df = pd.read_csv(file_path) | |
| # 防呆:確認有我們需要的欄位 | |
| if 'ticker' not in df.columns or 'asset_class' not in df.columns: | |
| log.error("❌ CSV 檔案格式錯誤,找不到 'ticker' 或 'asset_class' 欄位。") | |
| sys.exit(1) | |
| return [] | |
| # 第一層過濾:只保留 Equity (股票型),剔除 Fixed Income, Commodity 等 | |
| df_equity = df[df['asset_class'] == 'Equity'].copy() | |
| # 取前 N 大 | |
| target_tickers = df_equity['ticker'].head(top_n).tolist() | |
| log.info(f"📂 成功從 CSV 載入 AUM 前 {top_n} 大的純股票型 (Equity) ETF 清單。") | |
| return target_tickers | |
| # 模組 2:yahooquery 財務與流動性特徵 (防封鎖改良版) | |
| def fetch_etf_data_yq(ticker_list, period="3y", batch_size=10, YQ_OUTPUT_FILE=parameters.YQ_OUTPUT_FILE): | |
| log.info(f"\n🚀 啟動 yahooquery:加入防護機制,分批抓取 {period} 數據...") | |
| # 支援斷點續傳:檢查是否已有部分完成的檔案 | |
| if os.path.exists(YQ_OUTPUT_FILE): | |
| existing_df = pd.read_csv(YQ_OUTPUT_FILE) | |
| completed_tickers = existing_df['ETF'].tolist() | |
| log.info(f"📂 發現既有 YQ 資料庫,已完成 {len(completed_tickers)} 檔。") | |
| else: | |
| existing_df = pd.DataFrame() | |
| completed_tickers = [] | |
| # 濾除已經抓過的 ETF | |
| remaining_tickers = [t for t in ticker_list if t not in completed_tickers] | |
| if not remaining_tickers: | |
| log.info("✅ 所有 YQ 財務特徵皆已抓取完畢!") | |
| return existing_df | |
| log.info(f"⏳ 尚有 {len(remaining_tickers)} 檔待處理...") | |
| # 將剩餘名單切割為微型批次 (例如每 5 檔一組) | |
| for i in range(0, len(remaining_tickers), batch_size): | |
| batch = remaining_tickers[i:i + batch_size] | |
| log.info(f"\n📦 正在處理批次: {batch}") | |
| # 針對這小批次初始化 Ticker | |
| tq = Ticker(batch) | |
| # 抓取該批次的資料 | |
| history = tq.history(period=period) | |
| profiles = tq.fund_profile | |
| summary = tq.summary_detail | |
| data_records = [] | |
| for symbol in batch: | |
| try: | |
| # --- 1. 取得價格資料以計算長天期報酬與風險 --- | |
| if isinstance(history, pd.DataFrame) and symbol in history.index.levels[0]: | |
| symbol_hist = history.loc[symbol] | |
| trading_days = len(symbol_hist) | |
| years = trading_days / 252.0 | |
| if years < 2.5: | |
| log.warning(f"⚠️ {symbol} 歷史數據不足 2.5 年,跳過計算。") | |
| data_records.append({ | |
| "ETF": symbol, | |
| "Years_Data": np.nan, | |
| "Date": pd.Timestamp.now().strftime("%Y-%m-%d"), | |
| "Return_CAGR (%)": np.nan, | |
| "Return_Div (%)": np.nan, | |
| "Risk_Vol (%)": np.nan, | |
| "Risk_MaxDD (%)": np.nan, | |
| "Cost_ExpRatio (%)": np.nan, | |
| "Liq_Volume (M)": np.nan, | |
| "Liq_AUM (B)": np.nan | |
| }) | |
| continue | |
| start_price = symbol_hist['close'].iloc[0] | |
| end_price = symbol_hist['close'].iloc[-1] | |
| cagr = (end_price / start_price) ** (1 / years) - 1 | |
| daily_returns = symbol_hist['close'].pct_change().dropna() | |
| annual_volatility = daily_returns.std() * np.sqrt(252) | |
| cumulative_returns = (1 + daily_returns).cumprod() | |
| max_drawdown = ((cumulative_returns - cumulative_returns.cummax()) / cumulative_returns.cummax()).min() | |
| avg_volume = symbol_hist['volume'].mean() | |
| else: | |
| log.warning(f"⚠️ 找不到 {symbol} 的歷史價格。") | |
| continue | |
| # --- 2. 提取準確的 ETF Metadata --- | |
| prof = profiles.get(symbol, {}) if isinstance(profiles, dict) else {} | |
| summ = summary.get(symbol, {}) if isinstance(summary, dict) else {} | |
| if prof == f'No fundamentals data found for symbol: {symbol}': | |
| log.warning(f"⚠️ {symbol} 的 profile 資料格式異常,無法解析。") | |
| expense_ratio = None | |
| else: | |
| fees = prof.get('feesExpensesInvestment', {}) | |
| expense_ratio = fees.get('annualReportExpenseRatio') | |
| if expense_ratio is None: | |
| expense_ratio = summ.get('navPrice', 0) * 0 | |
| dividend_yield = summ.get('yield', 0) | |
| aum = summ.get('totalAssets', np.nan) | |
| data_records.append({ | |
| "ETF": symbol, | |
| "Years_Data": round(years, 1), | |
| "Date": pd.Timestamp.now().strftime("%Y-%m-%d"), | |
| "Return_CAGR (%)": round(cagr * 100, 2), | |
| "Return_Div (%)": round(dividend_yield * 100, 2) if dividend_yield else 0.0, | |
| "Risk_Vol (%)": round(annual_volatility * 100, 2), | |
| "Risk_MaxDD (%)": round(max_drawdown * 100, 2), | |
| "Cost_ExpRatio (%)": round(expense_ratio * 100, 3) if expense_ratio else np.nan, | |
| "Liq_Volume (M)": round(avg_volume / 1000000, 2), | |
| "Liq_AUM (B)": round(aum / 1000000000, 2) if pd.notnull(aum) else np.nan | |
| }) | |
| log.info(f"✅ YQ 成功處理: {symbol}") | |
| except Exception as e: | |
| log.warning(f"❌ YQ 處理 {symbol} 時發生錯誤: {e}") | |
| # 如果這個批次有成功抓到資料,立刻存檔 (斷點續傳) | |
| if data_records: | |
| batch_df = pd.DataFrame(data_records) | |
| existing_df = pd.concat([existing_df, batch_df], ignore_index=True) | |
| existing_df.to_csv(YQ_OUTPUT_FILE, index=False) | |
| # 關鍵防護:每跑完一個批次,隨機休眠 3 到 6 秒 | |
| if i + batch_size < len(remaining_tickers): | |
| sleep_time = round(random.uniform(2.5, 5.0), 1) | |
| log.info(f"💤 批次完成,隨機休眠 {sleep_time} 秒以防封鎖...") | |
| time.sleep(sleep_time) | |
| log.info(f"💾 YQ 財務特徵已全數抓取並儲存至 {YQ_OUTPUT_FILE}") | |
| return existing_df | |
| # 模組 3:Alpha Vantage 產業分散度特徵 | |
| def build_etf_database_av(target_tickers, AV_DB_FILE=parameters.AV_DB_FILE, AV_API_KEY=parameters.AV_API_KEY, AV_MAX_CALLS_PER_DAY=parameters.AV_MAX_CALLS_PER_DAY): | |
| log.info(f"\n🚀 啟動 Alpha Vantage:比對並更新分散度資料庫...") | |
| if os.path.exists(AV_DB_FILE): | |
| with open(AV_DB_FILE, 'r', encoding='utf-8') as f: | |
| db = json.load(f) | |
| else: | |
| db = {} | |
| calls_today = 0 | |
| log.info(f"📊 目前本地資料庫已有 {len(db)} 檔 ETF 的分散度資料。") | |
| for symbol in target_tickers: | |
| if symbol in db: | |
| continue # 已存在則跳過,不浪費 API 額度 | |
| if calls_today >= AV_MAX_CALLS_PER_DAY: | |
| log.warning(f"\n🛑 已達今日 AV 安全呼叫上限 ({AV_MAX_CALLS_PER_DAY}次)。請明天再執行後續代碼!") | |
| break | |
| log.info(f"⏳ 正在向 Alpha Vantage 請求 {symbol} 的結構資料...") | |
| url = f"https://www.alphavantage.co/query?function=ETF_PROFILE&symbol={symbol}&apikey={AV_API_KEY}" | |
| time.sleep(1) # 防呆休眠 | |
| try: | |
| response = requests.get(url).json() | |
| if "Information" in response or "Note" in response: | |
| log.warning(f"🛑 觸發 Alpha Vantage 頻率限制,程式提早暫停。") | |
| log.info(response) | |
| break | |
| if "sectors" in response and "holdings" in response: | |
| sectors = response["sectors"] | |
| holdings = response["holdings"] | |
| # 建立原始產業權重字典 | |
| raw_sector_weights = {} | |
| for s in sectors: | |
| industry_name = s.get('sector', 'Unknown') | |
| weight = float(s.get('weight', 0)) | |
| raw_sector_weights[industry_name] = weight | |
| # 第一層過濾:計算產業權重總和 | |
| total_sector_weight = sum(raw_sector_weights.values()) | |
| SECTOR_is_True = 1 | |
| # 如果權重涵蓋度不到 60% (0.6),判定為無效資料並剔除 | |
| if total_sector_weight < 0.6: | |
| log.warning(f"⚠️ {symbol} 權重涵蓋度過低 ({total_sector_weight*100:.1f}%),判定為無效資料並剔除。") | |
| SECTOR_is_True = 0 | |
| # 🚨 正規化映射:將產業權重等比例放大至 100% (1.0) | |
| if total_sector_weight > 0: | |
| normalized_sector_weights = { | |
| ind: round(w / total_sector_weight, 6) | |
| for ind, w in raw_sector_weights.items() | |
| } | |
| else: | |
| normalized_sector_weights = {} | |
| # 使用正規化後的權重計算 HHI (加總必為 1.0,不需再補償未涵蓋部分) | |
| sector_hhi = sum([w ** 2 for w in normalized_sector_weights.values()]) | |
| top3_weight = sum([float(h.get('weight', 0)) for h in holdings[:3]]) | |
| if SECTOR_is_True: | |
| db[symbol] = { | |
| "Sector_HHI": round(sector_hhi, 4), | |
| "Top3_Weight": round(top3_weight, 4), | |
| "Sector_Weights": normalized_sector_weights, # 儲存 100% 正規化後的產業分佈 | |
| "Last_Updated": time.strftime("%Y-%m-%d") | |
| } | |
| else: | |
| db[symbol] = { | |
| "Sector_HHI": -1.0, | |
| "Top3_Weight": -1.0, | |
| "Sector_Weights": {}, # 無效資料不儲存產業分佈 | |
| "Last_Updated": time.strftime("%Y-%m-%d") | |
| } | |
| # 寫入本地 JSON 檔案 | |
| with open(AV_DB_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(db, f, ensure_ascii=False, indent=4) | |
| log.info(f"✅ AV 成功計算並儲存 {symbol} (已正規化至 100%)。") | |
| else: | |
| log.warning(f"⚠️ 找不到 {symbol} 的結構資料 (可能非 ETF 或 API 缺漏)。") | |
| db[symbol] = { | |
| "Sector_HHI": -1.0, | |
| "Top3_Weight": -1.0, | |
| "Sector_Weights": {}, # 無效資料不儲存產業分佈 | |
| "Last_Updated": time.strftime("%Y-%m-%d") | |
| } | |
| calls_today += 1 | |
| time.sleep(1) # 防呆休眠 | |
| except Exception as e: | |
| log.error(f"❌ 處理 {symbol} 時發生錯誤: {e}") | |
| sys.exit(1) | |
| log.info(f"🏁 Alpha Vantage 更新排程執行完畢。資料庫儲存於 {AV_DB_FILE}") | |
| # 模組 4:Finnhub 新聞與 FinBERT 宏觀情緒特徵 | |
| # 系統設定 | |
| FINNHUB_API_KEY = parameters.FINNHUB_API_KEY # 請填入你的 Finnhub API Key | |
| def fetch_finnhub_news(ticker, days=180): | |
| """使用 Finnhub API 抓取過去指定天數的歷史新聞""" | |
| end_date = datetime.now() | |
| start_date = end_date - timedelta(days=days) | |
| start_str = start_date.strftime('%Y-%m-%d') | |
| end_str = end_date.strftime('%Y-%m-%d') | |
| url = f"https://finnhub.io/api/v1/company-news?symbol={ticker}&from={start_str}&to={end_str}&token={FINNHUB_API_KEY}" | |
| try: | |
| response = requests.get(url, timeout=10) | |
| response.raise_for_status() | |
| return response.json() | |
| except Exception as e: | |
| log.warning(f"⚠️ 抓取 {ticker} 新聞時發生錯誤: {e}") | |
| return [] | |
| def calculate_macro_sentiment(ticker_symbol, half_life_days=60, finbert=None): | |
| """抓取新聞、評分,並套用指數時間衰減計算宏觀情緒""" | |
| news_items = fetch_finnhub_news(ticker_symbol, days=180) | |
| if not news_items: | |
| return 0.0 | |
| current_time = datetime.now().timestamp() | |
| decay_constant = np.log(2) / half_life_days | |
| total_weighted_score = 0.0 | |
| total_weight = 0.0 | |
| for article in news_items: | |
| headline = article.get('headline', '') | |
| summary = article.get('summary', '') | |
| timestamp = article.get('datetime', current_time) | |
| text = f"{headline}. {summary}".strip() | |
| if not text or text == ".": | |
| continue | |
| text = text[:500] | |
| try: | |
| res = finbert(text)[0] | |
| label = res['label'] | |
| confidence = res['score'] | |
| if label == 'positive': | |
| base_score = confidence | |
| elif label == 'negative': | |
| base_score = -confidence | |
| else: | |
| base_score = 0.0 | |
| except Exception: | |
| continue | |
| days_ago = max(0, (current_time - timestamp) / 86400) | |
| weight = np.exp(-decay_constant * days_ago) | |
| total_weighted_score += (base_score * weight) | |
| total_weight += weight | |
| if total_weight == 0: | |
| return 0.0 | |
| macro_score = max(-1.0, min(1.0, total_weighted_score / total_weight)) | |
| time.sleep(1.5) # 遵守免費 API 速率限制 | |
| return round(macro_score, 4) | |
| def append_sentiment_to_csv_legacy_live(csv_filepath=parameters.YQ_OUTPUT_FILE, max_age_days=180, print_preview=True): | |
| """ | |
| 讀取 CSV,計算情緒分數,並將結果存回原 CSV 檔案中。 | |
| """ | |
| if not os.path.exists(csv_filepath): | |
| log.error(f"❌ 錯誤:找不到檔案 {csv_filepath}") | |
| sys.exit(1) | |
| return | |
| if FINNHUB_API_KEY == "請填入你的_Finnhub_API_Key": | |
| log.error("❌ 警告:未設定 Finnhub API Key。") | |
| sys.exit(1) | |
| return | |
| log.info("\n" + "="*50) | |
| log.info(f" 📡 啟動 FinBERT 宏觀情緒動能擷取 ({csv_filepath})") | |
| log.info("="*50) | |
| # 1. 讀取 CSV | |
| df = pd.read_csv(csv_filepath) | |
| if 'ETF' not in df.columns: | |
| log.error("❌ 錯誤:CSV 中找不到 'ETF' 欄位。") | |
| sys.exit(1) | |
| return | |
| # 2. 確保快取欄位存在並格式化日期 | |
| if 'FinBERT_score' not in df.columns: | |
| df['FinBERT_score'] = np.nan | |
| if 'FinBERT_date' not in df.columns: | |
| df['FinBERT_date'] = pd.NaT # Pandas 的時間空值 (Not-a-Time) | |
| df['FinBERT_date'] = pd.to_datetime(df['FinBERT_date'], errors='coerce') | |
| etf_list = df['ETF'].tolist() | |
| total = len(etf_list) | |
| current_date = datetime.now() | |
| # 3. 逐一計算分數 | |
| etf_list = df['ETF'].tolist() | |
| total = len(etf_list) | |
| # 讀取 CSV 後,檢查哪些 ETF 已經算過了 | |
| if 'FinBERT_score' not in df.columns: | |
| df['FinBERT_score'] = np.nan # 初始化空欄位 | |
| log.info("⏳ 載入本地 FinBERT 模型中...") | |
| # 偵測「權重檔」是否存在(而非只看資料夾):repo 隨附 config/tokenizer,但權重(model.safetensors, | |
| # ~419MB)是 gitignore 不入庫,故 fresh clone 後資料夾存在但缺權重 → 須自動下載。 | |
| _finbert_dir = Path("local_finbert") | |
| _has_weights = (_finbert_dir / "model.safetensors").exists() or (_finbert_dir / "pytorch_model.bin").exists() | |
| if not _has_weights: | |
| # 1. 首次/缺權重時聯網下載模型與 Tokenizer(之後存到本地,不再重抓) | |
| log.info("⏳ 首次使用或缺少權重,正在下載 FinBERT 模型(ProsusAI/finbert)...") | |
| model_name = "ProsusAI/finbert" | |
| model = AutoModelForSequenceClassification.from_pretrained(model_name) | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| # 2. 儲存到本地端資料夾 (例如命名為 local_finbert) | |
| _finbert_dir.mkdir(parents=True, exist_ok=True) | |
| model.save_pretrained("./local_finbert") | |
| tokenizer.save_pretrained("./local_finbert") | |
| log.info("✅ FinBERT 模型下載並儲存完成!") | |
| # ========================================== | |
| # 之後你每次要測試時,只需要指向本地資料夾即可: | |
| finbert = pipeline("sentiment-analysis", model="./local_finbert", tokenizer="./local_finbert") # 預載入 FinBERT 模型,避免重複載入浪費時間 | |
| log.info("✅ FinBERT 模型載入完成。") | |
| for i, ticker in enumerate(etf_list, 1): | |
| # 取得該 ETF 所在的列索引 (確保修改對齊) | |
| mask = df['ETF'] == ticker | |
| existing_score = df.loc[mask, 'FinBERT_score'].iloc[0] | |
| existing_date = df.loc[mask, 'FinBERT_date'].iloc[0] | |
| needs_update = True | |
| # 🚨 快取失效檢查 (TTL Logic) | |
| if pd.notna(existing_score) and pd.notna(existing_date): | |
| delta_days = (current_date - existing_date).days | |
| if delta_days < max_age_days: | |
| needs_update = False | |
| if not needs_update: | |
| continue | |
| # 若無快取或已過期,則執行 API 抓取與 NLP 推論 | |
| # 判斷 existing_date 是否為 NaT 來決定印出的訊息 | |
| if pd.notna(existing_date): | |
| status_msg = f"快取過期 ({existing_date.strftime('%Y-%m-%d')})" | |
| else: | |
| status_msg = "無快取紀錄 (首次抓取)" | |
| # 若無快取或已過期,則執行 API 抓取與 NLP 推論 | |
| log.info(f"[{i:03d}/{total}] {ticker:<5} {status_msg},執行 API 抓取與 NLP 推論...") | |
| score = calculate_macro_sentiment(ticker, finbert=finbert) | |
| # 將新分數與當下日期寫回 DataFrame | |
| df.loc[mask, 'FinBERT_score'] = score | |
| df.loc[mask, 'FinBERT_date'] = current_date.strftime('%Y-%m-%d') | |
| log.info(f" -> 分數: {score}") | |
| df.loc[df['ETF'] == ticker, 'FinBERT_score'] = score | |
| # 🚨 斷點續傳:每算完 10 檔就存檔一次,避免跑到一半當機心血全毀 | |
| if i % 10 == 0: | |
| df.to_csv(csv_filepath, index=False) | |
| if print_preview: | |
| log.info("💾 已建立暫存檔...") | |
| # 4. 存回原 CSV 檔案 | |
| df.to_csv(csv_filepath, index=False) | |
| log.info("="*50) | |
| log.info(f"✅ 情緒分數擷取完成,已成功寫入 {csv_filepath}") | |
| # 檢驗輸出 | |
| if print_preview: | |
| log.info("\n【資料預覽】") | |
| # 只取出字串格式的日期印出比較好看 | |
| preview_df = df.copy() | |
| preview_df['FinBERT_date'] = preview_df['FinBERT_date'].dt.strftime('%Y-%m-%d') | |
| log.info(preview_df[['ETF', 'FinBERT_score', 'FinBERT_date']].head(10).to_string(index=False)) | |
| def append_sentiment_to_csv(csv_filepath=parameters.YQ_OUTPUT_FILE, max_age_days=180, print_preview=True): | |
| """ | |
| 從 sentiment_engine 的每日情緒快取更新主程式 ETF 清單。 | |
| 這個函式保留原本 Stage 0 會呼叫的名稱,但資料來源改成 | |
| sentiment_engine/data/sentiment_daily_cache.csv。若某 ETF 在目前 as-of | |
| 日期之前沒有情緒資料,會寫入 0.0,代表中性情緒,避免後續偏好分數出現 NaN。 | |
| """ | |
| if not os.path.exists(csv_filepath): | |
| log.error(f"找不到要更新情緒分數的 CSV:{csv_filepath}") | |
| sys.exit(1) | |
| return | |
| log.info("\n" + "=" * 50) | |
| log.info(f"從 sentiment_engine 快取更新 FinBERT 情緒分數:{csv_filepath}") | |
| log.info("=" * 50) | |
| df = pd.read_csv(csv_filepath) | |
| if "ETF" not in df.columns: | |
| log.error("CSV 缺少 ETF 欄位,無法對應 sentiment cache。") | |
| sys.exit(1) | |
| return | |
| # 主程式使用當下日期做 as-of 查詢;cache 若只到最近交易日,會自動取最近一筆。 | |
| as_of_date = pd.Timestamp(datetime.now().date()) | |
| daily_sentiment = load_daily_sentiment() | |
| sentiment_frame = get_sentiment_frame_asof( | |
| df["ETF"].dropna().astype(str).tolist(), | |
| as_of_date, | |
| daily_df=daily_sentiment, | |
| neutral_score=0.0, | |
| ) | |
| sentiment_map = sentiment_frame.set_index("ticker")["sentiment_score"].to_dict() | |
| date_map = sentiment_frame.set_index("ticker")["sentiment_date"].to_dict() | |
| normalized_tickers = df["ETF"].astype(str).str.strip().str.upper() | |
| df["FinBERT_score"] = normalized_tickers.map(sentiment_map).fillna(0.0).astype(float) | |
| df["FinBERT_date"] = normalized_tickers.map(date_map) | |
| # 沒有新聞 cache 的 ETF 仍使用中性 0.0;日期留成 as-of,方便辨識這次更新時間。 | |
| missing_date_mask = pd.to_datetime(df["FinBERT_date"], errors="coerce").isna() | |
| df.loc[missing_date_mask, "FinBERT_date"] = as_of_date.strftime("%Y-%m-%d") | |
| df["FinBERT_date"] = pd.to_datetime(df["FinBERT_date"], errors="coerce").dt.strftime("%Y-%m-%d") | |
| df.to_csv(csv_filepath, index=False) | |
| log.info(f"已用情緒快取更新 {len(df)} 檔 ETF;快取不足者採用 0.0 中性分數。") | |
| if print_preview: | |
| preview_cols = ["ETF", "FinBERT_score", "FinBERT_date"] | |
| log.info("\n情緒分數預覽:") | |
| log.info(df[preview_cols].head(10).to_string(index=False)) | |
| def clean_existing_database(AV_DB_FILE=parameters.AV_DB_FILE): | |
| if not os.path.exists(AV_DB_FILE): | |
| log.error(f"❌ 找不到 {AV_DB_FILE}") | |
| sys.exit(1) | |
| return | |
| with open(AV_DB_FILE, 'r', encoding='utf-8') as f: | |
| db = json.load(f) | |
| original_count = len(db) | |
| keys_to_delete = [] | |
| for symbol, data in db.items(): | |
| hhi = data.get("Sector_HHI", 0) | |
| # 根據數學極限,HHI 低於 0.08 絕對是無效或缺失的資料 | |
| if hhi != -1.0 and hhi < 0.08: | |
| keys_to_delete.append(symbol) | |
| # 從字典中刪除異常標的 | |
| for k in keys_to_delete: | |
| del db[k] | |
| # 將乾淨的資料寫回檔案 | |
| with open(AV_DB_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(db, f, ensure_ascii=False, indent=4) | |
| log.info(f"✅ 資料庫清理完成!") | |
| log.info(f"清理前資料數:{original_count}") | |
| log.info(f"刪除了 {len(keys_to_delete)} 筆異常資料:{keys_to_delete}") | |
| log.info(f"目前剩餘 {len(db)} 筆有效資料。") | |
| def merge_final_features(AV_DB_FILE=parameters.AV_DB_FILE, YQ_FILE=parameters.YQ_OUTPUT_FILE, OUTPUT_FILE=parameters.OUTPUT_FILE): | |
| log.info("🚀 啟動 Stage 0 最終合併程序...") | |
| # 1. 讀取財務與流動性特徵 | |
| if not os.path.exists(YQ_FILE): | |
| log.error(f"❌ 找不到 {YQ_FILE},請確認是否已完成 YQ 資料抓取。") | |
| sys.exit(1) | |
| return | |
| df_yq = pd.read_csv(YQ_FILE) | |
| log.info(f"📄 成功讀取 {len(df_yq)} 檔 ETF 的財務特徵。") | |
| # 2. 讀取分散度特徵資料庫 | |
| if os.path.exists(AV_DB_FILE): | |
| with open(AV_DB_FILE, 'r', encoding='utf-8') as f: | |
| db_av = json.load(f) | |
| log.info(f"📄 成功讀取 {len(db_av)} 檔 ETF 的本地分散度資料。") | |
| else: | |
| db_av = {} | |
| log.warning("⚠️ 找不到本地分散度資料庫。") | |
| # 3. 計算 Div_Score 並準備對應字典 | |
| div_scores = {} | |
| for symbol, data in db_av.items(): | |
| sector_hhi = data.get("Sector_HHI") | |
| top3_weight = data.get("Top3_Weight") | |
| if pd.notna(sector_hhi) and sector_hhi > 0: | |
| base_score = 1 / sector_hhi | |
| penalty = 0.8 if top3_weight > 0.20 else 1.0 | |
| div_scores[symbol] = round(base_score * penalty, 4) | |
| # 4. 進行合併 (Left Join: 將 Div_Score 映射到主表中) | |
| df_yq['Div_Score (產出)'] = df_yq['ETF'].map(div_scores) | |
| # 5. 整理欄位順序並儲存 | |
| cols = ['ETF', 'Years_Data', 'Date', 'Return_CAGR (%)', 'Return_Div (%)', | |
| 'Risk_Vol (%)', 'Risk_MaxDD (%)', 'Cost_ExpRatio (%)', | |
| 'Liq_Volume (M)', 'Liq_AUM (B)', 'Div_Score (產出)', 'FinBERT_score'] | |
| # 確保只選取存在的欄位 (防呆) | |
| final_cols = [c for c in cols if c in df_yq.columns] | |
| df_final = df_yq[final_cols] | |
| df_final.to_csv(OUTPUT_FILE, index=False) | |
| log.info("\n=== Stage 0: 最終多維度特徵矩陣預覽 ===") | |
| log.info(df_final.head(15).to_string(index=False)) | |
| log.info(f"\n✅ 合併完成!最終矩陣已儲存為 {OUTPUT_FILE}") | |
| log.info(f"提示:其中未具備產業數據的債券/商品 ETF,其 Div_Score 欄位會保留為 NaN。") | |
| def patch_aum_from_csv(MATRIX_FILE=parameters.OUTPUT_FILE, UNIVERSE_FILE=parameters.CSV_UNIVERSE_FILE): | |
| log.info("🔧 啟動 AUM 數據校正與覆蓋程序...") | |
| try: | |
| # 1. 讀取目前的特徵矩陣與最原始的 CSV 清單 | |
| df_matrix = pd.read_csv(MATRIX_FILE) | |
| df_universe = pd.read_csv(UNIVERSE_FILE) | |
| # 2. 建立 AUM 字串轉數值 (以 Billion 為單位) 的函數 | |
| def parse_aum_to_billion(aum_str): | |
| if not isinstance(aum_str, str) or aum_str in ["N/A", "-", "n/a", ""]: | |
| return np.nan | |
| aum_str = aum_str.replace(",", "").strip() | |
| multiplier = 1.0 | |
| if aum_str.endswith("B"): | |
| multiplier = 1.0 # 已經是 Billion 級別 | |
| aum_str = aum_str[:-1] | |
| elif aum_str.endswith("M"): | |
| multiplier = 0.001 # Million 轉 Billion | |
| aum_str = aum_str[:-1] | |
| elif aum_str.endswith("K"): | |
| multiplier = 0.000001 # Kilo 轉 Billion | |
| aum_str = aum_str[:-1] | |
| try: | |
| return round(float(aum_str) * multiplier, 2) | |
| except ValueError: | |
| return np.nan | |
| # 3. 將原始 CSV 的 AUM 轉換為數值 | |
| df_universe['Real_AUM_B'] = df_universe['assets_aum'].apply(parse_aum_to_billion) | |
| # 4. 建立「代碼對應真實 AUM」的字典 | |
| aum_dict = dict(zip(df_universe['ticker'], df_universe['Real_AUM_B'])) | |
| # 5. 直接覆蓋矩陣中的 Liq_AUM (B) 欄位 | |
| df_matrix['Liq_AUM (B)'] = df_matrix['ETF'].map(aum_dict) | |
| # 6. 重新存檔 | |
| df_matrix.to_csv(MATRIX_FILE, index=False) | |
| log.info("✅ AUM 校正成功!已使用靜態 CSV 的數據覆蓋 Yahoo 雜訊。") | |
| # 印出前 5 名來驗證 (檢查 VOO, IVV 等數值是否已恢復正常) | |
| log.info("\n=== 校正後的 AUM 預覽 ===") | |
| log.info(df_matrix[['ETF', 'Liq_AUM (B)']].head().to_string(index=False)) | |
| except Exception as e: | |
| log.error(f"❌ 校正過程中發生錯誤: {e}") | |
| sys.exit(1) | |
| # ========================================== | |
| # stage0_2_EDA_and_Visualization | |
| # ========================================== | |
| def run_stage0_2_eda(): | |
| log.info("啟動 Stage 0: 探索性資料分析 (EDA) 視覺化...") | |
| # 1. 讀取 Stage 0 產出的原始資料 | |
| try: | |
| df = pd.read_csv("csv/stage0_final_matrix.csv") | |
| except FileNotFoundError: | |
| log.error("❌ 找不到 stage0_final_matrix.csv,請確認檔案路徑。") | |
| sys.exit(1) | |
| return | |
| # 定義需要觀察的 9 個基礎數值特徵 | |
| features_to_plot = [ | |
| 'Return_CAGR (%)', 'Return_Div (%)', | |
| 'Risk_Vol (%)', 'Risk_MaxDD (%)', | |
| 'Cost_ExpRatio (%)', | |
| 'Liq_Volume (M)', 'Liq_AUM (B)', | |
| 'Div_Score (產出)', 'FinBERT_score' | |
| ] | |
| # 過濾出實際存在於 df 中的欄位 (防呆) | |
| valid_features = [f for f in features_to_plot if f in df.columns] | |
| n_features = len(valid_features) | |
| if n_features == 0: | |
| log.error("❌ 找不到任何指定的數值特徵進行繪圖。") | |
| sys.exit(1) | |
| return | |
| # 2. 設置繪圖風格與中文字型 | |
| sns.set_theme(style="whitegrid") | |
| plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei', 'Noto Sans CJK TC', 'Noto Sans CJK SC', 'PingFang HK', 'Arial Unicode MS'] | |
| plt.rcParams['axes.unicode_minus'] = False | |
| # 3. 繪製特徵分佈總覽圖 (Histogram + KDE) | |
| fig_hist, axes_hist = plt.subplots(nrows=3, ncols=3, figsize=(15, 12)) | |
| axes_hist = axes_hist.flatten() | |
| for i, feature in enumerate(valid_features): | |
| sns.histplot(df[feature], kde=True, ax=axes_hist[i], color='steelblue', bins=30) | |
| axes_hist[i].set_title(f'Distribution: {feature}', fontweight='bold') | |
| axes_hist[i].set_xlabel('') | |
| axes_hist[i].set_ylabel('Frequency') | |
| # 隱藏多餘的子圖 | |
| for j in range(i + 1, len(axes_hist)): | |
| fig_hist.delaxes(axes_hist[j]) | |
| plt.tight_layout() | |
| fig_hist.savefig("png/eda_histograms_beforeDEA.png", dpi=300) | |
| #plt.show() | |
| plt.close(fig_hist) | |
| log.info("✅ 產出特徵分佈圖:eda_histograms.png") | |
| # 4. 繪製箱型圖 (Boxplot) 觀察極端值 | |
| fig_box, axes_box = plt.subplots(nrows=3, ncols=3, figsize=(15, 12)) | |
| axes_box = axes_box.flatten() | |
| for i, feature in enumerate(valid_features): | |
| sns.boxplot(x=df[feature], ax=axes_box[i], color='lightcoral') | |
| axes_box[i].set_title(f'Outliers: {feature}', fontweight='bold') | |
| axes_box[i].set_xlabel('') | |
| for j in range(i + 1, len(axes_box)): | |
| fig_box.delaxes(axes_box[j]) | |
| plt.tight_layout() | |
| fig_box.savefig("png/eda_boxplots_beforeDEA.png", dpi=300) | |
| #plt.show() | |
| plt.close(fig_box) | |
| log.info("✅ 產出特徵箱型圖:eda_boxplots.png") | |
| # 5. 輸出基礎統計量報告 | |
| log.info("\n=== 📊 基礎特徵統計量敘述 (Descriptive Statistics) ===") | |
| desc_stats = df[valid_features].describe().T | |
| # 增加偏度 (Skewness) 指標,大於 1 或小於 -1 代表嚴重偏態 | |
| desc_stats['skewness'] = df[valid_features].skew() | |
| log.info(desc_stats[['mean', 'std', 'min', '50%', 'max', 'skewness']].round(4).to_string()) | |
| # ========================================== | |
| # stage0_3_regularization_and_dimensionality_reduction | |
| # ========================================== | |
| def custom_minmax_scaler(series, feature_name, lower_bound_q=0.01, upper_bound_q=0.95, min_val=0.1, max_val=1.0): | |
| """ | |
| 自定義正規化函數:包含 1%~95% 縮尾處理,並映射至 [0.1, 1.0] 區間。 | |
| """ | |
| # 縮尾處理 (Winsorization) 消除極端離群值 | |
| lower_bound = series.quantile(lower_bound_q) | |
| upper_bound = series.quantile(upper_bound_q) | |
| clipped = series.clip(lower=lower_bound, upper=upper_bound) | |
| # 壓縮至 0~1 | |
| s_min = clipped.min() | |
| s_max = clipped.max() | |
| if s_max == s_min: | |
| scaled = np.zeros(len(clipped)) | |
| else: | |
| scaled = (clipped - s_min) / (s_max - s_min) | |
| # 線性映射至 [0.1, 1.0] | |
| final_scaled = min_val + scaled * (max_val - min_val) | |
| return final_scaled | |
| def run_stage0_normalization_and_reduction(): | |
| log.info("啟動 Stage 0: DEA 前置正規化與客觀降維...") | |
| try: | |
| df = pd.read_csv("csv/stage0_final_matrix.csv") | |
| except FileNotFoundError: | |
| log.error("❌ 找不到 stage0_final_matrix.csv。") | |
| sys.exit(1) | |
| return | |
| df_dea = pd.DataFrame({'ETF': df['ETF']}) | |
| # ========================================== | |
| # 1. DEA 產出項 (Outputs): 越大越好 | |
| # ========================================== | |
| # [報酬維度 R_P] - 拆成資本利得與殖利率兩個獨立 DEA 產出(選項 A),避免高成長零股息 ETF 被平均稀釋。 | |
| df_dea['Out_CAGR'] = custom_minmax_scaler(df['Return_CAGR (%)'], 'CAGR') | |
| df_dea['Out_Div'] = custom_minmax_scaler(df['Return_Div (%)'], 'Div') | |
| # [流動性維度 L_P] - 先取對數處理嚴重右偏 | |
| log_volume = np.log1p(df['Liq_Volume (M)']) | |
| log_aum = np.log1p(df['Liq_AUM (B)']) | |
| norm_vol = custom_minmax_scaler(log_volume, 'Volume') | |
| norm_aum = custom_minmax_scaler(log_aum, 'AUM') | |
| df_dea['Out_Liquidity'] = (norm_vol + norm_aum) / 2 | |
| # [分散度維度 D_P] | |
| df_dea['Out_Diversity'] = custom_minmax_scaler(df['Div_Score (產出)'], 'Diversity') | |
| # [市場情緒維度 S_P] | |
| df_dea['Out_Sentiment'] = custom_minmax_scaler(df['FinBERT_score'], 'Sentiment', lower_bound_q=0.05, upper_bound_q=0.95) | |
| # ========================================== | |
| # 2. DEA 投入項 (Inputs): 越小越好 (消耗的資源/承擔的風險) | |
| # ========================================== | |
| # [風險維度 V_P] - MaxDD 取絕對值代表回撤幅度 | |
| norm_risk_vol = custom_minmax_scaler(df['Risk_Vol (%)'], 'Risk_Vol') | |
| norm_maxdd = custom_minmax_scaler(df['Risk_MaxDD (%)'].abs(), 'Risk_MaxDD') | |
| df_dea['In_Risk'] = (norm_risk_vol + norm_maxdd) / 2 | |
| # [成本維度 C_P] | |
| df_dea['In_Cost'] = custom_minmax_scaler(df['Cost_ExpRatio (%)'], 'Cost') | |
| # ========================================== | |
| # 3. 儲存 DEA 專用矩陣 | |
| # ========================================== | |
| # 重新排列欄位,便於後續切片 | |
| cols = ['ETF', 'In_Risk', 'In_Cost', 'Out_CAGR', 'Out_Div', 'Out_Liquidity', 'Out_Diversity', 'Out_Sentiment'] | |
| df_dea = df_dea[cols] | |
| log.info("\n=== 📊 降維後 DEA 矩陣預覽 (皆落於 0.1 ~ 1.0 區間) ===") | |
| log.info(df_dea.head().round(4).to_string()) | |
| df_dea.to_csv("csv/stage0_dea_ready_matrix.csv", index=False) | |
| log.info("\n✅ 已輸出 stage0_dea_ready_matrix.csv") | |
| # ========================================== | |
| # stage1_DEA_efficiency_calculation | |
| # ========================================== | |
| def run_stage1_normalized_dea(): | |
| # 1. 讀取 Stage 0 正規化並降維後的 DEA 專用矩陣 | |
| try: | |
| df_raw = pd.read_csv("csv/stage0_dea_ready_matrix.csv") | |
| except FileNotFoundError: | |
| log.error("找不到 stage0_dea_ready_matrix.csv,請先執行前段正規化。") | |
| sys.exit(1) | |
| return | |
| # 2. 資料清洗:實作「分層 DEA」,先濾除含有 NaN 的非股票型資產 | |
| # 確保送入 linprog 的矩陣是完美的實數 | |
| input_cols = ['In_Risk', 'In_Cost'] | |
| output_cols = ['Out_CAGR', 'Out_Div', 'Out_Liquidity', 'Out_Diversity'] | |
| dea_required_cols = input_cols + output_cols | |
| invalid_mask = df_raw[dea_required_cols].isna().any(axis=1) | |
| dropped_count = int(invalid_mask.sum()) | |
| if dropped_count > 0: | |
| log.warning(f"?? Stage 1 DEA 排除 {dropped_count} 檔ETF,原因是 DEA 必要欄位含 NaN。") | |
| df = df_raw.loc[~invalid_mask].reset_index(drop=True) | |
| log.info(f"📊 載入資料:共 {len(df)} 檔 ETF 進入正規化 DEA 運算。") | |
| # 3. 定義投入 (Inputs) 與 產出 (Outputs) 欄位 | |
| # 投入:越小越好 (In_Risk, In_Cost) | |
| input_cols = ['In_Risk', 'In_Cost'] | |
| # 產出:越大越好 (Out_CAGR, Out_Div, Out_Liquidity, Out_Diversity) | |
| output_cols = ['Out_CAGR', 'Out_Div', 'Out_Liquidity', 'Out_Diversity'] | |
| # 提取數值矩陣 | |
| X = df[input_cols].values | |
| Y = df[output_cols].values | |
| n_dmus = len(df) # 決策單元 (ETF) 數量 | |
| n_inputs = X.shape[1] # 投入維度數 = 2 | |
| n_outputs = Y.shape[1] # 產出維度數 = 3 | |
| efficiencies = [] | |
| log.info("🚀 啟動 DEA (CCR Input-Oriented) 線性規劃求解...") | |
| # 4. 對每一檔 ETF 分別求解線性規劃 | |
| for k in range(n_dmus): | |
| # 目標函數:最大化 u^T * Y_k -> 轉為 scipy 的 Minimize -u^T * Y_k | |
| c = np.concatenate((np.zeros(n_inputs), -Y[k])) | |
| # 限制式 1:v^T * X_k = 1 | |
| A_eq = np.concatenate((X[k].reshape(1, -1), np.zeros((1, n_outputs))), axis=1) | |
| b_eq = np.array([1.0]) | |
| # 限制式 2:-v^T * X_j + u^T * Y_j <= 0 (對所有 j) | |
| A_ub = np.hstack((-X, Y)) | |
| b_ub = np.zeros(n_dmus) | |
| # 變數範圍 (Bounds):加入 Non-Archimedean infinitesimal 防止權重為 0 | |
| epsilon = 1e-6 | |
| bounds = [(epsilon, None) for _ in range(n_inputs + n_outputs)] | |
| # 執行求解 | |
| res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs') | |
| if res.success: | |
| eff = -res.fun | |
| # 處理極微小的浮點數誤差,確保最高分為 1.0 | |
| eff = min(round(eff, 4), 1.0) | |
| efficiencies.append(eff) | |
| else: | |
| log.warning(f"⚠️ 警告:{df['ETF'].iloc[k]} 求解失敗。") | |
| efficiencies.append(np.nan) | |
| # 5. 將效率分數寫回 DataFrame 並排序 | |
| df['DEA_Score'] = efficiencies | |
| # 顯示排版 | |
| display_cols = ['ETF', 'DEA_Score'] + input_cols + output_cols | |
| df_sorted = df[display_cols].sort_values(by='DEA_Score', ascending=False).reset_index(drop=True) | |
| # 篩選 | |
| efficient_frontier = df_sorted[df_sorted['DEA_Score'] == 1.0] | |
| relatively_efficient = df_sorted[df_sorted['DEA_Score'] >= 0.95] | |
| log.info(f"\n=== 🏆 效率前緣標的 (DEA Score = 1.0) 共 {len(efficient_frontier)} 檔 ===") | |
| log.info(efficient_frontier[['ETF', 'DEA_Score', 'In_Risk', 'In_Cost', 'Out_CAGR']].head(10).to_string(index=False)) | |
| log.info(f"\n具有一定競爭力的標的數量 (DEA Score >= 0.95) : {len(relatively_efficient)}") | |
| log.info(f"\n=== 📉 需要剃除的劣勢標的 (DEA Score < 0.80) 共 {len(df_sorted[df_sorted['DEA_Score'] < 0.80])} 檔 ===") | |
| log.info(df_sorted[df_sorted['DEA_Score'] < 0.80][['ETF', 'DEA_Score']].head(5).to_string(index=False)) | |
| # 儲存包含分數的最終結果 | |
| df_sorted.to_csv("csv/stage1_dea_results.csv", index=False) | |
| log.info("\n✅ 已將 DEA 分數儲存至 csv/stage1_dea_results.csv") | |
| def plot_dea_distribution(): | |
| # 1. 讀取 DEA 結果 | |
| try: | |
| df = pd.read_csv("csv/stage1_dea_results.csv") | |
| except FileNotFoundError: | |
| log.error("找不到 csv/stage1_dea_results.csv,請確認 DEA 模型已執行完畢。") | |
| sys.exit(1) | |
| return | |
| # 2. 設定繪圖風格與中文字型 | |
| sns.set_theme(style="whitegrid") | |
| # 若在 Windows 執行可使用微軟正黑體;Mac 可改為 'PingFang HK' 或 'Arial Unicode MS' | |
| plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei', 'Noto Sans CJK TC', 'Noto Sans CJK SC', 'PingFang HK', 'Arial Unicode MS'] | |
| plt.rcParams['axes.unicode_minus'] = False | |
| # 3. 建立畫布 | |
| fig, ax = plt.subplots(figsize=(10, 6)) | |
| # 4. 繪製直方圖與 KDE 曲線 | |
| # 根據資料量自動調整 bins,未來資料變多時圖形依然平滑 | |
| sns.histplot(df['DEA_Score'], bins=20, kde=True, color='steelblue', edgecolor='black', ax=ax) | |
| # 5. 加上客觀閾值參考線 | |
| ax.axvline(x=1.0, color='red', linestyle='--', linewidth=2, label='Frontier (1.0)') | |
| top_frac = getattr(parameters, "DEA_TOP_FRACTION", 0.25) | |
| cutoff = float(df['DEA_Score'].quantile(1.0 - top_frac)) | |
| ax.axvline(x=cutoff, color='green', linestyle='--', linewidth=2, label=f'Top {top_frac*100:.0f}% cutoff ({cutoff:.3f})') | |
| # 6. 設定標題與標籤 | |
| ax.set_title('Stage 1: DEA Efficiency Score Distribution', fontsize=16, pad=15) | |
| ax.set_xlabel('DEA Efficiency Score', fontsize=12) | |
| ax.set_ylabel('ETF Count', fontsize=12) | |
| ax.legend() | |
| # 確保標籤不重疊 | |
| plt.tight_layout() | |
| # 7. 儲存圖片 | |
| output_filename = "png/dea_score_distribution.png" | |
| plt.savefig(output_filename, dpi=300) | |
| log.info(f"📊 統計圖繪製完成!已儲存為 {output_filename}") | |
| def run_stage1_super_efficiency_normalized(): | |
| log.info("🚀 啟動超級效率模型 (Super-Efficiency DEA) 運算...") | |
| # 1. 🚨 修正:讀取「正規化且降維後」的 DEA 專用矩陣 | |
| try: | |
| df_raw = pd.read_csv("csv/stage0_dea_ready_matrix.csv") | |
| except FileNotFoundError: | |
| log.error("❌ 找不到 stage0_dea_ready_matrix.csv,請先執行前段正規化。") | |
| sys.exit(1) | |
| return | |
| input_cols = ['In_Risk', 'In_Cost'] | |
| output_cols = ['Out_CAGR', 'Out_Div', 'Out_Liquidity', 'Out_Diversity'] | |
| dea_required_cols = input_cols + output_cols | |
| invalid_mask = df_raw[dea_required_cols].isna().any(axis=1) | |
| dropped_count = int(invalid_mask.sum()) | |
| if dropped_count > 0: | |
| log.warning(f"?? Super-Efficiency DEA 排除 {dropped_count} 檔ETF,原因是 DEA 必要欄位含 NaN。") | |
| df = df_raw.loc[~invalid_mask].reset_index(drop=True) | |
| # 2. 對齊 6 大維度 | |
| X = df[input_cols].values | |
| Y = df[output_cols].values | |
| n_dmus = len(df) | |
| n_inputs = X.shape[1] # 降為 2 維度 | |
| n_outputs = Y.shape[1] # 降為 4 維度 | |
| super_efficiencies = [] | |
| # 預先建立完整的 A_ub 基礎矩陣 (-X + Y) | |
| A_ub_base = np.hstack((-X, Y)) | |
| # 3. 針對每檔 ETF 進行超級效率求解 | |
| for k in range(n_dmus): | |
| c = np.concatenate((np.zeros(n_inputs), -Y[k])) | |
| A_eq = np.concatenate((X[k].reshape(1, -1), np.zeros((1, n_outputs))), axis=1) | |
| b_eq = np.array([1.0]) | |
| # 核心差異:刪除第 k 列 (不與自己比較) | |
| A_ub_k = np.delete(A_ub_base, k, axis=0) | |
| b_ub_k = np.zeros(n_dmus - 1) | |
| epsilon = 1e-6 | |
| bounds = [(epsilon, None) for _ in range(n_inputs + n_outputs)] | |
| res = linprog(c, A_ub=A_ub_k, b_ub=b_ub_k, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs') | |
| if res.success: | |
| eff = -res.fun | |
| super_efficiencies.append(round(eff, 4)) | |
| else: | |
| # 正規化後的數據極少無解,若無解通常是強烈的離群訊號 | |
| log.warning(f"⚠️ 警告:{df['ETF'].iloc[k]} 求解無解 (Infeasible)。") | |
| super_efficiencies.append(np.nan) | |
| # 4. 寫回 DataFrame 並排序 | |
| df['Super_Score'] = super_efficiencies | |
| df_sorted = df.sort_values(by='Super_Score', ascending=False).reset_index(drop=True) | |
| # 5. 分析與印出結果 | |
| log.info("\n=== 🚨 疑似極端值/異類 (Super_Score > 1.5) ===") | |
| outliers = df_sorted[df_sorted['Super_Score'] > 1.5] | |
| if not outliers.empty: | |
| log.info(outliers[['ETF', 'Super_Score', 'In_Cost', 'Out_CAGR']].to_string(index=False)) | |
| log.info("💡 洞察:這些標的分數突破天際,建議檢查是否為單一維度極端產生的數學扭曲。") | |
| else: | |
| log.info("無發現嚴重極端值,資料庫品質十分穩定。") | |
| log.info("\n=== 🏆 優秀且穩健的前緣標的 (1.0 <= Super_Score <= 1.5) ===") | |
| robust_frontiers = df_sorted[(df_sorted['Super_Score'] >= 1.0) & (df_sorted['Super_Score'] <= 1.5)] | |
| log.info(robust_frontiers[['ETF', 'Super_Score']].head(10).to_string(index=False)) | |
| df_sorted.to_csv("csv/stage1_super_efficiency_results.csv", index=False) | |
| log.info("\n✅ 超級效率運算完成,結果已儲存為 csv/stage1_super_efficiency_results.csv") | |
| def run_cross_efficiency_dea(): | |
| log.info("啟動 Stage 1: 交叉評估 (Cross-Efficiency) 運算...") | |
| # 1. 讀取 Stage 1 第一階段 (標準 DEA) 的結果 | |
| try: | |
| df_raw = pd.read_csv("csv/stage1_dea_results.csv") | |
| except FileNotFoundError: | |
| log.error("找不到 csv/stage1_dea_results.csv,請先執行標準 DEA 模型。") | |
| sys.exit(1) | |
| return | |
| # 2. 候選池改用「取前 25%」百分位門檻(取代絕對 0.80),對 universe 變動更穩定。 | |
| top_frac = getattr(parameters, "DEA_TOP_FRACTION", 0.25) | |
| df_valid = df_raw.dropna(subset=['DEA_Score']) | |
| n_keep = max(1, int(np.ceil(len(df_valid) * top_frac))) | |
| df = df_valid.nlargest(n_keep, 'DEA_Score').reset_index(drop=True) | |
| log.info(f"篩選完畢:取 DEA 前 {top_frac*100:.0f}% 共 {len(df)} 檔 ETF 進入交叉評估矩陣。") | |
| input_cols = ['In_Risk', 'In_Cost'] | |
| output_cols = ['Out_CAGR', 'Out_Div', 'Out_Liquidity', 'Out_Diversity'] | |
| X = df[input_cols].values | |
| Y = df[output_cols].values | |
| n_dmus = len(df) | |
| n_inputs = X.shape[1] | |
| n_outputs = Y.shape[1] | |
| # 建立 n x n 交叉評估矩陣 | |
| cross_matrix = np.zeros((n_dmus, n_dmus)) | |
| # 3. 矩陣運算核心 | |
| for k in range(n_dmus): | |
| # 步驟 A: 求解 DMU_k 的標準 DEA 以取得其專屬的「最佳權重」 | |
| c = np.concatenate((np.zeros(n_inputs), -Y[k])) | |
| A_eq = np.concatenate((X[k].reshape(1, -1), np.zeros((1, n_outputs))), axis=1) | |
| b_eq = np.array([1.0]) | |
| A_ub = np.hstack((-X, Y)) | |
| b_ub = np.zeros(n_dmus) | |
| epsilon = 1e-6 | |
| bounds = [(epsilon, None) for _ in range(n_inputs + n_outputs)] | |
| res = linprog(c, A_ub=A_ub, b_ub=b_ub, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs') | |
| if res.success: | |
| v_star = res.x[:n_inputs] | |
| u_star = res.x[n_inputs:] | |
| # 步驟 B: 使用 DMU_k 的價值觀 (權重),去評分所有的 DMU_j | |
| for j in range(n_dmus): | |
| virtual_output = np.dot(u_star, Y[j]) | |
| virtual_input = np.dot(v_star, X[j]) | |
| if virtual_input > 0: | |
| cross_matrix[k][j] = virtual_output / virtual_input | |
| else: | |
| cross_matrix[k][j] = 0.0 | |
| else: | |
| log.warning(f"警告:DMU {df['ETF'].iloc[k]} 求解失敗。") | |
| cross_matrix[k, :] = np.nan | |
| # 4. 計算最終交叉分數 (同儕評分的平均值) | |
| cross_efficiency_scores = np.nanmean(cross_matrix, axis=0) | |
| # 5. 整合結果與輸出 | |
| df['Cross_Score'] = np.round(cross_efficiency_scores, 4) | |
| # 依照交叉分數進行絕對排序 | |
| df_sorted = df.sort_values(by='Cross_Score', ascending=False).reset_index(drop=True) | |
| log.info("\n=== 🏆 交叉評估最終排序 (Top 15 候選池) ===") | |
| display_cols = ['ETF', 'Cross_Score', 'DEA_Score'] + input_cols + output_cols | |
| log.info(df_sorted[display_cols].head(15).to_string(index=False)) | |
| output_file = "csv/stage1_final_candidates.csv" | |
| df_sorted.to_csv(output_file, index=False) | |
| log.info(f"\n資料儲存完畢:候選名單已輸出至 {output_file}") | |
| # ========================================== | |
| # stage2_0_AHP_weight_final_candidates_selection | |
| # ========================================== | |
| class TwoLevel_AHP_Model: | |
| def __init__(self): | |
| # --- 第一層:主維度 --- | |
| self.main_criteria = [ | |
| "Return_Main", | |
| "Risk_Main", | |
| "Cost_Main", | |
| "Liquidity_Main", | |
| "Diversity_Main", | |
| "Sentiment_Main" | |
| ] | |
| # --- 第二層:子特徵 --- | |
| self.sub_criteria = { | |
| "Return_Main": ["Return_CAGR", "Return_Div"], | |
| "Risk_Main": ["Risk_Vol", "Risk_MaxDD"], | |
| "Liquidity_Main": ["Liq_Volume", "Liq_AUM"], | |
| "Cost_Main": ["Cost_ExpRatio"], # 單一特徵,權重為 1 | |
| "Diversity_Main": ["Div_Score"], # 單一特徵,權重為 1 | |
| "Sentiment_Main": ["FinBERT_score"] # 單一特徵,權重為 1 | |
| } | |
| # 定義不同矩陣大小對應的 Random Index (RI) 查表值 | |
| self.RI_dict = {1: 0.0, 2: 0.0, 3: 0.58, 4: 0.90, 5: 1.12, 6: 1.24} | |
| def _solve_matrix(self, comparisons, n): | |
| """通用的 AHP 矩陣求解器""" | |
| if n == 1: | |
| return np.array([1.0]), 0.0 # 單一特徵不需比較 | |
| num_comparisons = (n * (n - 1)) // 2 | |
| if len(comparisons) != num_comparisons: | |
| raise ValueError(f"維度為 {n} 的矩陣需要 {num_comparisons} 個成對比較值。") | |
| matrix = np.ones((n, n)) | |
| idx = 0 | |
| for i in range(n): | |
| for j in range(i + 1, n): | |
| val = comparisons[idx] + 1e-6 # 加入微小值防止除以零 | |
| matrix[i, j] = val | |
| matrix[j, i] = 1.0 / val | |
| idx += 1 | |
| eigenvalues, eigenvectors = np.linalg.eig(matrix) | |
| max_idx = np.argmax(np.real(eigenvalues)) | |
| max_eigenvalue = np.real(eigenvalues[max_idx]) | |
| eigenvector = np.real(eigenvectors[:, max_idx]) | |
| weights = eigenvector / (np.sum(eigenvector) + 1e-6) # 加入微小值防止除以零 | |
| # 當 n<=2 時,CR 理論上恆為 0,且 RI 為 0 無法相除,因此直接回傳 CR=0 | |
| if n <= 2: | |
| CR = 0.0 | |
| else: | |
| CI = (max_eigenvalue - n) / (n - 1) | |
| CR = CI / (self.RI_dict[n] + 1e-6) # 加入微小值防止除以零 | |
| return weights, CR | |
| def calculate_global_weights(self, user_inputs): | |
| """ | |
| 傳入使用者的問卷結果 (包含第一層與需要比較的第二層)。 | |
| 計算出最終 9 個子特徵的全局權重。 | |
| """ | |
| log.info("\n🚀 啟動兩層級 AHP (Two-Level AHP) 運算...") | |
| # 1. 求解第一層主維度權重 | |
| log.info("\n[第一層:主維度求解]") | |
| main_weights, main_cr = self._solve_matrix(user_inputs["Main"], len(self.main_criteria)) | |
| main_weight_dict = {crit: weight for crit, weight in zip(self.main_criteria, main_weights)} | |
| for crit, weight in main_weight_dict.items(): | |
| log.info(f" - {crit}: {weight*100:.2f}%") | |
| log.info(f" >> 一致性比率 (CR): {main_cr:.4f}") | |
| if main_cr > 0.1: | |
| log.warning("❌ 警告:主維度問卷存在邏輯矛盾 (CR > 0.1)!") | |
| # 2. 求解第二層子特徵權重並計算全局權重 | |
| log.info("\n[第二層:子特徵局部權重與全局權重]") | |
| global_weights = {} | |
| for main_crit in self.main_criteria: | |
| subs = self.sub_criteria[main_crit] | |
| main_w = main_weight_dict[main_crit] | |
| # 如果該維度只有 1 個子特徵,局部權重為 1.0 | |
| if len(subs) == 1: | |
| sub_name = subs[0] | |
| global_weights[sub_name] = main_w * 1.0 | |
| log.info(f" - {sub_name} (單一特徵) -> 全局權重: {global_weights[sub_name]*100:.2f}%") | |
| continue | |
| # 如果有多個子特徵,需要求解次矩陣 | |
| sub_comparisons = user_inputs["Sub"].get(main_crit, []) | |
| local_weights, _ = self._solve_matrix(sub_comparisons, len(subs)) | |
| for i, sub_name in enumerate(subs): | |
| global_w = main_w * local_weights[i] | |
| global_weights[sub_name] = global_w | |
| log.info(f" - {sub_name} (局部 {local_weights[i]*100:.1f}%) -> 全局權重: {global_w*100:.2f}%") | |
| return global_weights, main_cr | |
| # ========================================== | |
| # 互動式問卷前處理器 (Questionnaire Preprocessor) | |
| # ========================================== | |
| def ask_question_json_slider(question_data): | |
| """處理 -5 到 +5 刻度的 AHP 問卷互動""" | |
| print(f"\n{question_data['question']}") | |
| print(f" {question_data['left_pole']} <-- [ 0: 同等重要 ] --> {question_data['right_pole']}") | |
| # 定義 -4 到 4 映射到 AHP 1~9 的字典 | |
| ahp_mapping = { | |
| 0: 1.0, | |
| 1: 3.0, 2: 5.0, 3: 7.0, 4: 9.0, | |
| -1: 1/3.0, -2: 1/5.0, -3: 1/7.0, -4: 1/9.0 | |
| } | |
| while True: | |
| try: | |
| ans = int(input("請輸入你的偏好程度 (-4 到 4 的整數): ").strip()) | |
| if -4 <= ans <= 4: | |
| return ahp_mapping[ans] | |
| else: | |
| print("❌ 請輸入介於 -4 到 4 之間的整數。") | |
| except ValueError: | |
| print("❌ 輸入格式錯誤,請輸入整數。") | |
| def build_user_simulation(deterministic=False, questionnaire_path="json/questionnaire.json"): | |
| """讀取外部 JSON 問卷,進行標準 AHP 成對比較 (-5 到 5 刻度)""" | |
| if deterministic: | |
| # 開發測試用:直接回傳寫死的值 | |
| return parameters.DETERMINISTIC_USER_INPUTS | |
| try: | |
| with open(questionnaire_path, 'r', encoding='utf-8') as f: | |
| q_data = json.load(f) | |
| except FileNotFoundError: | |
| log.error(f"❌ 找不到問卷設定檔:{questionnaire_path}") | |
| sys.exit(1) | |
| return None | |
| print("="*60) | |
| print(" 🧠 智能理財引擎:高解析度 AHP 成對比較問卷 (-4 到 +4 刻度)") | |
| print("="*60) | |
| main_comparisons = [] | |
| for item in q_data["Main_Comparisons"]: | |
| val = ask_question_json_slider(item) | |
| main_comparisons.append(val) | |
| print("\n" + "-"*40) | |
| print("進階偏好設定 (子特徵成對比較)") | |
| sub_inputs = {} | |
| for main_crit, sub_q in q_data["Sub_Comparisons"].items(): | |
| val = ask_question_json_slider(sub_q) | |
| sub_inputs[main_crit] = [val] | |
| return { | |
| "Main": main_comparisons, | |
| "Sub": sub_inputs | |
| } | |
| # ========================================== | |
| # stage2_1_preference_driven_deduplication | |
| # ========================================== | |
| def robust_scale(series, upper_quantile=0.95, lower_quantile=0.01, is_reverse=False): | |
| """自定義正規化:包含縮尾處理,並壓縮至 0~1 區間""" | |
| lower_bound = series.quantile(lower_quantile) | |
| upper_bound = series.quantile(upper_quantile) | |
| clipped = series.clip(lower=lower_bound, upper=upper_bound) | |
| s_min = clipped.min() | |
| s_max = clipped.max() | |
| if s_max == s_min: | |
| scaled = np.zeros(len(clipped)) | |
| else: | |
| scaled = (clipped - s_min) / (s_max - s_min) | |
| # 如果是風險或成本 (越小越好),計算偏好分數時必須反向 (越高分代表效用越大) | |
| if is_reverse: | |
| return 1.0 - scaled | |
| return scaled | |
| def run_stage2_5_preference_deduplication_yq(): | |
| log.info("啟動 Stage 2_2: 高相關 ETF 分群與偏好篩選 (白名單過濾與原始數據重構)...") | |
| # 1. 讀取 Stage 2 的兩層級 9 維全局權重 | |
| try: | |
| with open("json/stage2_ahp_global_weights.json", "r", encoding="utf-8") as f: | |
| ahp_data = json.load(f) | |
| global_weights = ahp_data["Global_Weights"] | |
| except FileNotFoundError: | |
| log.error("❌ 找不到 stage2_ahp_global_weights.json。") | |
| sys.exit(1) | |
| return | |
| # 2. 讀取 Stage 1 白名單,與 Stage 0 原始數據 | |
| try: | |
| df_candidates = pd.read_csv("csv/stage1_final_candidates.csv") | |
| df_raw = pd.read_csv("csv/stage0_final_matrix.csv") | |
| except FileNotFoundError: | |
| log.error("❌ 找不到 stage1 或 stage0 的 csv 檔案。") | |
| sys.exit(1) | |
| return | |
| # 使用 Stage 1 產出的 ETF 名單作為白名單過濾 Stage 0 的原始數據 | |
| valid_tickers = df_candidates['ETF'].tolist() | |
| df = df_raw[df_raw['ETF'].isin(valid_tickers)].reset_index(drop=True) | |
| log.info(f"📥 成功載入 {len(df)} 檔候選 ETF 之原始特徵。") | |
| # 3. 9 大子特徵獨立正規化 | |
| df_scaled = pd.DataFrame({'ETF': df['ETF']}) | |
| # [正向特徵] 全面縮尾處理 | |
| df_scaled['Norm_Return_CAGR'] = robust_scale(df['Return_CAGR (%)'], upper_quantile=getattr(parameters, 'PREF_SCORE_CAGR_UPPER_Q', 0.99), lower_quantile=0.01) # 放寬上尾,獎勵高成長(展示層;noCAGR 最佳化不受影響) | |
| df_scaled['Norm_Return_Div'] = robust_scale(df['Return_Div (%)']) | |
| df_scaled['Norm_Div_Score'] = robust_scale(df['Div_Score (產出)'].fillna(0), upper_quantile=0.95, lower_quantile=0.05) # 分散分數分布較平均 | |
| df_scaled['Norm_FinBERT'] = robust_scale(df['FinBERT_score'].fillna(0), upper_quantile=0.95, lower_quantile=0.05) # 情緒分數分布較平均 | |
| # [流動性特徵] 先取對數,再嚴格進行縮尾處理 | |
| df_scaled['Norm_Liq_Volume'] = robust_scale(np.log1p(df['Liq_Volume (M)'])) | |
| df_scaled['Norm_Liq_AUM'] = robust_scale(np.log1p(df['Liq_AUM (B)'])) | |
| # [反向特徵] 縮尾處理後反轉 (1 - scaled) | |
| df_scaled['Norm_Risk_Vol'] = robust_scale(df['Risk_Vol (%)'], is_reverse=True) | |
| df_scaled['Norm_Risk_MaxDD'] = robust_scale(df['Risk_MaxDD (%)'].abs(), is_reverse=True) | |
| df_scaled['Norm_Cost_ExpRatio'] = robust_scale(df['Cost_ExpRatio (%)'], is_reverse=True) | |
| # 4. 計算使用者偏好分數 (User_Pref_Score) | |
| feature_map = { | |
| "Return_CAGR": 'Norm_Return_CAGR', | |
| "Return_Div": 'Norm_Return_Div', | |
| "Risk_Vol": 'Norm_Risk_Vol', | |
| "Risk_MaxDD": 'Norm_Risk_MaxDD', | |
| "Liq_Volume": 'Norm_Liq_Volume', | |
| "Liq_AUM": 'Norm_Liq_AUM', | |
| "Cost_ExpRatio": 'Norm_Cost_ExpRatio', | |
| "Div_Score": 'Norm_Div_Score', | |
| "FinBERT_score": 'Norm_FinBERT' | |
| } | |
| pref_scores = np.zeros(len(df)) | |
| for key, weight in global_weights.items(): | |
| if key in feature_map: | |
| col_name = feature_map[key] | |
| pref_scores += df_scaled[col_name].values * weight | |
| df['User_Pref_Score'] = pref_scores | |
| # 5. 下載歷史價格進行相關性計算 | |
| price_matrix = get_or_fetch_historical_prices(valid_tickers) | |
| returns_matrix = prepare_aligned_returns(price_matrix, valid_tickers) | |
| # 🚨 修正:先向下填補(處理中間斷層),再向上填補(處理頭部缺漏) | |
| # 最後才使用 dropna(axis=1) 把那些「即使雙向填補後依然全是 NaN」的無效標的給剔除 | |
| # 對齊成功抓到價格的 ETF | |
| final_tickers = returns_matrix.columns.tolist() | |
| df = df[df['ETF'].isin(final_tickers)].reset_index(drop=True) | |
| corr_matrix = returns_matrix[final_tickers].corr() | |
| # 6. 相關性分群去重 (Threshold = 0.99)啟動本地快取引擎 (請求總數: 20 檔)... | |
| CORR_THRESHOLD = 0.99 | |
| clusters = [] | |
| processed_tickers = set() | |
| sorted_tickers = df.sort_values(by='User_Pref_Score', ascending=False)['ETF'].tolist() | |
| for ticker in sorted_tickers: | |
| if ticker in processed_tickers: | |
| continue | |
| correlated = corr_matrix.index[corr_matrix[ticker] >= CORR_THRESHOLD].tolist() | |
| cluster = [t for t in correlated if t not in processed_tickers] | |
| if cluster: | |
| clusters.append(cluster) | |
| processed_tickers.update(cluster) | |
| # 7. 挑選偏好分數最高代表 | |
| final_portfolio_candidates = [] | |
| log.info("\n=== 🎯 Stage 2_2 分群篩選結果 ===") | |
| for i, cluster in enumerate(clusters): | |
| cluster_df = df[df['ETF'].isin(cluster)] | |
| best_etf = cluster_df.loc[cluster_df['User_Pref_Score'].idxmax()] | |
| final_portfolio_candidates.append(best_etf) | |
| if len(cluster) > 1: | |
| log.info(f" > 群集 {cluster} -> 🏆 勝出: {best_etf['ETF']} (Score: {best_etf['User_Pref_Score']:.4f})") | |
| log.info(cluster_df[['ETF', 'User_Pref_Score']].to_string(index=False)) | |
| final_df = pd.DataFrame(final_portfolio_candidates).sort_values(by='User_Pref_Score', ascending=False).reset_index(drop=True) | |
| # 🚨 新增:提取最終名單對應的「正規化特徵矩陣」並存檔 | |
| final_tickers = final_df['ETF'].tolist() | |
| final_scaled_df = df_scaled[df_scaled['ETF'].isin(final_tickers)].reset_index(drop=True) | |
| final_df.to_csv("csv/stage2_final_user_universe.csv", index=False) | |
| final_scaled_df.to_csv("csv/stage2_normalized_features.csv", index=False) | |
| log.info("\n✅ 資料已輸出至 csv/stage2_final_user_universe.csv") | |
| log.info("✅ 正規化特徵矩陣已輸出至 csv/stage2_normalized_features.csv") | |
| # ========================================== | |
| # stage3_Preference_Driven_Portfolio_Optimization | |
| # ========================================== | |
| USE_TRUE_HHI_OPTIMIZATION = parameters.USE_TRUE_HHI_OPTIMIZATION | |
| # ========================================== | |
| # 信託底線權重 (Baseline Weights) 與 Alpha 融合 | |
| # ========================================== | |
| # 系統信託底線的佔比 (0.0 = 完全聽從使用者, 1.0 = 完全使用系統底線, 0.5 = 各半) | |
| ALPHA_BASELINE = parameters.ALPHA_BASELINE | |
| VOL_SCORE_FLOOR = 0.08 | |
| VOL_SCORE_CAP = 0.30 | |
| USE_TRUE_MDD_OPTIMIZATION = True | |
| TRUE_MDD_TIME_WARNING_SECONDS = 30.0 | |
| # 專家先驗權重矩陣 (總和必須為 1.0) | |
| BASELINE_WEIGHTS = parameters.BASELINE_WEIGHTS | |
| case = parameters.CASE_NAME | |
| # ========================================== | |
| # 模組:建構 N x K 真實產業矩陣 (Sector Matrix S) | |
| # ========================================== | |
| def build_sector_matrix(etf_list, db_file): | |
| """ | |
| 將所有候選 ETF 的 JSON 產業分布,轉換為 N x K 的數學矩陣。 | |
| N = ETF 數量, K = 所有出現過的獨特產業數量 | |
| """ | |
| if not os.path.exists(db_file): | |
| log.error("⚠️ 找不到 AV 資料庫,無法建立真實產業矩陣。") | |
| sys.exit(1) | |
| return None, [] | |
| with open(db_file, 'r', encoding='utf-8') as f: | |
| db = json.load(f) | |
| asset_class_lookup = {} | |
| if os.path.exists(parameters.CSV_UNIVERSE_FILE): | |
| try: | |
| universe_df = pd.read_csv(parameters.CSV_UNIVERSE_FILE, usecols=['ticker', 'asset_class']) | |
| asset_class_lookup = dict(zip(universe_df['ticker'], universe_df['asset_class'])) | |
| except Exception: | |
| asset_class_lookup = {} | |
| # 1. 找出所有獨特的產業,建立產業字典 (Columns) | |
| all_sectors = set() | |
| for ticker in etf_list: | |
| entry = db.get(ticker, {}) | |
| sector_weights = entry.get("Sector_Weights") if isinstance(entry, dict) else None | |
| if isinstance(sector_weights, dict) and sector_weights: | |
| all_sectors.update(sector_weights.keys()) | |
| else: | |
| asset_class = asset_class_lookup.get(ticker, "Unclassified") | |
| all_sectors.add(f"ASSET_CLASS::{asset_class}") | |
| sector_list = list(all_sectors) | |
| K = len(sector_list) | |
| N = len(etf_list) | |
| if K == 0: | |
| return None, [] | |
| # 2. 填入 N x K 權重矩陣 | |
| S_matrix = np.zeros((N, K)) | |
| sector_index = {sector: idx for idx, sector in enumerate(sector_list)} | |
| for i, ticker in enumerate(etf_list): | |
| entry = db.get(ticker, {}) | |
| sector_weights = entry.get("Sector_Weights") if isinstance(entry, dict) else None | |
| if isinstance(sector_weights, dict) and sector_weights: | |
| for sector, weight in sector_weights.items(): | |
| if sector in sector_index: | |
| S_matrix[i, sector_index[sector]] = weight | |
| else: | |
| # 防呆機制:若缺失該檔 ETF 的產業資料,強制將其視為 100% 未知產業 | |
| # 在最佳化時會對其施加最嚴厲的集中度懲罰 | |
| fallback_bucket = f"ASSET_CLASS::{asset_class_lookup.get(ticker, 'Unclassified')}" | |
| S_matrix[i, sector_index[fallback_bucket]] = 1.0 | |
| # 降為 DEBUG:此函式每期/每次評分都會呼叫(含單檔基準評分),於 INFO 過於吵雜; | |
| # 有意義的數量(特徵宇宙/DEA 池/候選數/選入數/持股數)已記於 backtest diagnostics 表。 | |
| log.debug(f"📊 成功建立真實產業矩陣 (維度: {N} 檔 ETF x {K} 個產業)") | |
| return S_matrix, sector_list | |
| def plot_portfolio_analytics_and_mpt(returns_matrix, optimal_weights, max_sharpe_weights, tickers): | |
| """視覺化模組:繪製歷史軌跡與 MPT 效率前緣 (結合精確解析解)""" | |
| log.info("\n啟動視覺化模組:計算歷史軌跡與 MPT 效率前緣...") | |
| sns.set_theme(style="whitegrid") | |
| plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei', 'Noto Sans CJK TC', 'Noto Sans CJK SC', 'PingFang HK', 'Arial Unicode MS'] | |
| plt.rcParams['axes.unicode_minus'] = False | |
| # 建立輸出資料夾 | |
| os.makedirs("png", exist_ok=True) | |
| # --- 圖表 1:歷史淨值曲線與最大回撤 --- | |
| # 圖 1 改用 buy-and-hold 路徑:第一天按建議權重買入,之後一路持有到 lookback 結束。 | |
| port_daily_returns, cumulative_returns = calc_buy_and_hold_portfolio_path(returns_matrix, optimal_weights) | |
| peak = cumulative_returns.cummax() | |
| drawdown = (cumulative_returns - peak) / peak | |
| fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), gridspec_kw={'height_ratios': [3, 1]}, sharex=True) | |
| ax1.plot(cumulative_returns.index, cumulative_returns, color='navy', linewidth=2) | |
| ax1.set_title('Portfolio Cumulative Returns', fontsize=14, fontweight='bold') | |
| ax1.set_ylabel('Cumulative Value (base=1)', fontsize=12) | |
| ax2.fill_between(drawdown.index, drawdown * 100, 0, color='red', alpha=0.5) | |
| ax2.plot(drawdown.index, drawdown * 100, color='darkred', linewidth=1) | |
| ax2.set_title('Drawdown (%)', fontsize=14, fontweight='bold') | |
| ax2.set_ylabel('Drawdown (%)', fontsize=12) | |
| ax2.set_xlabel('Date', fontsize=12) | |
| plt.tight_layout() | |
| # [停用] 依使用者要求不再輸出投組績效圖(buy&hold 淨值/回撤,描述性)。 | |
| # plt.savefig(f'png/{case}_portfolio_performance.png', dpi=300) | |
| plt.close() | |
| # log.info(f"✅ 產出圖表:png/{case}_portfolio_performance.png") | |
| # --- 圖表 2:蒙地卡羅模擬 MPT 效率前緣 --- | |
| # 視覺化報酬軸改用算術平均年化(資本利得),與 Sharpe 口徑一致。 | |
| annual_returns = returns_matrix.mean() * 252 | |
| cov_matrix = returns_matrix.cov() * 252 | |
| rf_rate = 0.04 # 假設無風險利率 4% | |
| # 1. 計算偏好驅動組合落點 | |
| port_vol = np.sqrt(np.dot(optimal_weights.T, np.dot(cov_matrix, optimal_weights))) | |
| port_ret = np.dot(optimal_weights.T, annual_returns) | |
| # 2. 🚨 新增:計算精確的 Max Sharpe 組合落點 | |
| exact_ms_vol = np.sqrt(np.dot(max_sharpe_weights.T, np.dot(cov_matrix, max_sharpe_weights))) | |
| exact_ms_ret = np.dot(max_sharpe_weights.T, annual_returns) | |
| ms_sharpe_ratio = (exact_ms_ret - rf_rate) / exact_ms_vol if exact_ms_vol > 0 else 0 | |
| # 3. 改良版蒙地卡羅模擬 (加入稀疏性與權重上限以拓展邊界) | |
| # 蒙地卡羅圖已停用;只保留 1 筆 dummy sample 讓舊繪圖區塊不耗時,實際不再輸出該圖。 | |
| num_portfolios = 1 | |
| results = np.zeros((3, num_portfolios)) | |
| MAX_WEIGHT_LIMIT = parameters.MAX_WEIGHT_LIMIT # 🚨 你的單一標的最大權重限制 | |
| for i in range(num_portfolios): | |
| weights = np.random.random(len(tickers)) | |
| # 隨機將 50%~80% 的資產權重歸零,強迫模擬極端集中的組合 | |
| if i > num_portfolios // 4: | |
| mask = np.random.rand(len(tickers)) > (np.random.uniform(0.2, 0.5)) | |
| weights[mask] = 0 | |
| # 🚨 防呆機制:若上限為 40%,至少需要 3 檔 ETF (ceil(1/0.4)) 才能湊滿 100% | |
| min_required_assets = int(np.ceil(1.0 / MAX_WEIGHT_LIMIT)) | |
| if np.sum(weights > 0) < min_required_assets: | |
| # 隨機挑選足夠數量的資產補上隨機權重,避免數學上無解 | |
| fill_indices = np.random.choice(len(tickers), min_required_assets, replace=False) | |
| weights[fill_indices] = np.random.random(min_required_assets) | |
| # 初始正規化 (總和為 1) | |
| weights /= np.sum(weights) | |
| # 🚨 核心:迭代溢流分配法 (Water-filling) 來嚴格限制最大權重 | |
| while np.any(weights > MAX_WEIGHT_LIMIT): | |
| # 找出超過上限的索引 | |
| excess_idx = weights > MAX_WEIGHT_LIMIT | |
| # 找出未達上限的索引 (可以接受溢流的對象) | |
| avail_idx = weights < MAX_WEIGHT_LIMIT | |
| # 將超過的部分強制砍平到上限 | |
| weights[excess_idx] = MAX_WEIGHT_LIMIT | |
| # 計算被砍掉的總溢流權重 (還差多少才能補滿 100%) | |
| excess_sum = 1.0 - np.sum(weights) | |
| # 將這些無處安放的權重,按比例分配給其他未達上限的資產 | |
| avail_sum = np.sum(weights[avail_idx]) | |
| if avail_sum > 0: | |
| weights[avail_idx] += excess_sum * (weights[avail_idx] / avail_sum) | |
| else: | |
| # 若剩下的權重都是 0,則直接均分 | |
| weights[avail_idx] += excess_sum / np.sum(avail_idx) | |
| # 最終確保浮點數精度的微小誤差 | |
| weights /= np.sum(weights) | |
| p_ret = np.dot(weights, annual_returns) | |
| p_vol = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights))) | |
| results[0,i] = p_vol | |
| results[1,i] = p_ret | |
| results[2,i] = (p_ret - 0.04) / p_vol | |
| # 先求出數學效率前緣需要的基礎組合,兩張 frontier 圖會共用同一組座標軸。 | |
| # 座標軸嚴格依照展示邏輯:x 軸到最大報酬組合波動率 + 3%,y 軸到最大報酬組合報酬率 + 5%。 | |
| def calc_vol(w): | |
| return np.sqrt(np.dot(w.T, np.dot(cov_matrix, w))) | |
| bounds_ef = tuple((0.0, MAX_WEIGHT_LIMIT) for _ in range(len(tickers))) # 沿用 MAX_WEIGHT_LIMIT 上限 | |
| initial_w = np.array([1.0 / len(tickers)] * len(tickers)) | |
| cons_sum = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0} | |
| # A. 尋找全局最小變異組合 (Global Minimum Variance, 曲線的最低點) | |
| res_gmv = minimize(calc_vol, initial_w, method='SLSQP', bounds=bounds_ef, constraints=[cons_sum]) | |
| min_vol_ret = np.dot(res_gmv.x, annual_returns) | |
| # B. 尋找最大報酬組合 (Max Return, 曲線的最高點) | |
| res_max = minimize(lambda w: -np.dot(w, annual_returns), initial_w, method='SLSQP', bounds=bounds_ef, constraints=[cons_sum]) | |
| if res_max.success: | |
| max_ret = np.dot(res_max.x, annual_returns) | |
| max_return_vol_pct = calc_vol(res_max.x) * 100 | |
| else: | |
| log.warning(f"⚠️ 最大報酬組合求解未完全收斂: {res_max.message}") | |
| max_ret = max(port_ret, exact_ms_ret) | |
| max_return_vol_pct = max(port_vol, exact_ms_vol) * 100 | |
| frontier_xlim = (0, max_return_vol_pct + 3) | |
| frontier_ylim = (0, max_ret * 100 + 5) | |
| # 從蒙地卡羅點雲抓出每個風險區間的最高報酬,作為第一張圖的 frontier 視覺近似線。 | |
| # 精確的效率前緣仍由下一張 Mathematical Efficient Frontier 圖負責呈現。 | |
| cloud_df = pd.DataFrame({ | |
| 'Volatility': results[0,:] * 100, | |
| 'Return': results[1,:] * 100 | |
| }).replace([np.inf, -np.inf], np.nan).dropna() | |
| frontier_cloud = pd.DataFrame(columns=['Volatility', 'Return']) | |
| if not cloud_df.empty: | |
| cloud_df['Vol_Bin'] = pd.cut(cloud_df['Volatility'], bins=90, duplicates='drop') | |
| # 只針對真的有投組落入的波動率區間取最高報酬,避免空區間做 idxmax 造成 Pandas 報錯。 | |
| binned_cloud = cloud_df.dropna(subset=['Vol_Bin']) | |
| if not binned_cloud.empty: | |
| frontier_idx = binned_cloud.groupby('Vol_Bin', observed=True)['Return'].idxmax().dropna() | |
| frontier_cloud = binned_cloud.loc[frontier_idx, ['Volatility', 'Return']].sort_values('Volatility') | |
| frontier_cloud['Return'] = frontier_cloud['Return'].cummax() | |
| frontier_cloud = frontier_cloud.drop_duplicates(subset=['Return']) | |
| fig, ax = plt.subplots(figsize=(11, 7.5), facecolor='white') | |
| ax.set_facecolor('#F8FAFC') | |
| # 將隨機組合點雲降為背景,避免搶走效率前緣與兩個關鍵組合的視覺焦點 | |
| scatter = ax.scatter( | |
| results[0,:] * 100, results[1,:] * 100, | |
| c=results[2,:], cmap='viridis', marker='o', s=9, | |
| alpha=0.22, linewidths=0, label='Feasible Portfolios', zorder=1 | |
| ) | |
| cbar = fig.colorbar(scatter, ax=ax, pad=0.02) | |
| cbar.set_label('Sharpe Ratio', fontsize=11) | |
| if len(frontier_cloud) > 1: | |
| ax.plot(frontier_cloud['Volatility'], frontier_cloud['Return'], | |
| color='#BBF7D0', linewidth=8, alpha=0.6, zorder=2) | |
| ax.plot(frontier_cloud['Volatility'], frontier_cloud['Return'], | |
| color='#059669', linewidth=3.4, | |
| label='Monte Carlo Frontier (approx.)', zorder=3) | |
| mpt_xlim, mpt_ylim = frontier_xlim, frontier_ylim | |
| # ========================================== | |
| # 繪製資本市場線 (CML / Tangent Line) | |
| # ========================================== | |
| cml_x = np.array([mpt_xlim[0], exact_ms_vol * 100, mpt_xlim[1]]) | |
| # 直線方程式: y = R_f + Sharpe * x | |
| cml_y = rf_rate * 100 + ms_sharpe_ratio * cml_x | |
| ax.plot(cml_x, cml_y, color='#F59E0B', linestyle='--', linewidth=2.8, | |
| label=f'Capital Market Line (CML, Rf={rf_rate*100:.0f}%)', zorder=3) | |
| # 標示 Y 軸上的無風險利率起點 | |
| ax.scatter(0, rf_rate * 100, color='#F59E0B', marker='D', s=95, edgecolor='black', linewidth=0.8, zorder=5) | |
| # 標示專屬客製化組合,使用高對比紅色星號凸顯「使用者偏好落點」 | |
| ax.scatter(port_vol * 100, port_ret * 100, color='#DC2626', marker='*', s=520, | |
| edgecolor='white', linewidth=1.8, | |
| label=f'Preference-Driven\n(Return: {port_ret*100:.1f}%, Risk: {port_vol*100:.1f}%)', zorder=6) | |
| # 標示全局最大夏普組合,使用藍色 X 作為傳統基準點 | |
| ax.scatter(exact_ms_vol * 100, exact_ms_ret * 100, color='#2563EB', marker='X', s=260, | |
| edgecolor='white', linewidth=1.6, | |
| label=f'Max Sharpe\n(Return: {exact_ms_ret*100:.1f}%, Risk: {exact_ms_vol*100:.1f}%)', zorder=6) | |
| ax.set_xlim(*mpt_xlim) | |
| ax.set_ylim(*mpt_ylim) | |
| ax.set_title('MPT Feasible Set & Capital Market Line', fontsize=17, fontweight='bold', pad=16) | |
| ax.set_xlabel('Annualized Volatility (Risk) %', fontsize=12) | |
| ax.set_ylabel('Expected Annual Return %', fontsize=12) | |
| ax.grid(True, color='#CBD5E1', linewidth=0.9, alpha=0.7) | |
| ax.spines[['top', 'right']].set_visible(False) | |
| ax.legend(loc='lower right', frameon=True, facecolor='white', edgecolor='#CBD5E1', framealpha=0.95) | |
| plt.tight_layout() | |
| # 蒙地卡羅效率前緣圖已取消輸出,避免與數學解析效率前緣圖混淆。 | |
| # plt.savefig(f"png/{case}_mpt_efficient_frontier.png", dpi=300) | |
| plt.close() | |
| log.info("蒙地卡羅 MPT 圖已停用,本次只輸出數學解析效率前緣圖。") | |
| # 圖表3. 數學解析法:計算教科書等級的「精確效率前緣」 | |
| log.info("⏳ 計算精確效率前緣曲線 (Mathematical Efficient Frontier)...") | |
| # C. 沿著 Y 軸 (報酬) 切割網格,找出每個目標報酬對應的絕對最小風險 | |
| target_returns = np.linspace(min_vol_ret, max_ret, 200) | |
| efficient_vols = [] | |
| valid_returns = [] | |
| for target_ret in target_returns: | |
| cons_ef = [ | |
| {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}, | |
| {'type': 'eq', 'fun': lambda w: np.dot(w, annual_returns) - target_ret} | |
| ] | |
| res_ef = minimize(calc_vol, initial_w, method='SLSQP', bounds=bounds_ef, constraints=cons_ef) | |
| if res_ef.success: | |
| efficient_vols.append(calc_vol(res_ef.x)) | |
| valid_returns.append(target_ret) | |
| efficient_vols = np.array(efficient_vols) * 100 | |
| valid_returns = np.array(valid_returns) * 100 | |
| # ========================================== | |
| # 繪圖區塊:乾淨的教科書風格 | |
| # ========================================== | |
| fig, ax = plt.subplots(figsize=(11, 7.5), facecolor='white') | |
| ax.set_facecolor('#F8FAFC') | |
| # 用淡色陰影強化「可行集合」與效率前緣上方邊界的關係 | |
| if len(efficient_vols) > 1: | |
| ax.fill_between(efficient_vols, valid_returns, y2=np.min(valid_returns) - 1.0, | |
| color='#A7F3D0', alpha=0.18, zorder=1) | |
| # 效率前緣是本圖主角:先畫柔和外光,再疊上深綠主線 | |
| ax.plot(efficient_vols, valid_returns, color='#BBF7D0', linewidth=9, alpha=0.75, zorder=2) | |
| ax.plot(efficient_vols, valid_returns, color='#059669', linewidth=4.2, | |
| label='Efficient Frontier', zorder=3) | |
| # 標示專屬客製化組合,使用紅色星號凸顯偏好位置 | |
| ax.scatter(port_vol * 100, port_ret * 100, color='#DC2626', marker='*', s=560, | |
| edgecolor='white', linewidth=1.8, | |
| label=f'Preference-Driven\n(Return: {port_ret*100:.1f}%, Risk: {port_vol*100:.1f}%)', zorder=7) | |
| # 標示全局最大夏普組合,藍色 X 對應傳統風險調整報酬基準 | |
| ax.scatter(exact_ms_vol * 100, exact_ms_ret * 100, color='#2563EB', marker='X', s=280, | |
| edgecolor='white', linewidth=1.6, | |
| label=f'Max Sharpe\n(Return: {exact_ms_ret*100:.1f}%, Risk: {exact_ms_vol*100:.1f}%)', zorder=7) | |
| math_xlim, math_ylim = frontier_xlim, frontier_ylim | |
| # ========================================== | |
| # 繪製資本市場線 (CML / Tangent Line) | |
| # ========================================== | |
| cml_x = np.array([math_xlim[0], exact_ms_vol * 100, math_xlim[1]]) | |
| # 直線方程式: y = R_f + Sharpe * x | |
| cml_y = rf_rate * 100 + ms_sharpe_ratio * cml_x | |
| ax.plot(cml_x, cml_y, color='#F59E0B', linestyle='--', linewidth=2.8, | |
| label=f'Capital Market Line (CML, Rf={rf_rate*100:.0f}%)', zorder=4) | |
| # 標示 Y 軸上的無風險利率起點 | |
| ax.scatter(0, rf_rate * 100, color='#F59E0B', marker='D', s=95, edgecolor='black', linewidth=0.8, zorder=6) | |
| ax.set_title('Mathematical Efficient Frontier\n' | |
| '(Descriptive: portfolio solved by C2/BL, not optimized on this mu-sigma frontier; Max Sharpe = in-sample reference)', | |
| fontsize=13, fontweight='bold', pad=12) | |
| ax.set_xlabel('Annualized Volatility (Risk) %', fontsize=12) | |
| ax.set_ylabel('Expected Annual Return %', fontsize=12) | |
| ax.set_xlim(*math_xlim) | |
| ax.set_ylim(*math_ylim) | |
| ax.grid(True, color='#CBD5E1', linewidth=0.9, alpha=0.7) | |
| ax.spines[['top', 'right']].set_visible(False) | |
| ax.legend(loc='lower right', frameon=True, facecolor='white', edgecolor='#CBD5E1', framealpha=0.95) | |
| plt.tight_layout() | |
| plt.savefig(f"png/{case}_Mathematical Efficient Frontier.png", dpi=300) | |
| plt.close() | |
| log.info(f"✅ 產出圖表:png/{case}_Mathematical Efficient Frontier.png") | |
| def plot_preference_radar_chart(optimal_weights, max_sharpe_weights, vectors_dict, blended_weights, pref_metrics, ms_metrics, cov_matrix, tickers, max_weight_limit=parameters.MAX_WEIGHT_LIMIT, radar_score_bounds=None): | |
| """ | |
| 繪製 9 維度效用雷達圖,整合正規化效用圖形與真實物理數據,並自動突顯高權重維度。 | |
| 針對「抗回撤」採用真實投資組合的非線性風險數據進行雷達圖映射。 | |
| 波動率使用固定尺度映射,避免每次因候選 ETF universe 改變而造成視覺尺度不穩定。 | |
| """ | |
| log.info("\n啟動視覺化模組:繪製帶有真實數據的多維度效用雷達圖...") | |
| sns.set_theme(style="whitegrid") | |
| plt.rcParams['font.sans-serif'] = ['Microsoft JhengHei', 'Noto Sans CJK TC', 'Noto Sans CJK SC', 'PingFang HK', 'Arial Unicode MS'] | |
| plt.rcParams['axes.unicode_minus'] = False | |
| categories = list(blended_weights.keys()) | |
| N = len(categories) | |
| # ========================================== | |
| # 固定尺度映射:波動率使用預先設定的合理區間,不再重新求 GMV 邊界 | |
| # ========================================== | |
| log.info(f" [雷達圖尺度] 波動率分數固定映射區間: {VOL_SCORE_FLOOR*100:.1f}% ~ {VOL_SCORE_CAP*100:.1f}%") | |
| # ========================================== | |
| # 核心修改:動態分配各維度的真實效用分數 | |
| # ========================================== | |
| pref_scores = [] | |
| ms_scores = [] | |
| radar_score_bounds = radar_score_bounds or {} | |
| def bounded_score(value, lower_bound, upper_bound): | |
| """Map a real metric to 0-1 using fixed or theoretically feasible bounds.""" | |
| if pd.isna(value) or pd.isna(lower_bound) or pd.isna(upper_bound): | |
| return 0.5 | |
| if upper_bound <= lower_bound: | |
| return 0.5 | |
| return float(np.clip((value - lower_bound) / (upper_bound - lower_bound), 0.0, 1.0)) | |
| def relative_pair_score(pref_value, ms_value, higher_is_better=True, low=0.4, high=0.9): | |
| """Map two portfolio-level real metrics to radar scores with correct direction.""" | |
| if pd.isna(pref_value) or pd.isna(ms_value): | |
| return 0.5, 0.5 | |
| if np.isclose(pref_value, ms_value, rtol=1e-6, atol=1e-9): | |
| return 0.65, 0.65 | |
| pref_wins = pref_value > ms_value if higher_is_better else pref_value < ms_value | |
| return (high, low) if pref_wins else (low, high) | |
| for cat in categories: | |
| if cat == "Return_CAGR" and "Beta_vs_VT" in pref_metrics and not np.isnan(pref_metrics["Beta_vs_VT"]): | |
| # ★報酬軸改用「系統性風險曝險(對 VT 的 beta)」評分,與新偏好評分一致(見 beta 評分)。 | |
| # 市場 beta=1→0.5;beta=PREF_BETA_REF→1.0;低於市場 floor 0.5(不懲罰)。 | |
| # 雷達圖專用 beta 滿分參考(RADAR_BETA_REF,預設 1.2);與偏好分數 PREF_BETA_REF 解耦, | |
| # 不影響已驗證的 win_VT。本系統 beta 天花板約 1.1~1.2,故用 1.2 讓報酬軸有鑑別度。 | |
| _ref = float(getattr(parameters, "RADAR_BETA_REF", | |
| getattr(parameters, "PREF_BETA_REF", 2.0))) | |
| _bscore = lambda b: 0.5 + 0.5 * float(np.clip((b - 1.0) / max(_ref - 1.0, 1e-9), 0.0, 1.0)) | |
| p_score = _bscore(pref_metrics["Beta_vs_VT"]) | |
| m_score = _bscore(ms_metrics.get("Beta_vs_VT", 1.0)) | |
| pref_scores.append(p_score) | |
| ms_scores.append(m_score) | |
| elif cat == "Return_CAGR" and "Arithmetic_Ret" in pref_metrics: | |
| # 後備:取不到 beta 時,沿用算術平均年化報酬映射。 | |
| lower_bound, upper_bound = radar_score_bounds.get("Return_CAGR", (4.0, 20.0)) | |
| p_score = bounded_score(pref_metrics["Arithmetic_Ret"], lower_bound, upper_bound) | |
| m_score = bounded_score(ms_metrics["Arithmetic_Ret"], lower_bound, upper_bound) | |
| pref_scores.append(p_score) | |
| ms_scores.append(m_score) | |
| elif cat == "Return_Div" and "Div_Yield" in pref_metrics: | |
| # 殖利率軸改用「固定尺度映射」(0%~5%),取代原本的相對勝負映射。 | |
| # 原相對映射不論差距大小,贏家固定 0.9、輸家固定 0.4,會把「1.76% vs 1.77%」 | |
| # 這種 0.01% 的微小差距畫成雷達圖上的巨大落差(純視覺假象)。改固定尺度後, | |
| # 兩者皆 ≈0.35,幾乎重疊,忠實反映真實差距;高殖利率才會明顯往外擴。 | |
| DIV_SCORE_FLOOR, DIV_SCORE_CAP = 0.0, 5.0 | |
| p_score = bounded_score(pref_metrics["Div_Yield"], DIV_SCORE_FLOOR, DIV_SCORE_CAP) | |
| m_score = bounded_score(ms_metrics["Div_Yield"], DIV_SCORE_FLOOR, DIV_SCORE_CAP) | |
| pref_scores.append(p_score) | |
| ms_scores.append(m_score) | |
| elif cat == "Risk_Vol" and "Volatility" in pref_metrics: | |
| # 1. 波動率:使用固定尺度映射,低波動者分數較高 | |
| p_vol = pref_metrics["Volatility"] | |
| m_vol = ms_metrics["Volatility"] | |
| p_score = np.clip( | |
| 1.0 - ((p_vol / 100.0 - VOL_SCORE_FLOOR) / (VOL_SCORE_CAP - VOL_SCORE_FLOOR)), | |
| 0.0, | |
| 1.0 | |
| ) | |
| m_score = np.clip( | |
| 1.0 - ((m_vol / 100.0 - VOL_SCORE_FLOOR) / (VOL_SCORE_CAP - VOL_SCORE_FLOOR)), | |
| 0.0, | |
| 1.0 | |
| ) | |
| pref_scores.append(p_score) | |
| ms_scores.append(m_score) | |
| elif cat == "Risk_MaxDD" and "MaxDD" in pref_metrics: | |
| # 2. 最大回撤:維持使用 0% ~ 40% 作為視覺邊界 | |
| p_dd = abs(pref_metrics["MaxDD"]) | |
| m_dd = abs(ms_metrics["MaxDD"]) | |
| p_score = np.clip(1.0 - (p_dd / 40.0), 0.0, 1.0) | |
| m_score = np.clip(1.0 - (m_dd / 40.0), 0.0, 1.0) | |
| pref_scores.append(p_score) | |
| ms_scores.append(m_score) | |
| elif cat == "Liq_AUM" and "AUM" in pref_metrics: | |
| # 放棄線性分數加總,直接拿真實組合規模 (如 99.4B) 來評分 | |
| # 設定 50B 為滿分 1.0 門檻 | |
| p_aum = pref_metrics["AUM"] | |
| m_aum = ms_metrics["AUM"] | |
| p_score = np.clip(p_aum / 50.0, 0.0, 1.0) | |
| m_score = np.clip(m_aum / 50.0, 0.0, 1.0) | |
| pref_scores.append(p_score) | |
| ms_scores.append(m_score) | |
| elif cat == "Liq_Volume" and "Volume" in pref_metrics: | |
| # 設定 10M (一千萬) 為滿分 1.0 門檻 | |
| p_vol = pref_metrics["Volume"] | |
| m_vol = ms_metrics["Volume"] | |
| p_score = np.clip(p_vol / 10.0, 0.0, 1.0) | |
| m_score = np.clip(m_vol / 10.0, 0.0, 1.0) | |
| pref_scores.append(p_score) | |
| ms_scores.append(m_score) | |
| elif cat == "FinBERT_score" and "Sentiment" in pref_metrics: | |
| # 🚨 動態相對映射:確保原始分數較高者,在圖形上「絕對」更靠外 | |
| p_val, m_val = pref_metrics["Sentiment"], ms_metrics["Sentiment"] | |
| max_v, min_v = max(p_val, m_val), min(p_val, m_val) | |
| if max_v > min_v: | |
| # 較低者拿 0.4 基礎分,較高者依比例往上疊加至最高 0.9 | |
| p_score = 0.4 + 0.5 * ((p_val - min_v) / (max_v - min_v)) | |
| m_score = 0.4 + 0.5 * ((m_val - min_v) / (max_v - min_v)) | |
| else: | |
| p_score, m_score = 0.5, 0.5 | |
| pref_scores.append(p_score); ms_scores.append(m_score) | |
| else: | |
| # 3. 其他維度:維持預先算好的線性加權 | |
| pref_scores.append(np.dot(optimal_weights, vectors_dict[cat])) | |
| ms_scores.append(np.dot(max_sharpe_weights, vectors_dict[cat])) | |
| # ========================================== | |
| # 找出使用者最重視的前 3 個維度 | |
| top_3_cats = sorted(categories, key=lambda c: blended_weights[c], reverse=True)[:3] | |
| metric_map = { | |
| # 報酬軸以「對 VT 的 beta(系統性風險曝險)」評分(見上方 beta 評分), | |
| # 因此這裡顯示的原始代理值也改成 beta,避免雷達位置(以 beta)與標註數字(報酬率)互相矛盾。 | |
| "Return_CAGR": ("Beta_vs_VT", "β={:.2f}"), "Return_Div": ("Div_Yield", "{:.2f}%"), | |
| "Risk_Vol": ("Volatility", "{:.2f}%"), "Risk_MaxDD": ("MaxDD", "{:.2f}%"), | |
| "Cost_ExpRatio": ("Cost", "{:.3f}%"), "Div_Score": ("Proxy_Div", "{:.4f}"), | |
| "FinBERT_score": ("Sentiment", "{:.4f}"), "Liq_Volume": ("Volume", "{:,.1f}M"), | |
| "Liq_AUM": ("AUM", "{:,.1f}B") | |
| } | |
| label_map = { | |
| "Return_CAGR": "Return (beta vs VT)", "Return_Div": "Div Yield", "Risk_Vol": "Low Volatility", | |
| "Risk_MaxDD": "Low Drawdown", "Cost_ExpRatio": "Low Cost", "Liq_Volume": "Volume", | |
| "Liq_AUM": "AUM", "Div_Score": "Diversification", "FinBERT_score": "Sentiment" | |
| } | |
| axis_labels = [] | |
| for cat in categories: | |
| weight_pct = blended_weights[cat] * 100 | |
| metric_key, fmt = metric_map.get(cat, (None, "")) | |
| prefix = "* " if cat in top_3_cats else "" | |
| title = f"{prefix}{label_map.get(cat, cat)} ({weight_pct:.1f}%)" | |
| if (metric_key and metric_key in pref_metrics and metric_key in ms_metrics | |
| and pd.notna(pref_metrics[metric_key]) and pd.notna(ms_metrics[metric_key])): | |
| pref_val = fmt.format(pref_metrics[metric_key]) | |
| ms_val = fmt.format(ms_metrics[metric_key]) | |
| data_text = f"Pref: {pref_val}\nMaxSharpe: {ms_val}" | |
| else: | |
| data_text = "(n/a)" | |
| axis_labels.append(f"{title}\n{data_text}") | |
| angles = [n / float(N) * 2 * np.pi for n in range(N)] | |
| angles += angles[:1] | |
| pref_scores += pref_scores[:1] | |
| ms_scores += ms_scores[:1] | |
| fig, ax = plt.subplots(figsize=(12, 12), subplot_kw=dict(polar=True)) | |
| ax.set_theta_offset(np.pi / 2) | |
| ax.set_theta_direction(-1) | |
| ax.set_xticks(angles[:-1]) | |
| ax.set_xticklabels(axis_labels, size=14, linespacing=1.5) | |
| ax.tick_params(axis='x', pad=30) | |
| for i, label in enumerate(ax.get_xticklabels()): | |
| if categories[i] in top_3_cats: | |
| label.set_color('darkred') | |
| label.set_fontweight('bold') | |
| ax.set_rlabel_position(30) | |
| plt.yticks([0.2, 0.4, 0.6, 0.8, 1.0], ["0.2", "0.4", "0.6", "0.8", "1.0 (Max)"], color="grey", size=9) | |
| plt.ylim(0, 1.05) | |
| ax.plot(angles, ms_scores, color='blue', linewidth=2, linestyle='dashed', label='Max Sharpe') | |
| ax.fill(angles, ms_scores, color='blue', alpha=0.1) | |
| ax.plot(angles, pref_scores, color='red', linewidth=3, label='Preference-Driven (red area = utility advantage)') | |
| ax.fill(angles, pref_scores, color='red', alpha=0.3) | |
| plt.title('Multi-Dimensional Portfolio Decision Analysis', size=22, fontweight='bold', y=1.15) | |
| plt.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1), frameon=True, shadow=True) | |
| output_path = f"png/{case}_radar_chart.png" | |
| plt.tight_layout() | |
| plt.savefig(output_path, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| log.info(f"✅ 產出圖表:{output_path}") | |
| def run_stage3_pipeline(): | |
| log.info("啟動 Stage 3: 偏好驅動投資組合最佳化與深度分析...") | |
| AV_DB_FILE = parameters.AV_DB_FILE | |
| try: | |
| df_stage2 = pd.read_csv("csv/stage2_final_user_universe.csv") | |
| df_stage0 = pd.read_csv("csv/stage0_final_matrix.csv") | |
| df_scaled_features = pd.read_csv("csv/stage2_normalized_features.csv") | |
| with open("json/stage2_ahp_global_weights.json", "r", encoding="utf-8") as f: | |
| # 🚨 修正:提取 9 個全局權重 | |
| global_weights = json.load(f)["Global_Weights"] | |
| except FileNotFoundError: | |
| log.error("❌ 找不到必要檔案,請確認 csv 檔案與 AHP 權重檔案存在。") | |
| sys.exit(1) | |
| return | |
| tickers = df_stage2['ETF'].tolist() | |
| # 抓取歷史價格並對齊資料 | |
| log.info(f"⏳ 載入 {len(tickers)} 檔 ETF 進行最佳化與歷史回測...") | |
| price_matrix = get_or_fetch_historical_prices(tickers) | |
| returns_matrix = prepare_aligned_returns(price_matrix, tickers) | |
| # 🚨 修正:先向下填補(處理中間斷層),再向上填補(處理頭部缺漏) | |
| # 最後才使用 dropna(axis=1) 把那些「即使雙向填補後依然全是 NaN」的無效標的給剔除 | |
| # 對齊有效 Tickers | |
| valid_tickers = returns_matrix.columns.tolist() | |
| returns_matrix = returns_matrix[valid_tickers] | |
| n_assets = len(valid_tickers) | |
| df_merged = df_stage2[['ETF', 'User_Pref_Score']].merge(df_stage0, on='ETF', how='left') | |
| df_merged_valid = df_merged.set_index('ETF').loc[valid_tickers].reset_index() | |
| sector_matrix, sector_names = build_sector_matrix(valid_tickers, AV_DB_FILE) | |
| # 提取對應有效標的的正規化特徵 | |
| df_scaled_valid = df_scaled_features.set_index('ETF').loc[valid_tickers].reset_index() | |
| # --- 輸出共變異數矩陣 CSV --- | |
| cov_matrix_annual = returns_matrix.cov() * 252 | |
| # --- 目標函數參數準備 --- | |
| log.info(f"\n⚖️ 啟動權重融合機制 (Alpha = {ALPHA_BASELINE})") | |
| log.info("-" * 50) | |
| # 進行 Alpha 凸組合融合 (Convex Combination) | |
| blended_weights = {} | |
| for key in BASELINE_WEIGHTS.keys(): | |
| user_w = global_weights.get(key, 0.0) | |
| base_w = BASELINE_WEIGHTS[key] | |
| # 融合公式:W_final = α * W_base + (1 - α) * W_user | |
| blended_w = (ALPHA_BASELINE * base_w) + ((1 - ALPHA_BASELINE) * user_w) | |
| blended_weights[key] = blended_w | |
| log.info(f"{key:<15}: 使用者 {user_w*100:>5.2f}% | 融合後 -> {blended_w*100:>5.2f}%") | |
| log.info("-" * 50) | |
| # 將融合後的安全權重,指派給最佳化引擎使用的全局變數 | |
| w_cagr = blended_weights["Return_CAGR"] | |
| w_div = blended_weights["Return_Div"] | |
| w_vol_risk = blended_weights["Risk_Vol"] | |
| w_maxdd = blended_weights["Risk_MaxDD"] | |
| w_cost = blended_weights["Cost_ExpRatio"] | |
| w_liq_vol = blended_weights["Liq_Volume"] | |
| w_liq_aum = blended_weights["Liq_AUM"] | |
| w_div_score = blended_weights["Div_Score"] | |
| w_sent = blended_weights["FinBERT_score"] | |
| # --- 🚨直接讀取已縮尾之正規化數據--- | |
| vec_cagr = df_scaled_valid['Norm_Return_CAGR'].values | |
| vec_div = df_scaled_valid['Norm_Return_Div'].values | |
| vec_maxdd = df_scaled_valid['Norm_Risk_MaxDD'].values | |
| vec_cost = df_scaled_valid['Norm_Cost_ExpRatio'].values | |
| vec_liq_vol = df_scaled_valid['Norm_Liq_Volume'].values | |
| vec_liq_aum = df_scaled_valid['Norm_Liq_AUM'].values | |
| vec_div_score = df_scaled_valid['Norm_Div_Score'].values | |
| vec_sent = df_scaled_valid['Norm_FinBERT'].values | |
| # 共變異數矩陣動態正規化 (V_p = w^T * Sigma * w) | |
| cov_matrix_annual = compute_cov_annual(returns_matrix) | |
| # 投組層級 beta(對 VT)向量,供雷達報酬軸(beta 評分,與偏好分數一致)與報表使用。 | |
| _anchor_m = getattr(parameters, "BETA_ANCHOR_TICKER", None) or getattr(parameters, "DEFAULT_BENCHMARK_TICKER", "VT") | |
| _c_m, _vb_m = compute_benchmark_cov_vector(returns_matrix, get_benchmark_returns_aligned(returns_matrix, _anchor_m)) | |
| beta_vec_metrics = (_c_m / _vb_m) if (_c_m is not None and _vb_m and _vb_m > 0) else None | |
| returns_values_for_true_mdd = np.nan_to_num(returns_matrix.values, nan=0.0) | |
| true_mdd_lower_bound, true_mdd_upper_bound = calculate_individual_maxdd_bounds(returns_matrix) | |
| if USE_TRUE_MDD_OPTIMIZATION: | |
| log.info( | |
| f" [MaxDD 效用] 使用真實投組 MaxDD;分數尺度: " | |
| f"{true_mdd_lower_bound*100:.2f}% ~ {true_mdd_upper_bound*100:.2f}%," | |
| f"超時觀察門檻 {TRUE_MDD_TIME_WARNING_SECONDS:.0f} 秒。" | |
| ) | |
| # 偏好分數「報酬維度」的 beta 評分向量(展示/評估用;與 backtest_engine 同一套)。 | |
| # 報酬分數 = 0.5 + 0.5·clip((beta−1)/(REF−1),0,1):市場 beta=1→0.5、低於市場不懲罰、beta=REF→1.0。 | |
| # ★只供 for_display=True 的展示分數使用,不進求解器目標★(求解器仍用 vec_cagr)。 | |
| beta_score_vec = None | |
| if str(getattr(parameters, "PREF_RETURN_BASIS", "cagr")).lower() == "beta": | |
| _anchor = getattr(parameters, "BETA_ANCHOR_TICKER", None) or getattr(parameters, "DEFAULT_BENCHMARK_TICKER", "VT") | |
| _bench_ret = get_benchmark_returns_aligned(returns_matrix, _anchor) | |
| _c, _vb = compute_benchmark_cov_vector(returns_matrix, _bench_ret) | |
| if _c is not None and _vb and _vb > 0: | |
| _beta = _c / _vb | |
| _ref = float(getattr(parameters, "PREF_BETA_REF", 2.0)) | |
| beta_score_vec = 0.5 + 0.5 * np.clip((_beta - 1.0) / max(_ref - 1.0, 1e-9), 0.0, 1.0) | |
| # ========================================== | |
| # 核心:定義全局效用函數 U(P) | |
| # ========================================== | |
| def calc_utility(w, for_display=False): | |
| """計算投資組合在當前 AHP 權重下的總效用分數 (Utility)。 | |
| for_display=True 且 PREF_RETURN_BASIS=="beta" 時,報酬維度改用 beta 評分(展示用,不影響求解器)。""" | |
| if for_display and beta_score_vec is not None: | |
| port_cagr = np.dot(w, beta_score_vec) # beta 評分(展示) | |
| else: | |
| port_cagr = np.dot(w, vec_cagr) # 過去 CAGR 排名(求解器目標 / cagr 基礎) | |
| port_div = np.dot(w, vec_div) | |
| proxy_maxdd = np.dot(w, vec_maxdd) | |
| true_maxdd_score = ( | |
| calculate_true_maxdd_score(w, returns_values_for_true_mdd, true_mdd_lower_bound, true_mdd_upper_bound) | |
| if USE_TRUE_MDD_OPTIMIZATION | |
| else None | |
| ) | |
| # 若真實 MaxDD 計算失效,保留舊版線性加權 proxy,方便未來效能或穩定性測試時切回。 | |
| port_maxdd = proxy_maxdd if true_maxdd_score is None else true_maxdd_score | |
| port_cost = np.dot(w, vec_cost) | |
| port_liq_vol = np.dot(w, vec_liq_vol) | |
| port_liq_aum = np.dot(w, vec_liq_aum) | |
| # 🚨 分散度計算邏輯分歧 | |
| if USE_TRUE_HHI_OPTIMIZATION and sector_matrix is not None: | |
| # 1. 算出投資組合在各產業的絕對總曝險 (1 x K 向量) | |
| port_sector_exposures = np.dot(w, sector_matrix) | |
| # 2. 計算真實投資組合 HHI (各產業權重的平方和) | |
| true_hhi = np.sum(port_sector_exposures ** 2) | |
| # 3. 轉換為 AHP 可理解的「正向效用」 | |
| # HHI 範圍是 0 到 1 (越小越分散)。因此 1 - HHI = 真實分散度分數 | |
| port_div_score= 1.0 - true_hhi | |
| else: | |
| # 舊演算法:線性加權的代理指標 | |
| port_div_score = np.dot(w, vec_div_score) | |
| port_sent = np.dot(w, vec_sent) | |
| port_vol = np.sqrt(np.dot(w.T, np.dot(cov_matrix_annual, w))) | |
| port_vol_penalty = np.clip( | |
| (port_vol - VOL_SCORE_FLOOR) / (VOL_SCORE_CAP - VOL_SCORE_FLOOR), | |
| 0.0, | |
| 1.0 | |
| ) | |
| port_vol_score = 1.0 - port_vol_penalty | |
| U = (w_cagr * port_cagr) + (w_div * port_div) \ | |
| + (w_liq_vol * port_liq_vol) + (w_liq_aum * port_liq_aum) \ | |
| + (w_div_score * port_div_score) + (w_sent * port_sent) \ | |
| + (w_maxdd * port_maxdd) + (w_cost * port_cost) \ | |
| + (w_vol_risk * port_vol_score) | |
| return U | |
| def objective_function(w): | |
| # SciPy 求最小,故回傳負效用 | |
| return -calc_utility(w) | |
| # ... 執行偏好驅動最佳化 ... | |
| MAX_WEIGHT_LIMIT = parameters.MAX_WEIGHT_LIMIT | |
| weight_bounds = tuple((0.0, MAX_WEIGHT_LIMIT) for _ in range(n_assets)) #上限為 0.40 (防止單一標的過度集中,最高 40%) | |
| bounds = tuple((0.0, 1.0) for _ in range(n_assets)) | |
| constraints = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}) | |
| initial_w = np.array([1.0 / n_assets] * n_assets) | |
| optimization_start_time = time.perf_counter() | |
| if str(getattr(parameters, "OPTIMIZATION_ARM", "A")).upper() == "B": | |
| # Arm B(U-1 + U-2):真實期望報酬(資本利得算術年化 + 殖利率)+ 風險預算約束。 | |
| # 品質維度(成本/流動性/分散/情緒)的偏好已在 Stage 2_2 候選篩選層表達,這裡聚焦報酬/風險配置。 | |
| cap_gain_arith = returns_matrix.mean().values * 252.0 | |
| # 對「資本利得樣本平均」做收縮(殖利率較穩定故不收縮),降低均值估計雜訊。 | |
| cap_gain_arith = shrink_mean_returns(cap_gain_arith) | |
| div_yield_vec = df_merged_valid['Return_Div (%)'].fillna(0.0).values / 100.0 | |
| # 殖利率依使用者報酬子維度偏好比例加權(Return_Div / Return_CAGR),而非 1:1。 | |
| # 反映「在此使用者眼裡 1% 股息 ≠ 1% 資本利得」;預設使用者比例約 0.10/0.40 = 0.25。 | |
| div_pref_ratio = global_weights.get("Return_Div", 0.0) / max(global_weights.get("Return_CAGR", 0.0), 1e-6) | |
| mu_total = cap_gain_arith + div_pref_ratio * div_yield_vec | |
| lam = float(getattr(parameters, "RISK_AVERSION_LAMBDA", 2.0)) | |
| vol_budget = float(getattr(parameters, "RISK_BUDGET_VOL", 0.30)) | |
| def objective_function_armB(w): | |
| port_ret = np.dot(w, mu_total) | |
| port_var = np.dot(w.T, np.dot(cov_matrix_annual, w)) | |
| return -(port_ret - 0.5 * lam * port_var) | |
| constraints_armB = [ | |
| {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}, | |
| {'type': 'ineq', 'fun': lambda w: vol_budget - np.sqrt(np.dot(w.T, np.dot(cov_matrix_annual, w)))}, | |
| ] | |
| log.info(f" [Arm B] 真實報酬 + 風險預算最佳化:lambda={lam}, 波動預算={vol_budget*100:.0f}%") | |
| result = minimize(objective_function_armB, initial_w, method='SLSQP', bounds=weight_bounds, constraints=constraints_armB, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| if not result.success: | |
| log.warning(" [Arm B] 風險預算約束無解,改用無風險預算 fallback。") | |
| result = minimize(objective_function_armB, initial_w, method='SLSQP', bounds=weight_bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| elif str(getattr(parameters, "OPTIMIZATION_ARM", "A")).upper() == "C": | |
| # Arm C:最小變異核心 + 排名式偏好傾斜 + 品質約束(μ-free,不放噪音資本利得 μ)。 | |
| tau = float(getattr(parameters, "TILT_STRENGTH", 0.1)) | |
| s_full = df_merged_valid['User_Pref_Score'].fillna(0.0).values.astype(float) | |
| if getattr(parameters, "TILT_INCLUDE_CAGR", True): | |
| s_tilt = s_full | |
| else: | |
| s_tilt = s_full - float(global_weights.get('Return_CAGR', 0.0)) * vec_cagr | |
| vol_budget = float(getattr(parameters, "RISK_BUDGET_VOL", 0.30)) | |
| def objective_function_armC(w): | |
| return 0.5 * np.dot(w.T, np.dot(cov_matrix_annual, w)) - tau * np.dot(w, s_tilt) | |
| cons_armC = [ | |
| {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}, | |
| {'type': 'ineq', 'fun': lambda w: vol_budget - np.sqrt(np.dot(w.T, np.dot(cov_matrix_annual, w)))}, | |
| ] | |
| cons_armC += build_quality_constraints( | |
| global_weights, parameters.MAX_WEIGHT_LIMIT, | |
| cost_vec=df_merged_valid['Cost_ExpRatio (%)'].fillna(0.0).values, | |
| sector_matrix=sector_matrix, | |
| norm_liq_vol=vec_liq_vol, norm_liq_aum=vec_liq_aum, sent_vec=vec_sent) | |
| s_label = 'full' if getattr(parameters, "TILT_INCLUDE_CAGR", True) else 'noCAGR' | |
| log.info(f" [Arm C] 最小變異 + 偏好傾斜:tau={tau}, s={s_label}, 波動預算={vol_budget*100:.0f}%") | |
| result = minimize(objective_function_armC, initial_w, method='SLSQP', bounds=weight_bounds, constraints=cons_armC, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| if not result.success: | |
| log.warning(" [Arm C] 含品質約束無解,改用僅 Σw=1 + 波動預算 fallback。") | |
| result = minimize(objective_function_armC, initial_w, method='SLSQP', bounds=weight_bounds, constraints=cons_armC[:2], options={'maxiter': 1000, 'ftol': 1e-9}) | |
| if not result.success: | |
| result = minimize(objective_function_armC, initial_w, method='SLSQP', bounds=weight_bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| elif str(getattr(parameters, "OPTIMIZATION_ARM", "A")).upper() == "C2": | |
| # Arm C2(U-C2):profile-dependent 核心,由 g(w) 依偏好權重決定 核心類型/風險預算/τ。 | |
| # 三核心只需每日報酬:Σ(ETF 共變異)與 c(各 ETF 對 VT 共變異)。與 backtest 完全同邏輯。 | |
| gp = derive_params_from_weights(global_weights) | |
| core_mode = gp["core_mode"] | |
| tau = float(gp["tau"]) | |
| # 風險預算改用「相對候選池可行波動範圍」:恆可行、不脫離當期池子。 | |
| vb_budget, v_min, v_max = compute_feasible_vol_budget(cov_matrix_annual, parameters.MAX_WEIGHT_LIMIT, gp["risk_fraction"]) | |
| vol_budget = vb_budget if vb_budget is not None else float(gp["vol_budget"]) | |
| s_full = df_merged_valid['User_Pref_Score'].fillna(0.0).values.astype(float) | |
| if getattr(parameters, "TILT_INCLUDE_CAGR", True): | |
| s_tilt = s_full | |
| else: | |
| s_tilt = s_full - float(global_weights.get('Return_CAGR', 0.0)) * vec_cagr | |
| # market / beta 核心需要 c(對基準報酬流的共變異);minvar 不需要。 | |
| c_vec, var_bench = (None, None) | |
| if core_mode in ("market", "beta"): | |
| anchor_ticker = getattr(parameters, "BETA_ANCHOR_TICKER", None) or getattr(parameters, "DEFAULT_BENCHMARK_TICKER", "VT") | |
| bench_ret = get_benchmark_returns_aligned(returns_matrix, anchor_ticker) | |
| c_vec, var_bench = compute_benchmark_cov_vector(returns_matrix, bench_ret) | |
| if c_vec is None: | |
| log.warning(f" [Arm C2] 取不到基準共變異,核心由 {core_mode} 退回 minvar。") | |
| core_mode = "minvar" | |
| if core_mode == "beta": | |
| beta_vec = (c_vec / var_bench) if (var_bench and var_bench > 0) else c_vec | |
| def objective_function_armC2(w, b=beta_vec, st=s_tilt, t=tau): | |
| return -(np.dot(w, b) + t * np.dot(w, st)) # max wᵀβ + τ·wᵀs | |
| elif core_mode == "market": | |
| def objective_function_armC2(w, cc=c_vec, st=s_tilt, t=tau): | |
| return 0.5 * np.dot(w.T, np.dot(cov_matrix_annual, w)) - np.dot(w, cc) - t * np.dot(w, st) # 對基準追蹤誤差 + 傾斜 | |
| else: # minvar | |
| def objective_function_armC2(w, st=s_tilt, t=tau): | |
| return 0.5 * np.dot(w.T, np.dot(cov_matrix_annual, w)) - t * np.dot(w, st) | |
| cons_armC2 = [ | |
| {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}, | |
| {'type': 'ineq', 'fun': lambda w: vol_budget - np.sqrt(np.dot(w.T, np.dot(cov_matrix_annual, w)))}, | |
| ] | |
| cons_armC2 += build_quality_constraints( | |
| global_weights, parameters.MAX_WEIGHT_LIMIT, | |
| cost_vec=df_merged_valid['Cost_ExpRatio (%)'].fillna(0.0).values, | |
| sector_matrix=sector_matrix, | |
| norm_liq_vol=vec_liq_vol, norm_liq_aum=vec_liq_aum, sent_vec=vec_sent) | |
| log.info(f" [Arm C2] g(w) 核心={core_mode}, tau={tau:.4f}, risk_frac={gp['risk_fraction']:.2f}, " | |
| f"波動預算={vol_budget*100:.1f}% (可行範圍 {v_min*100:.1f}%~{v_max*100:.1f}%)") | |
| result = minimize(objective_function_armC2, initial_w, method='SLSQP', bounds=weight_bounds, constraints=cons_armC2, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| if not result.success: | |
| log.warning(" [Arm C2] 含品質約束無解,改用僅 Σw=1 + 波動預算 fallback。") | |
| result = minimize(objective_function_armC2, initial_w, method='SLSQP', bounds=weight_bounds, constraints=cons_armC2[:2], options={'maxiter': 1000, 'ftol': 1e-9}) | |
| if not result.success: | |
| result = minimize(objective_function_armC2, initial_w, method='SLSQP', bounds=weight_bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| elif str(getattr(parameters, "OPTIMIZATION_ARM", "A")).upper() == "BL": | |
| # Black-Litterman 路(a) 統一目標:max wᵀΠ + τ·wᵀs s.t. vol≤budget(設計見 08)。 | |
| # Π = 市場隱含報酬,用 β(=c/var_bench)表示(scale 與傾斜相稱);約束式 → λ 都化進 vol_budget。 | |
| # 三核心(minvar/market/beta)統一成此一式,core_mode 由 risk_fraction 連續涵蓋。 | |
| gp = derive_params_from_weights(global_weights) | |
| tau = float(gp["tau"]) | |
| s_full = df_merged_valid['User_Pref_Score'].fillna(0.0).values.astype(float) | |
| if getattr(parameters, "TILT_INCLUDE_CAGR", True): | |
| s_tilt = s_full | |
| else: | |
| s_tilt = s_full - float(global_weights.get('Return_CAGR', 0.0)) * vec_cagr | |
| anchor_ticker = getattr(parameters, "BETA_ANCHOR_TICKER", None) or getattr(parameters, "DEFAULT_BENCHMARK_TICKER", "VT") | |
| bench_ret = get_benchmark_returns_aligned(returns_matrix, anchor_ticker) | |
| c_vec, var_bench = compute_benchmark_cov_vector(returns_matrix, bench_ret) | |
| vb_budget, v_min, v_max = compute_feasible_vol_budget(cov_matrix_annual, parameters.MAX_WEIGHT_LIMIT, gp["risk_fraction"]) | |
| vol_budget = vb_budget if vb_budget is not None else float(gp["vol_budget"]) | |
| if c_vec is not None and var_bench and var_bench > 0: | |
| pi_vec = c_vec / var_bench # Π(市場隱含報酬,正規化為 beta 尺度) | |
| def objective_function_BL(w, b=pi_vec, st=s_tilt, t=tau): | |
| return -(np.dot(w, b) + t * np.dot(w, st)) | |
| else: | |
| log.warning(" [BL] 取不到基準共變異,退回最小變異+傾斜。") | |
| def objective_function_BL(w, st=s_tilt, t=tau): | |
| return 0.5 * np.dot(w.T, np.dot(cov_matrix_annual, w)) - t * np.dot(w, st) | |
| cons_BL = [ | |
| {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}, | |
| {'type': 'ineq', 'fun': lambda w: vol_budget - np.sqrt(np.dot(w.T, np.dot(cov_matrix_annual, w)))}, | |
| ] | |
| cons_BL += build_quality_constraints( | |
| global_weights, parameters.MAX_WEIGHT_LIMIT, | |
| cost_vec=df_merged_valid['Cost_ExpRatio (%)'].fillna(0.0).values, | |
| sector_matrix=sector_matrix, | |
| norm_liq_vol=vec_liq_vol, norm_liq_aum=vec_liq_aum, sent_vec=vec_sent) | |
| log.info(f" [BL] 統一(市場先驗Π+傾斜): tau={tau:.4f}, risk_frac={gp['risk_fraction']:.2f}, 波動預算={vol_budget*100:.1f}% (可行 {v_min*100:.1f}~{v_max*100:.1f}%)") | |
| result = minimize(objective_function_BL, initial_w, method='SLSQP', bounds=weight_bounds, constraints=cons_BL, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| if not result.success: | |
| result = minimize(objective_function_BL, initial_w, method='SLSQP', bounds=weight_bounds, constraints=cons_BL[:2], options={'maxiter': 1000, 'ftol': 1e-9}) | |
| if not result.success: | |
| result = minimize(objective_function_BL, initial_w, method='SLSQP', bounds=weight_bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| else: | |
| result = minimize(objective_function, initial_w, method='SLSQP', bounds=weight_bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| optimization_elapsed = time.perf_counter() - optimization_start_time | |
| if USE_TRUE_MDD_OPTIMIZATION and optimization_elapsed > TRUE_MDD_TIME_WARNING_SECONDS: | |
| log.warning( | |
| f"真實 MaxDD 最佳化耗時 {optimization_elapsed:.1f} 秒,已超過 " | |
| f"{TRUE_MDD_TIME_WARNING_SECONDS:.0f} 秒觀察門檻;若測試多次都過慢,可切回舊版 MaxDD proxy。" | |
| ) | |
| if not result.success: | |
| log.error("❌ 最佳化求解失敗:", result.message) | |
| sys.exit(1) | |
| return | |
| optimal_weights = np.round(result.x, 4) | |
| # ... 執行傳統 Max Sharpe 最佳化 ... | |
| # Max Sharpe 期望報酬改用算術平均年化(Sharpe 比率的定義所需,避免幾何低估分子)。 | |
| annual_returns_array = (returns_matrix.mean() * 252).values | |
| def neg_sharpe_objective(w): | |
| p_ret = np.dot(w, annual_returns_array) | |
| p_vol = np.sqrt(np.dot(w.T, np.dot(cov_matrix_annual, w))) | |
| return - (p_ret - 0.04) / p_vol if p_vol > 0 else 0 | |
| res_sharpe = minimize(neg_sharpe_objective, initial_w, method='SLSQP', bounds=weight_bounds, constraints=constraints, options={'maxiter': 1000, 'ftol': 1e-9}) | |
| max_sharpe_weights = np.round(res_sharpe.x, 4) if res_sharpe.success else initial_w | |
| if not res_sharpe.success: | |
| log.warning(f"⚠️ 夏普組合求解未完全收斂: {res_sharpe.message}") | |
| # ========================================== | |
| # 🚨 新增:計算兩者的「偏好效用分數 U(P)」 | |
| # ========================================== | |
| pref_utility_score = calc_utility(optimal_weights, for_display=True) | |
| ms_utility_score = calc_utility(max_sharpe_weights, for_display=True) | |
| # ========================================== | |
| # 深度資料計算 (建立通用的計算函式) | |
| # ========================================== | |
| def get_portfolio_metrics(weights): | |
| # 計算每日與年化報酬、波動率、夏普 | |
| # 依照主程式績效理念:lookback 第一日買入建議比例,之後不再平衡,持有到最後一日。 | |
| port_daily, nav_path = calc_buy_and_hold_portfolio_path(returns_matrix, weights) | |
| volatility = port_daily.std() * np.sqrt(252) | |
| annual_ret = port_daily.mean() * 252 | |
| years = len(port_daily) / 252 | |
| cagr = ((nav_path.iloc[-1] / nav_path.iloc[0]) ** (1 / years) - 1) * 100 if years > 0 else 0 | |
| sharpe = (annual_ret - 0.04) / volatility if volatility > 0 else 0 | |
| # 計算最大回撤 (Max Drawdown) | |
| # 最大回撤也用同一條 buy-and-hold NAV 路徑計算,避免報酬與風險定義不一致。 | |
| max_dd = ((nav_path - nav_path.cummax()) / nav_path.cummax()).min() * 100 | |
| # 計算線性加權的原始特徵指標 | |
| exp_ratio = np.dot(weights, df_merged_valid['Cost_ExpRatio (%)'].fillna(0)) | |
| div_yield = np.dot(weights, df_merged_valid['Return_Div (%)'].fillna(0)) | |
| # 計算線性加權的代理分散度 (確保 df_merged_valid 有 Div_Score 欄位) | |
| proxy_div = np.dot(weights, df_merged_valid['Div_Score (產出)'].fillna(0)) | |
| # 計算線性加權的 FinBERT 宏觀情緒分數 | |
| sentiment = np.dot(weights, df_merged_valid['FinBERT_score'].fillna(0)) | |
| # 計算線性加權的流動性指標 (請確認 df_merged_valid 內的確切欄位名稱) | |
| volume = np.dot(weights, df_merged_valid['Liq_Volume (M)'].fillna(0)) | |
| aum = np.dot(weights, df_merged_valid['Liq_AUM (B)'].fillna(0)) | |
| port_beta = float(np.dot(weights, beta_vec_metrics)) if beta_vec_metrics is not None else float('nan') | |
| return { | |
| 'Arithmetic_Ret': annual_ret * 100, | |
| 'Beta_vs_VT': port_beta, | |
| 'CAGR': cagr, | |
| 'Div_Yield': div_yield, | |
| 'Cost': exp_ratio, | |
| 'Volatility': volatility * 100, | |
| 'MaxDD': max_dd, | |
| 'Proxy_Div': proxy_div, | |
| 'Sentiment': sentiment, | |
| 'Volume': volume, | |
| 'AUM': aum, | |
| 'Sharpe': sharpe | |
| } | |
| # 分別取得兩組權重的深度數據 | |
| pref_metrics = get_portfolio_metrics(optimal_weights) | |
| ms_metrics = get_portfolio_metrics(max_sharpe_weights) | |
| # 🚨 動態計算真實投資組合 HHI (若矩陣已成功建立) | |
| if sector_matrix is not None and len(sector_matrix) > 0: | |
| true_hhi_pref = np.sum(np.dot(optimal_weights, sector_matrix) ** 2) | |
| true_hhi_ms = np.sum(np.dot(max_sharpe_weights, sector_matrix) ** 2) | |
| hhi_str_pref = f"{true_hhi_pref:.4f}" | |
| hhi_str_ms = f"{true_hhi_ms:.4f}" | |
| else: | |
| hhi_str_pref = "N/A (API Limit)" | |
| hhi_str_ms = "N/A (API Limit)" | |
| # 呼叫視覺化函式 | |
| # 依使用者要求:**保留「數學解效率前緣圖」(Mathematical Efficient Frontier)**; | |
| # 但函式內停用「投組績效圖」(portfolio_performance) 與「蒙地卡羅前緣圖」(mpt_efficient_frontier)。 | |
| # (績效圖描述性、蒙地卡羅圖早已停用;數學前緣保留作 μ-σ 參考,標題已註明非 C2/BL 最佳化所在。) | |
| plot_portfolio_analytics_and_mpt(returns_matrix, optimal_weights, max_sharpe_weights, valid_tickers) | |
| # 🚨 新增:整理向量字典,呼叫視覺化函式 2: 雷達圖 | |
| vectors_dict = { | |
| "Return_CAGR": vec_cagr, | |
| "Return_Div": vec_div, | |
| "Risk_Vol": df_scaled_valid['Norm_Risk_Vol'].values, | |
| "Risk_MaxDD": vec_maxdd, | |
| "Cost_ExpRatio": vec_cost, | |
| "Liq_Volume": vec_liq_vol, | |
| "Liq_AUM": vec_liq_aum, | |
| "Div_Score": vec_div_score, | |
| "FinBERT_score": vec_sent | |
| } | |
| def capped_weighted_max_metric(values, max_weight): | |
| """Maximum weighted average metric under long-only sum-to-one and per-asset cap.""" | |
| clean_values = np.array(pd.Series(values).fillna(0.0).values, dtype=float) | |
| sorted_values = np.sort(clean_values)[::-1] | |
| remaining_weight = 1.0 | |
| total = 0.0 | |
| for value in sorted_values: | |
| if remaining_weight <= 1e-12: | |
| break | |
| allocation = min(max_weight, remaining_weight) | |
| total += allocation * value | |
| remaining_weight -= allocation | |
| return total | |
| # 雷達圖的報酬尺度也改用 Stage 3 實際 lookback 報酬,避免再混用 Stage 0 靜態 CAGR。 | |
| # 雷達圖報酬尺度也改用算術平均年化,與雷達分數口徑一致。 | |
| stage3_asset_cagr_pct = (returns_matrix.mean() * 252 * 100).reindex(valid_tickers).fillna(0.0) | |
| return_cagr_upper_bound = capped_weighted_max_metric( | |
| stage3_asset_cagr_pct, | |
| parameters.MAX_WEIGHT_LIMIT, | |
| ) | |
| radar_score_bounds = { | |
| "Return_CAGR": (4.0, return_cagr_upper_bound) | |
| } | |
| log.info( | |
| f" [雷達圖尺度] 歷史報酬映射區間: 4.00% ~ {return_cagr_upper_bound:.2f}% " | |
| f"(單一 ETF 上限 {parameters.MAX_WEIGHT_LIMIT*100:.0f}%)" | |
| ) | |
| # 注意:風險波動率 (Risk_Vol) 在你的二次規劃中是矩陣運算, | |
| # 為了雷達圖展示單一維度表現,這裡提取對角線(資產本身的波動率)做反向正規化作為視覺代理。 | |
| plot_preference_radar_chart( | |
| optimal_weights, | |
| max_sharpe_weights, | |
| vectors_dict, | |
| blended_weights, | |
| pref_metrics, # 傳入偏好組合的真實數據 | |
| ms_metrics, # 傳入夏普組合的真實數據 | |
| cov_matrix_annual, | |
| valid_tickers, | |
| radar_score_bounds=radar_score_bounds | |
| ) | |
| # ========================================== | |
| # 輸出比較報表 | |
| # ========================================== | |
| comparison_df = pd.DataFrame({ | |
| 'ETF': valid_tickers, | |
| '偏好組合 Weight (%)': optimal_weights * 100, | |
| '最大夏普 Weight (%)': max_sharpe_weights * 100 | |
| }) | |
| comparison_df = comparison_df[(comparison_df['偏好組合 Weight (%)'] > 0.01) | (comparison_df['最大夏普 Weight (%)'] > 0.01)] | |
| comparison_df = comparison_df.sort_values(by='偏好組合 Weight (%)', ascending=False).reset_index(drop=True) | |
| log.info("\n" + "="*65) | |
| log.info(" 🎯 專題最終產出:持股權重對比") | |
| log.info("="*65) | |
| log.info(comparison_df.to_string(index=False)) | |
| # 建立深度健檢報告的 DataFrame 確保排版對齊 | |
| analytics_df = pd.DataFrame({ | |
| 'Metric': [ | |
| 'Arithmetic Annual Return (%)', | |
| 'Capital Gain CAGR (%)', | |
| 'Dividend Yield (%)', | |
| 'Estimated Total Return, No Reinvestment (%)', | |
| 'Expense Ratio (%)', | |
| 'Annualized Volatility (%)', | |
| 'Maximum Drawdown (%)', | |
| 'Liquidity Volume (Millions)', | |
| 'Liquidity AUM (Billions)', | |
| 'True Portfolio HHI (Real)', | |
| 'Weighted Sentiment Score', | |
| 'Sharpe Ratio' | |
| ], | |
| 'Preference-Driven': [ | |
| f"{pref_metrics['Arithmetic_Ret']:.2f}", | |
| f"{pref_metrics['CAGR']:.2f}", | |
| f"{pref_metrics['Div_Yield']:.2f}", | |
| f"{pref_metrics['CAGR'] + pref_metrics['Div_Yield']:.2f}", | |
| f"{pref_metrics['Cost']:.3f}", | |
| f"{pref_metrics['Volatility']:.2f}", | |
| f"{pref_metrics['MaxDD']:.2f}", | |
| f"{pref_metrics['Volume']:.2f}", | |
| f"{pref_metrics['AUM']:.2f}", | |
| f"{hhi_str_pref}", | |
| f"{pref_metrics['Sentiment']:.4f}", | |
| f"{pref_metrics['Sharpe']:.3f}" | |
| ], | |
| 'Max Sharpe': [ | |
| f"{ms_metrics['Arithmetic_Ret']:.2f}", | |
| f"{ms_metrics['CAGR']:.2f}", | |
| f"{ms_metrics['Div_Yield']:.2f}", | |
| f"{ms_metrics['CAGR'] + ms_metrics['Div_Yield']:.2f}", | |
| f"{ms_metrics['Cost']:.3f}", | |
| f"{ms_metrics['Volatility']:.2f}", | |
| f"{ms_metrics['MaxDD']:.2f}", | |
| f"{ms_metrics['Volume']:.2f}", | |
| f"{ms_metrics['AUM']:.2f}", | |
| f"{hhi_str_ms}", | |
| f"{ms_metrics['Sentiment']:.4f}", | |
| f"{ms_metrics['Sharpe']:.3f}" | |
| ] | |
| }) | |
| log.info("\n" + "="*65) | |
| log.info(" 📊 投資組合深度健檢報告 (Portfolio Analytics Comparison)") | |
| log.info("="*65) | |
| log.info(analytics_df.to_string(index=False, justify='right')) | |
| log.info("-" * 65) | |
| # 輸出效用對決 | |
| pref_utility_score = calc_utility(optimal_weights, for_display=True) | |
| ms_utility_score = calc_utility(max_sharpe_weights, for_display=True) | |
| log.info(f" 偏好驅動組合【偏好效用分數】: {pref_utility_score:.4f}") | |
| log.info(f" 傳統最大夏普【偏好效用分數】: {ms_utility_score:.4f}") | |
| log.info("="*65) | |
| # 建立 report 資料夾 (若無則自動建立) | |
| report_dir = "report" | |
| os.makedirs(report_dir, exist_ok=True) | |
| # 為了讓每次測試的檔案不會互相覆蓋,建議使用 case_name,若無則用預設名稱 | |
| file_prefix = case | |
| # 1. 輸出完美排版的純文字報告 (.txt) | |
| txt_path = f"{report_dir}\\{file_prefix}_summary.txt" | |
| with open(txt_path, 'w', encoding='utf-8') as f: | |
| f.write("="*65 + "\n") | |
| f.write(" 🎯 專題最終產出:持股權重對比\n") | |
| f.write("="*65 + "\n") | |
| f.write(comparison_df.to_string(index=False) + "\n\n") | |
| f.write("="*65 + "\n") | |
| f.write(" 📊 投資組合深度健檢報告 (Portfolio Analytics Comparison)\n") | |
| f.write("="*65 + "\n") | |
| f.write(analytics_df.to_string(index=False, justify='right') + "\n") | |
| f.write( | |
| "Note: Capital Gain CAGR uses the buy-and-hold portfolio NAV path over the Stage 3 lookback period; " | |
| "Estimated Total Return adds weighted dividend yield without dividend reinvestment.\n" | |
| ) | |
| f.write("-" * 65 + "\n\n") | |
| f.write(f" 偏好驅動組合【偏好效用分數】: {pref_utility_score:.4f}\n") | |
| f.write(f" 傳統最大夏普【偏好效用分數】: {ms_utility_score:.4f}\n") | |
| f.write("="*65 + "\n") | |
| # 2. 輸出方便 Excel 讀取的表格檔案 (.csv) | |
| csv_weights_path = f"{report_dir}\\{file_prefix}_weights.csv" | |
| csv_analytics_path = f"{report_dir}\\{file_prefix}_analytics.csv" | |
| comparison_df.to_csv(csv_weights_path, index=False, encoding='utf-8-sig') | |
| analytics_df.to_csv(csv_analytics_path, index=False, encoding='utf-8-sig') | |
| # ── 將本次主系統的「全部圖表 + 報表」集中到 user_results/new_user_{n}/ ── | |
| # 每次主系統執行 = 一個新的 new_user_{n}:n = 現有 new_user_* 的最大編號 + 1(永不覆寫前一位使用者)。 | |
| # 固定展示夾(如 showcase_7_profiles/)名稱不符 new_user_\d+,不會被計入或覆寫。 | |
| # 內部再分兩個子資料夾: | |
| # 01_screening_eda/ ── DEA 分數分布 + EDA(前處理)圖與 DEA 結果表 | |
| # 02_portfolio/ ── 投組推薦(雷達圖 + 數學前緣 + 權重/分析報表) | |
| try: | |
| import glob as _glob | |
| import shutil | |
| import re as _re | |
| _user_root = getattr(parameters, "USER_RESULTS_DIR", "user_results") | |
| os.makedirs(_user_root, exist_ok=True) | |
| _idx = [] | |
| for _name in os.listdir(_user_root): | |
| _m = _re.fullmatch(r"new_user_(\d+)", _name) | |
| if _m and os.path.isdir(os.path.join(_user_root, _name)): | |
| _idx.append(int(_m.group(1))) | |
| _next_n = (max(_idx) + 1) if _idx else 1 | |
| # 自訂名稱(網頁可填)取代 new_user_{n};未填則 fallback 回 new_user_{n}。 | |
| _custom = globals().get("CUSTOM_RUN_NAME") or getattr(parameters, "CUSTOM_RUN_NAME", None) | |
| _safe = _re.sub(r"[^\w\-]+", "_", str(_custom).strip()).strip("_")[:40] if _custom else "" | |
| if _safe: | |
| _dest = os.path.join(_user_root, _safe) | |
| _k = 2 | |
| while os.path.isdir(_dest): # 同名已存在 → 加序號,不覆蓋既有 | |
| _dest = os.path.join(_user_root, f"{_safe}_{_k}") | |
| _k += 1 | |
| else: | |
| _dest = os.path.join(_user_root, f"new_user_{_next_n}") | |
| # 對外公開本次使用者資料夾路徑,讓接續的 prompt 回測(stage3b)把回測夾巢狀進來。 | |
| globals()["LAST_MAIN_USER_DIR"] = _dest | |
| _groups = { | |
| "01_screening_eda": [ | |
| "png/eda_*.png", "png/*dea*.png", "png/*DEA*.png", | |
| "csv/stage1_dea_results.csv", | |
| ], | |
| "02_portfolio": [ | |
| f"png/{case}_radar_chart.png", | |
| f"png/{case}_Mathematical Efficient Frontier.png", | |
| f"report\\{file_prefix}_*.txt", f"report\\{file_prefix}_*.csv", | |
| ], | |
| } | |
| _n = 0 | |
| for _sub, _patterns in _groups.items(): | |
| _subdir = os.path.join(_dest, _sub) | |
| os.makedirs(_subdir, exist_ok=True) | |
| for _pat in _patterns: | |
| for _f in _glob.glob(_pat): | |
| shutil.copy2(_f, os.path.join(_subdir, os.path.basename(_f))) | |
| _n += 1 | |
| log.info(f"[user_results] 本次推薦圖表+報表({_n} 檔)已分兩夾集中到 {_dest} " | |
| f"(01_screening_eda / 02_portfolio)") | |
| except Exception as _exc: | |
| log.warning(f"[user_results] 集中輸出失敗:{_exc}") | |