# Source code for czsc.utils.stats

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2023/4/19 23:27
describe: 绩效表现统计
"""
import numpy as np
import pandas as pd
from deprecated import deprecated
from collections import Counter


def cal_break_even_point(seq) -> float:
    """Compute the break-even point of a sequence of per-trade returns.

    The returns are sorted in ascending order and accumulated. The result is
    ``(k + 1) / n`` where ``k`` is the number of negative partial sums and
    ``n`` is the number of returns.

    :param seq: per-trade returns, e.g.
        [0.01, 0.02, -0.01, 0.03, 0.02, -0.02, 0.01, -0.01, 0.02, 0.01]
    :return: break-even point; 1.0 when ``seq`` is empty or its total is negative
    """
    # An empty sequence used to divide by zero (inf plus a RuntimeWarning);
    # treat it like the "never breaks even" case instead.
    if len(seq) == 0:
        return 1.0
    if sum(seq) < 0:
        return 1.0
    cum = np.cumsum(sorted(seq))
    return float((np.sum(cum < 0) + 1) / len(cum))


def subtract_fee(df, fee=1):
    """Deduct trading fees from a single-instrument position series.

    Steps performed:

    1. Validate that ``df`` has the columns 'dt' and 'pos' and that every
       value of 'pos' is one of 0, 1, -1.
    2. If 'n1b' is missing, derive it from 'price' as the next-bar return
       expressed in BP (``(price[t+1] / price[t] - 1) * 10000``).
    3. Add a 'date' column extracted from 'dt' (requires a datetime column).
    4. Compute 'edge_pre_fee' and 'edge_post_fee' as ``pos * n1b``.
    5. Locate entries and exits:
       - entry bar: position differs from the previous bar and is non-zero;
       - exit bar: position differs from the next bar and is non-zero,
         i.e. the last bar on which the position is held.
    6. Subtract ``fee`` from 'edge_post_fee' on every entry bar and on every
       exit bar. A bar that is both is charged twice.

    The input frame is modified in place and also returned.

    :param df: DataFrame with columns dt, pos and either n1b or price
    :param fee: fee per side, in BP
    :return: the same DataFrame with the added columns
    """
    assert "dt" in df.columns, "dt 列必须存在"
    assert "pos" in df.columns, "pos 列必须存在"
    assert all(x in [0, 1, -1] for x in df["pos"].unique()), "pos 列的值必须是 0, 1, -1 中的一个"

    if "n1b" not in df.columns:
        assert "price" in df.columns, "当n1b列不存在时,price 列必须存在"
        df["n1b"] = (df["price"].shift(-1) / df["price"] - 1) * 10000

    df["date"] = df["dt"].dt.date
    df["edge_pre_fee"] = df["pos"] * df["n1b"]
    df["edge_post_fee"] = df["pos"] * df["n1b"]

    # Fee rule: the opening fee is charged on the first bar of a holding
    # period, the closing fee on its last bar.
    holding = df["pos"] != 0
    open_pos = (df["pos"].shift() != df["pos"]) & holding
    exit_pos = (df["pos"].shift(-1) != df["pos"]) & holding
    df.loc[open_pos, "edge_post_fee"] = df.loc[open_pos, "edge_post_fee"] - fee
    df.loc[exit_pos, "edge_post_fee"] = df.loc[exit_pos, "edge_post_fee"] - fee
    return df
def daily_performance(daily_returns, **kwargs):
    """Compute performance statistics of daily returns using simple (non-compounded) interest.

    Logic:

    1. Convert the input to a float64 NumPy array.
    2. If the array is empty, has zero standard deviation, or is all zeros,
       return a dict in which every statistic is 0.
    3. Otherwise compute:

       - 绝对收益 (total return) = sum of returns
       - 年化 (annualized return) = sum / count * yearly_days
       - 夏普 (Sharpe) = mean / std * sqrt(yearly_days), clipped to [-5, 5]
       - 最大回撤 (max drawdown) = max of (running max of cumulative return - cumulative return)
       - 卡玛 (Calmar) = annualized return / max drawdown, 10 when the drawdown
         is zero, clipped to [-10, 10]
       - 日胜率 (win rate) = share of days with a return >= 0
       - 日盈亏比 (gain/loss ratio) = mean non-negative return / abs(mean negative return);
         5 when there is no negative day, 0 when there is no non-negative day
       - 日赢面 = win rate * gain/loss ratio - (1 - win rate)
       - 年化波动率 (annualized volatility) = std * sqrt(yearly_days)
       - 下行波动率 (downside volatility) = std of negative returns * sqrt(yearly_days);
         0 when there is no negative day
       - 非零覆盖 (non-zero coverage) = share of days with a non-zero return
       - 盈亏平衡点 (break-even point) = cal_break_even_point(daily_returns)
       - 新高间隔 = the largest number of days sharing one running-max value
       - 新高占比 = share of days on which the drawdown is zero
       - 回撤风险 (drawdown risk) = max drawdown / annualized volatility

    :param daily_returns: daily returns, e.g.
        [0.01, 0.02, -0.01, 0.03, 0.02, -0.02, 0.01, -0.01, 0.02, 0.01]
    :param kwargs: other options

        - yearly_days: int, default 252, number of trading days per year
    :return: dict mapping statistic name to value
    """
    daily_returns = np.array(daily_returns, dtype=np.float64)
    yearly_days = kwargs.get("yearly_days", 252)

    if len(daily_returns) == 0 or np.std(daily_returns) == 0 or all(x == 0 for x in daily_returns):
        return {
            "绝对收益": 0,
            "年化": 0,
            "夏普": 0,
            "最大回撤": 0,
            "卡玛": 0,
            "日胜率": 0,
            "日盈亏比": 0,
            "日赢面": 0,
            "年化波动率": 0,
            "下行波动率": 0,
            "非零覆盖": 0,
            "盈亏平衡点": 0,
            "新高间隔": 0,
            "新高占比": 0,
            "回撤风险": 0,
        }

    n_days = len(daily_returns)
    std = np.std(daily_returns)
    annual_factor = np.sqrt(yearly_days)

    annual_returns = np.sum(daily_returns) / n_days * yearly_days
    sharpe_ratio = np.mean(daily_returns) / std * annual_factor
    annual_volatility = std * annual_factor

    cum_returns = np.cumsum(daily_returns)
    running_max = np.maximum.accumulate(cum_returns)
    dd = running_max - cum_returns
    max_drawdown = np.max(dd)
    calmar = annual_returns / max_drawdown if max_drawdown != 0 else 10

    gains = daily_returns[daily_returns >= 0]
    losses = daily_returns[daily_returns < 0]
    win_pct = len(gains) / n_days
    # Guard the empty slices: np.mean / np.std of an empty array is NaN.
    daily_mean_gain = np.mean(gains) if len(gains) > 0 else 0
    daily_mean_loss = np.mean(losses) if len(losses) > 0 else 0
    daily_ykb = daily_mean_gain / abs(daily_mean_loss) if daily_mean_loss != 0 else 5
    downside_volatility = np.std(losses) * annual_factor if len(losses) > 0 else 0
    none_zero_cover = len(daily_returns[daily_returns != 0]) / n_days

    # Longest new-high interval: how many days the most persistent running
    # maximum stayed in place.
    max_interval = Counter(running_max.tolist()).most_common(1)[0][1]
    # Share of days at a new high (zero drawdown).
    high_pct = int(np.count_nonzero(dd == 0)) / len(dd)

    def _clip_round(x, min_val, max_val, digits=4):
        """Clamp x into [min_val, max_val] and round it."""
        return round(min(max(x, min_val), max_val), digits)

    sta = {
        "绝对收益": round(np.sum(daily_returns), 4),
        "年化": round(annual_returns, 4),
        "夏普": _clip_round(sharpe_ratio, -5, 5, 2),
        "最大回撤": round(max_drawdown, 4),
        "卡玛": _clip_round(calmar, -10, 10, 2),
        "日胜率": round(win_pct, 4),
        "日盈亏比": round(daily_ykb, 4),
        "日赢面": round(win_pct * daily_ykb - (1 - win_pct), 4),
        "年化波动率": round(annual_volatility, 4),
        "下行波动率": round(downside_volatility, 4),
        "非零覆盖": round(none_zero_cover, 4),
        "盈亏平衡点": round(cal_break_even_point(daily_returns), 4),
        "新高间隔": max_interval,
        "新高占比": round(high_pct, 4),
        "回撤风险": round(max_drawdown / annual_volatility, 4),
    }
    return sta
def rolling_daily_performance(df: pd.DataFrame, ret_col, window=252, min_periods=100, **kwargs):
    """Compute daily-return statistics over a rolling calendar window.

    For every timestamp after the first ``min_periods`` rows, the returns in
    the closed interval ``[edt - window days, edt]`` are passed to
    ``daily_performance``; the window start and end are stored in the result
    under 'sdt' and 'edt'.

    The input frame is not modified.

    :param df: pd.DataFrame of daily returns, either indexed by a
        DatetimeIndex or carrying a 'dt' column convertible by ``pd.to_datetime``
    :param ret_col: str, name of the return column
    :param window: int, rolling window length in calendar days
    :param min_periods: int, number of leading rows skipped before the first window
    :param kwargs: other options

        - yearly_days: int, default 252, number of trading days per year
    :return: pd.DataFrame with one row of statistics per evaluated end date
    """
    if not isinstance(df.index, pd.DatetimeIndex):
        # Work on a copy so the caller keeps its 'dt' column and original index.
        df = df.copy()
        df["dt"] = pd.to_datetime(df["dt"])
        df = df.set_index("dt")
    assert isinstance(df.index, pd.DatetimeIndex), "index必须是datetime64[ns]类型, 请先使用 pd.to_datetime 进行转换"

    yearly_days = kwargs.get("yearly_days", 252)
    df = df[[ret_col]].fillna(0).sort_index(ascending=True)

    res = []
    for edt in df.index[min_periods:]:
        sdt = edt - pd.Timedelta(days=window)
        in_window = (df.index >= sdt) & (df.index <= edt)
        s = daily_performance(df.loc[in_window, ret_col].to_list(), yearly_days=yearly_days)
        s["sdt"] = sdt
        s["edt"] = edt
        res.append(s)

    return pd.DataFrame(res)
@deprecated(version="1.0.0", reason="请使用 daily_performance;调整 yearly_days 参数 52 即可")
def weekly_performance(weekly_returns):
    """Compute performance statistics of weekly returns using simple (non-compounded) interest.

    Annualization uses 52 periods per year. When the input is empty, has zero
    standard deviation or is all zeros, every statistic is 0.

    The share of periods at a new high is reported under both "时间占比" (the
    key historically used by the normal path) and "新高占比" (the key
    historically used by the all-zero path), so both paths expose the same keys.

    :param weekly_returns: weekly returns, e.g.
        [0.01, 0.02, -0.01, 0.03, 0.02, -0.02, 0.01, -0.01, 0.02, 0.01]
    :return: dict mapping statistic name to value
    """
    weekly_returns = np.array(weekly_returns, dtype=np.float64)

    if len(weekly_returns) == 0 or np.std(weekly_returns) == 0 or all(x == 0 for x in weekly_returns):
        return {
            "年化": 0,
            "夏普": 0,
            "最大回撤": 0,
            "卡玛": 0,
            "周胜率": 0,
            "年化波动率": 0,
            "非零覆盖": 0,
            "盈亏平衡点": 0,
            "新高间隔": 0,
            "新高占比": 0,
            "时间占比": 0,
        }

    n_weeks = len(weekly_returns)
    annual_returns = np.sum(weekly_returns) / n_weeks * 52
    sharpe_ratio = np.mean(weekly_returns) / np.std(weekly_returns) * np.sqrt(52)
    cum_returns = np.cumsum(weekly_returns)
    dd = np.maximum.accumulate(cum_returns) - cum_returns
    max_drawdown = np.max(dd)
    calmar = annual_returns / max_drawdown if max_drawdown != 0 else 10
    win_pct = len(weekly_returns[weekly_returns >= 0]) / n_weeks
    annual_volatility = np.std(weekly_returns) * np.sqrt(52)
    none_zero_cover = len(weekly_returns[weekly_returns != 0]) / n_weeks

    # Longest gap (in periods) between two consecutive new highs.
    high_index = [i for i, x in enumerate(dd) if x == 0]
    max_interval = max((b - a for a, b in zip(high_index, high_index[1:])), default=0)

    # Share of periods at a new high.
    high_pct = len(high_index) / len(dd)

    def _clip_round(x, min_val, max_val, digits=4):
        """Clamp x into [min_val, max_val] and round it."""
        return round(min(max(x, min_val), max_val), digits)

    sta = {
        "年化": round(annual_returns, 4),
        "夏普": _clip_round(sharpe_ratio, -5, 5, 2),
        "最大回撤": round(max_drawdown, 4),
        "卡玛": _clip_round(calmar, -10, 10, 2),
        "周胜率": round(win_pct, 4),
        "年化波动率": round(annual_volatility, 4),
        "非零覆盖": round(none_zero_cover, 4),
        "盈亏平衡点": round(cal_break_even_point(weekly_returns), 4),
        "新高间隔": max_interval,
        "新高占比": round(high_pct, 4),
        "时间占比": round(high_pct, 4),
    }
    return sta
@deprecated(version="1.0.0", reason="请使用 daily_performance;调整 yearly_days 参数 12 即可")
def net_value_stats(nv: pd.DataFrame, exclude_zero: bool = False, sub_cost=True) -> dict:
    """Compute annualized return, Sharpe, Calmar and max drawdown of a net-value curve.

    :param nv: net-value data in the following layout::

                            dt  edge  cost
            0  2017-01-03 09:30:00   0.0   0.0
            1  2017-01-03 10:00:00   0.0   0.0
            2  2017-01-03 10:30:00   0.0   0.0
            3  2017-01-03 11:00:00   0.0   0.0
            4  2017-01-03 13:00:00   0.0   0.0

        Columns:

        - dt: trade time
        - edge: simple-interest return, in BP
        - cost: trading cost, in BP; required when ``sub_cost`` is True,
          otherwise optional and defaulting to 0
    :param exclude_zero: drop rows where both edge and cost are 0 (generally
        regarded as periods without a position); result keys are then
        prefixed with "有持仓时间" and "持仓覆盖" is not reported
    :param sub_cost: subtract ``cost`` from ``edge`` before aggregating
    :return: dict with (possibly prefixed) keys 夏普, 卡玛, 年化, 最大回撤 and,
        when ``exclude_zero`` is False, 持仓覆盖
    """
    nv = nv.copy(deep=True)
    nv["dt"] = pd.to_datetime(nv["dt"])

    if sub_cost:
        assert "cost" in nv.columns, "成本列cost不存在"
        nv["edge"] = nv["edge"] - nv["cost"]
    elif "cost" not in nv.columns:
        nv["cost"] = 0

    if exclude_zero:
        # Copy so that adding the 'date' column below does not write into a
        # slice of the unfiltered frame.
        nv = nv[(nv["edge"] != 0) | (nv["cost"] != 0)].copy()

    # Aggregate by calendar date; edge is in BP, so divide by 10000.
    nv["date"] = nv["dt"].apply(lambda x: x.date())
    df_nav = nv.groupby("date")["edge"].sum() / 10000
    df_nav = df_nav.cumsum()

    if all(x == 0 for x in nv["edge"]):
        # No position was ever held.
        sharp = 0
        y_ret = 0
        calmar = 0
        mdd = 0
    else:
        N = 252
        y_ret = df_nav.iloc[-1] * (N / len(df_nav))  # yearly return

        daily_diff = df_nav.diff()
        diff_std = daily_diff.std()
        # A single-day curve has a NaN std; report 0 instead of a NaN Sharpe.
        if pd.notnull(diff_std) and diff_std != 0:
            sharp = daily_diff.mean() / diff_std * pow(N, 0.5)
        else:
            sharp = 0

        # Drawdown is measured on the curve shifted by one day, starting from 0.
        df0 = df_nav.shift(1).ffill().fillna(0)
        mdd = (1 - (df0 + 1) / (df0 + 1).cummax()).max()
        calmar = y_ret / mdd if mdd != 0 else 1

    prefix = "有持仓时间" if exclude_zero else ""
    res = {
        "夏普": round(sharp, 2),
        "卡玛": round(calmar, 2),
        "年化": round(y_ret, 4),
        "最大回撤": round(mdd, 4),
    }
    res = {f"{prefix}{k}": v for k, v in res.items()}

    if not exclude_zero:
        res["持仓覆盖"] = round(len(nv[(nv["edge"] != 0) | (nv["cost"] != 0)]) / len(nv), 4) if len(nv) > 0 else 0
    return res
def evaluate_pairs(pairs: pd.DataFrame, trade_dir: str = "多空") -> dict:
    """Evaluate the performance of open/close trade pairs.

    :param pairs: trade pairs, one row per round trip. Sample data::

        ========  ========  ===================  ===================  ========  ========  =========  ============  ========  ========
        标的代码   交易方向   开仓时间               平仓时间               开仓价格   平仓价格   持仓K线数   事件序列       持仓天数   盈亏比例
        ========  ========  ===================  ===================  ========  ========  =========  ============  ========  ========
        DLi9001   多头       2019-02-25 21:36:00  2019-02-25 21:51:00  1147.8    1150.72   16         开多 -> 平多   0         25.47
        DLi9001   多头       2021-09-15 14:06:00  2021-09-15 14:09:00  3155.88   3153.61   4          开多 -> 平多   0         -7.22
        DLi9001   多头       2019-08-29 21:01:00  2019-08-29 22:54:00  1445.86   1454.55   114        开多 -> 平多   0         60.09
        DLi9001   多头       2021-10-11 21:46:00  2021-10-11 22:11:00  3631.77   3622.66   26         开多 -> 平多   0         -25.08
        DLi9001   多头       2020-05-13 09:16:00  2020-05-13 09:26:00  1913.13   1917.64   11         开多 -> 平多   0         23.55
        ========  ========  ===================  ===================  ========  ========  =========  ============  ========  ========

        Only the columns 交易方向, 盈亏比例, 持仓天数 and 持仓K线数 are read here.
    :param trade_dir: trade direction filter, one of ['多头', '空头', '多空'];
        '多空' keeps every row
    :return: dict of trade statistics; all-zero defaults (break-even point 1)
        when no trade matches
    """
    # Uses the module-level cal_break_even_point instead of importing the
    # same-named helper from czsc.objects on every call.
    # NOTE(review): assumes both implementations agree — confirm.
    assert trade_dir in [
        "多头",
        "空头",
        "多空",
    ], "trade_dir 参数错误,可选值 ['多头', '空头', '多空']"

    pairs = pairs.copy()

    p = {
        "交易方向": trade_dir,
        "交易次数": 0,
        "累计收益": 0,
        "单笔收益": 0,
        "盈利次数": 0,
        "累计盈利": 0,
        "单笔盈利": 0,
        "亏损次数": 0,
        "累计亏损": 0,
        "单笔亏损": 0,
        "交易胜率": 0,
        "累计盈亏比": 0,
        "单笔盈亏比": 0,
        "盈亏平衡点": 1,
        "持仓天数": 0,
        "持仓K线数": 0,
    }

    if len(pairs) == 0:
        return p

    if trade_dir in ["多头", "空头"]:
        pairs = pairs[pairs["交易方向"] == trade_dir]
        if len(pairs) == 0:
            return p

    records = pairs.to_dict(orient="records")
    profits = [x["盈亏比例"] for x in records]
    n_trades = len(records)

    p["交易次数"] = n_trades
    p["盈亏平衡点"] = round(cal_break_even_point(profits), 4)
    p["累计收益"] = round(sum(profits), 2)
    p["单笔收益"] = round(p["累计收益"] / p["交易次数"], 2)
    p["持仓天数"] = round(sum(x["持仓天数"] for x in records) / n_trades, 2)
    p["持仓K线数"] = round(sum(x["持仓K线数"] for x in records) / n_trades, 2)

    # Trades with a zero result count as wins.
    wins = [v for v in profits if v >= 0]
    if len(wins) > 0:
        p["盈利次数"] = len(wins)
        p["累计盈利"] = sum(wins)
        p["单笔盈利"] = round(p["累计盈利"] / p["盈利次数"], 4)
        p["交易胜率"] = round(p["盈利次数"] / p["交易次数"], 4)

    losses = [v for v in profits if v < 0]
    if len(losses) > 0:
        p["亏损次数"] = len(losses)
        p["累计亏损"] = sum(losses)
        p["单笔亏损"] = round(p["累计亏损"] / p["亏损次数"], 4)

        p["累计盈亏比"] = round(p["累计盈利"] / abs(p["累计亏损"]), 4)
        # The average loss is rounded to 4 digits and can become 0 for tiny
        # losses; keep the default ratio instead of dividing by zero.
        if p["单笔亏损"] != 0:
            p["单笔盈亏比"] = round(p["单笔盈利"] / abs(p["单笔亏损"]), 4)

    return p
def holds_performance(df, **kwargs):
    """Compute per-period turnover, cost and return of a weighted portfolio.

    :param df: pd.DataFrame with columns ['dt', 'symbol', 'weight', 'n1b'];
        dt: trade time, symbol: instrument code, weight: portfolio weight,
        n1b: nominal return. Every symbol must have a row at every time point;
        add a row with weight 0 where a symbol is not held.
    :param kwargs:

        - fee: float, one-way fee rate in BP, default 15
        - digits: int, number of decimals the weights are rounded to, default 2
    :return: pd.DataFrame with columns
        ['date', 'change', 'edge_pre_fee', 'cost', 'edge_post_fee'], where
        'change' is the sum of absolute weight changes, 'cost' is
        ``change * fee / 10000`` and 'edge_post_fee' is
        ``edge_pre_fee - cost``
    """
    fee = kwargs.get("fee", 15)
    digits = kwargs.get("digits", 2)

    df = df.copy()
    df["weight"] = df["weight"].round(digits)
    df = df.sort_values(["dt", "symbol"]).reset_index(drop=True)

    # Turnover: sum over symbols of the absolute weight change per period.
    dft = pd.pivot_table(df, index="dt", columns="symbol", values="weight", aggfunc="sum").fillna(0)
    df_turns = dft.diff().abs().sum(axis=1).reset_index()
    df_turns.columns = ["date", "change"]

    # The first period has no predecessor, so its turnover is the position
    # that gets built. Use absolute weights like every later period does;
    # a signed sum would let short weights cancel or even produce a negative cost.
    sdt = df["dt"].min()
    first_change = df.loc[df["dt"] == sdt, "weight"].abs().sum()
    df_turns.loc[df_turns["date"] == sdt, "change"] = first_change

    # Portfolio return before fees: weighted sum of nominal returns per period.
    df_edge = (df["weight"] * df["n1b"]).groupby(df["dt"]).sum().reset_index()
    df_edge.columns = ["date", "edge_pre_fee"]

    dfr = pd.merge(df_turns, df_edge, on="date", how="left")
    dfr["cost"] = dfr["change"] * fee / 10000  # turnover cost
    dfr["edge_post_fee"] = dfr["edge_pre_fee"] - dfr["cost"]  # net return
    return dfr
def top_drawdowns(returns: pd.Series, top: int = 10) -> pd.DataFrame:
    """Find the largest drawdowns of a cumulative (simple-interest) return curve.

    For each drawdown the peak, valley and recovery dates are reported
    together with the drawdown size, the days from peak to valley, the days
    from valley to recovery and their sum. A drawdown that never recovers has
    NaN for the recovery date and for both recovery-based day counts.

    :param returns: pd.Series of daily returns indexed by date; day counts
        are taken from ``.days`` of index differences
    :param top: int, optional, maximum number of drawdowns returned, default 10
    :return: pd.DataFrame with columns
        ["回撤开始", "回撤结束", "回撤修复", "净值回撤", "回撤天数", "恢复天数", "新高间隔"];
        empty when ``returns`` is empty
    """
    columns = ["回撤开始", "回撤结束", "回撤修复", "净值回撤", "回撤天数", "恢复天数", "新高间隔"]

    # idxmin() raises on an empty series, so bail out before the loop.
    if len(returns) == 0:
        return pd.DataFrame([], columns=columns)

    df_cum = returns.cumsum()
    underwater = df_cum - df_cum.cummax()

    drawdowns = []
    for _ in range(top):
        valley = underwater.idxmin()  # end of the drawdown period

        before = underwater.loc[:valley]
        peak = before[before == 0].index[-1]

        after = underwater.loc[valley:]
        at_high = after[after == 0]
        recovery = at_high.index[0] if len(at_high) > 0 else np.nan  # NaN: not recovered
        recovered = not pd.isnull(recovery)

        # Slice this drawdown out so the next iteration finds the next one.
        if recovered:
            underwater = underwater.drop(underwater.loc[peak:recovery].index[1:-1])
        else:
            # The drawdown has not ended yet; nothing after the peak remains.
            underwater = underwater.loc[:peak]

        drawdown = df_cum.loc[valley] - df_cum.loc[peak]
        drawdown_days = (valley - peak).days
        recovery_days = (recovery - valley).days if recovered else np.nan
        new_high_days = drawdown_days + recovery_days if recovered else np.nan
        drawdowns.append((peak, valley, recovery, drawdown, drawdown_days, recovery_days, new_high_days))

        if (len(underwater) == 0) or (np.min(underwater) == 0):
            break

    return pd.DataFrame(drawdowns, columns=columns)
def psi(df: pd.DataFrame, factor, segment, **kwargs):
    """PSI (population stability index): how a distribution over bins shifts between segments.

    PSI = sum((actual share - baseline share) * ln(actual share / baseline share))

    The baseline is the distribution of ``factor`` over the whole of ``df``
    (column "总体分布"). For every segment a ``"<segment>_PSI"`` column holds
    the per-bin contribution; where either share is zero the plain difference
    ``actual - baseline`` is used instead of the log term. Column "PSI" is the
    mean of the per-segment contributions and the final row "总计" holds the
    column sums.

    Reference: https://zhuanlan.zhihu.com/p/79682292

    :param df: data containing the ``factor`` and ``segment`` columns
    :param factor: name of the binned factor column
    :param segment: name of the sample-segment column
    :param kwargs: unused
    :return: pd.DataFrame indexed by factor bin plus the "总计" row
    """
    base_col = "总体分布"

    counts = df.groupby([factor, segment], observed=False).size().unstack().fillna(0)
    dfg = counts.apply(lambda x: x / x.sum(), axis=0)
    # Assign as a Series so the baseline is aligned on the bin labels rather
    # than by position, and keep `observed` consistent with the grouping above.
    dfg[base_col] = df.groupby(factor, observed=False).size() / len(df)

    cols = [x for x in dfg.columns if x != base_col]
    psi_cols = []
    for rate_col in cols:
        psi_col = f"{rate_col}_PSI"
        both_nonzero = (dfg[base_col] != 0) & (dfg[rate_col] != 0)
        diff = dfg[rate_col] - dfg[base_col]
        # np.where evaluates both branches; silence log(0) / division warnings
        # for entries that the mask discards anyway.
        with np.errstate(divide="ignore", invalid="ignore"):
            log_term = diff * np.log(dfg[rate_col] / dfg[base_col])
        dfg[psi_col] = np.where(both_nonzero, log_term, diff)
        # Track the names directly: segment labels need not be strings.
        psi_cols.append(psi_col)

    dfg["PSI"] = dfg[psi_cols].mean(axis=1)
    dfg.loc["总计"] = dfg.sum(axis=0)
    return dfg