Source code for czsc.utils.cross

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2023/5/11 17:53
describe: CrossSectionalPerformance - 截面绩效分析
"""
import os
import time
import numpy as np
import pandas as pd
from loguru import logger
from czsc.utils import WordWriter
from czsc.utils.stats import net_value_stats
from czsc.utils.plt_plot import plot_net_value
from deprecated import deprecated


[docs]@deprecated(version="1.0.0", reason="不需要这个类,截面策略的绩效分析可以直接使用 czsc.utils.cross") class CrossSectionalPerformance: """根据截面持仓信息,计算截面绩效""" def __init__(self, dfh: pd.DataFrame, **kwargs): """计算截面绩效 :param dfh: 截面持仓信息,包含以下字段 symbol: 标的代码 dt: 交易时间 pos: 持仓方向,1 = 多头,-1 = 空头,0 = 平仓 n1b: 持仓至下根K线结束的收益率,单位:BP weight: 持仓权重,可选,如果不输入,则默认等权 数据示例: ========= =================== ===== ===== symbol dt pos n1b ========= =================== ===== ===== 000008.SZ 2017-01-03 09:45:00 0 10.7 000008.SZ 2017-01-03 10:00:00 1 10.7 000008.SZ 2017-01-03 10:15:00 0 -10.7 000008.SZ 2017-01-03 10:30:00 1 10.7 000008.SZ 2017-01-03 10:45:00 0 -10.7 ========= =================== ===== ===== :param kwargs: 其他参数 """ self.version = "V230528" dfh = dfh.copy() dfh["dt"] = pd.to_datetime(dfh["dt"]) dfh["date"] = dfh["dt"].apply(lambda x: x.date()) self.dfh = dfh # self.dfh = self.__add_count(dfh) self.dfh = self.__add_equal_weight(self.dfh, max_total_weight=kwargs.get("max_total_weight", 1)) self.dfh["edge"] = self.dfh["n1b"] * self.dfh["weight"] self.kwargs = kwargs @staticmethod def __add_count(dfh): """添加连续持仓计数""" res = [] for symbol, dfs in dfh.groupby("symbol"): dfs = dfs.sort_values("dt", ascending=True) dfs["count"] = dfs["pos"].groupby((dfs["pos"] != dfs["pos"].shift()).cumsum()).cumcount() + 1 dfs["count"] = np.where(dfs["pos"] == 0, 0, dfs["count"]) res.append(dfs) return pd.concat(res, ignore_index=True) @staticmethod def __add_equal_weight(dfh, max_total_weight=1): """添加等权重 :param dfh: :param max_total_weight: 最大权重,如果某个截面的持仓权重超过该值,则按该值计算; 1 表示 100% 权重,即 1 倍杠杆,2 表示 200% 权重,即 2 倍杠杆 :return: """ if "weight" in dfh.columns: return dfh results = [] for dt, dfg in dfh.groupby("dt"): dfg["weight"] = 0 if dfg["pos"].abs().sum() != 0: symbol_weight = max_total_weight / dfg["pos"].abs().sum() dfg["weight"] = symbol_weight * dfg["pos"] results.append(dfg) dfh = pd.concat(results, ignore_index=True) return dfh
[docs] def cal_turnover(self): """计算换手率""" dfh = self.dfh.copy() dft = pd.pivot_table(dfh, index="dt", columns="symbol", values="weight", aggfunc="sum") dft = dft.fillna(0) dft1 = dft.diff().abs().sum(axis=1) # 由于是 diff 计算,第一个时刻的仓位变化被忽视了,修改一下 dft1.iloc[0] = dfh[dfh["dt"] == dfh["dt"].min()]["pos"].sum() dft2 = dft.apply(lambda x: x[x != 0].count(), axis=1).fillna(0) dft = pd.concat([dft1, dft2], axis=1) dft.columns = ["换手率", "持仓数量"] return dft
[docs] def cross_net_value(self, by="dt", values="edge"): """计算截面等权净值 :param by: 按什么字段计算截面等权净值,默认按交易时间 :param values: 计算截面等权净值时,使用的字段,默认使用 edge,计算策略收益,可选值:edge, n1b 输入 edge,计算策略收益 输入 n1b,计算基准收益 :return: """ assert values in ["edge", "n1b"] dfh = self.dfh.copy() dfe = pd.pivot_table(dfh, index=by, columns="symbol", values=values, aggfunc="sum") if values == "edge": dfe["截面收益"] = dfe.sum(axis=1).fillna(0) else: dfe["截面收益"] = dfe.mean(axis=1).fillna(0) dfe["累计净值"] = dfe["截面收益"].cumsum() dfe["动态回撤"] = ((dfe["累计净值"] + 10000) / (dfe["累计净值"] + 10000).cummax() - 1) * 10000 + 10000 dfe["dt"] = dfe.index.values return dfe[["dt", "截面收益", "累计净值", "动态回撤"]].fillna(0).reset_index(drop=True)
[docs] def report(self, file_docx): if os.path.exists(file_docx): os.remove(file_docx) logger.warning(f"删除已存在的文件:{file_docx}") writer = WordWriter(file_docx) writer.add_title("截面绩效分析报告") writer.add_paragraph( "本文档由 czsc 编写,用于分析截面绩效。" "截面绩效分析,是指在某个时间点,对所有标的收益进行等权汇总,计算出截面等权收益。" ) dft = self.cal_turnover() writer.add_heading("换手率", level=1) writer.add_paragraph("换手率,是指在某个时间点,所有标的权重变化的绝对值之和。", first_line_indent=0) writer.add_paragraph( f"平均换手率:{round(dft['换手率'].mean(), 4)}; " f"累计换手率:{round(dft['换手率'].sum(), 4)}", first_line_indent=0, ) for dt_col in ["dt", "date"]: writer.add_heading(f"按 {dt_col} 截面进行评价", level=1) dfe = self.cross_net_value(by=dt_col) nv = dfe[["dt", "截面收益"]].copy() nv.columns = ["dt", "edge"] stats = net_value_stats(nv, sub_cost=False) stats_info = "\n".join([f"{k}: {v}" for k, v in stats.items()]) writer.add_paragraph( f"截面等权净值,按 {dt_col} 截面进行评价,统计结果如下:\n{stats_info}", first_line_indent=0 ) # 绘制净值曲线 writer.add_paragraph(f"按 {dt_col} 截面的策略净值曲线如下:", first_line_indent=0) dfe = self.cross_net_value(by=dt_col, values="edge") file_png = f"{time.time_ns()}.png" plot_net_value(dfe, file_png=file_png, figsize=(9, 5)) writer.add_picture(file_png, width=15, height=9) os.remove(file_png) writer.add_paragraph(f"按 {dt_col} 截面的基准净值曲线如下:", first_line_indent=0) dfe = self.cross_net_value(by=dt_col, values="n1b") file_png = f"{time.time_ns()}.png" plot_net_value(dfe, file_png=file_png, figsize=(9, 5)) writer.add_picture(file_png, width=15, height=9) os.remove(file_png) # 计算每个月的累计收益 nv["month"] = nv["dt"].apply(lambda x: x.strftime("%Y-%m")) nvm = nv.groupby("month")["edge"].apply(sum).reset_index(drop=False) nvm[["year", "month"]] = nvm["month"].apply(lambda x: (x[:4], x[-2:])).values.tolist() ymr = pd.pivot_table(nvm, index="year", columns="month", values="edge", aggfunc="sum").fillna(0).round(1) writer.add_heading(f"按月进行收益汇总:月胜率 = {len(nvm[nvm['edge'] > 0]) / len(nvm) * 100:.2f}%", level=2) writer.add_df_table(ymr.reset_index(drop=False), style="Medium List 1 Accent 2", font_size=8) writer.save() logger.info(f"报告生成成功:{file_docx}")
[docs]def cross_sectional_ranker(df, x_cols, y_col, **kwargs): """截面打分排序 :param df: 因子数据,必须包含日期、品种、因子值、预测列,且按日期升序排列,样例数据如下: :param x_cols: 因子列名 :param y_col: 预测列名 :param kwargs: 其他参数 - model_params: dict, 模型参数,默认{'n_estimators': 40, 'learning_rate': 0.01},可调整,参考lightgbm文档 - n_splits: int, 时间拆分次数,默认5,即5段时间 - rank_ascending: bool, 打分排序是否升序,默认False-降序 - copy: bool, 是否拷贝df,True-拷贝,False-不拷贝 :return: df, 包含预测分数和排序列 """ from lightgbm import LGBMRanker from sklearn.model_selection import TimeSeriesSplit assert "symbol" in df.columns, "df must have column 'symbol'" assert "dt" in df.columns, "df must have column 'dt'" if kwargs.get("copy", True): df = df.copy() df["dt"] = pd.to_datetime(df["dt"]) df = df.sort_values(["dt", y_col], ascending=[True, False]) model_params = kwargs.get("model_params", {"n_estimators": 40, "learning_rate": 0.01}) model = LGBMRanker(**model_params) dfd = pd.DataFrame({"dt": sorted(df["dt"].unique())}).values tss = TimeSeriesSplit(n_splits=kwargs.get("n_splits", 5)) for train_index, test_index in tss.split(dfd): train_dts = dfd[train_index][:, 0] test_dts = dfd[test_index][:, 0] # 拆分训练集和测试集 train, test = df[df["dt"].isin(train_dts)], df[df["dt"].isin(test_dts)] X_train, X_test, y_train = train[x_cols], test[x_cols], train[y_col] query_train = train.groupby("dt")["symbol"].count().values # 训练模型 & 预测 model.fit(X_train, y_train, group=query_train) df.loc[X_test.index, "score"] = model.predict(X_test) df["rank"] = df.groupby("dt")["score"].rank(ascending=kwargs.get("rank_ascending", False)) return df