Source code for czsc.utils.cross
# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2023/5/11 17:53
describe: CrossSectionalPerformance - 截面绩效分析
"""
import os
import time
import numpy as np
import pandas as pd
from loguru import logger
from czsc.utils import WordWriter
from czsc.utils.stats import net_value_stats
from czsc.utils.plt_plot import plot_net_value
[docs]class CrossSectionalPerformance:
"""根据截面持仓信息,计算截面绩效"""
def __init__(self, dfh: pd.DataFrame, **kwargs):
"""计算截面绩效
:param dfh: 截面持仓信息,包含以下字段
symbol: 标的代码
dt: 交易时间
pos: 持仓方向,1 = 多头,-1 = 空头,0 = 平仓
n1b: 持仓至下根K线结束的收益率,单位:BP
weight: 持仓权重,可选,如果不输入,则默认等权
数据示例:
========= =================== ===== =====
symbol dt pos n1b
========= =================== ===== =====
000008.SZ 2017-01-03 09:45:00 0 10.7
000008.SZ 2017-01-03 10:00:00 1 10.7
000008.SZ 2017-01-03 10:15:00 0 -10.7
000008.SZ 2017-01-03 10:30:00 1 10.7
000008.SZ 2017-01-03 10:45:00 0 -10.7
========= =================== ===== =====
:param kwargs: 其他参数
"""
self.version = 'V230528'
dfh = dfh.copy()
dfh['dt'] = pd.to_datetime(dfh['dt'])
dfh['date'] = dfh['dt'].apply(lambda x: x.date())
self.dfh = dfh
# self.dfh = self.__add_count(dfh)
self.dfh = self.__add_equal_weight(self.dfh, max_total_weight=kwargs.get('max_total_weight', 1))
self.dfh['edge'] = self.dfh['n1b'] * self.dfh['weight']
self.kwargs = kwargs
@staticmethod
def __add_count(dfh):
"""添加连续持仓计数"""
res = []
for symbol, dfs in dfh.groupby('symbol'):
dfs = dfs.sort_values('dt', ascending=True)
dfs['count'] = dfs['pos'].groupby((dfs['pos'] != dfs['pos'].shift()).cumsum()).cumcount() + 1
dfs['count'] = np.where(dfs['pos'] == 0, 0, dfs['count'])
res.append(dfs)
return pd.concat(res, ignore_index=True)
@staticmethod
def __add_equal_weight(dfh, max_total_weight=1):
"""添加等权重
:param dfh:
:param max_total_weight: 最大权重,如果某个截面的持仓权重超过该值,则按该值计算;
1 表示 100% 权重,即 1 倍杠杆,2 表示 200% 权重,即 2 倍杠杆
:return:
"""
if 'weight' in dfh.columns:
return dfh
results = []
for dt, dfg in dfh.groupby('dt'):
dfg['weight'] = 0
if dfg['pos'].abs().sum() != 0:
symbol_weight = max_total_weight / dfg['pos'].abs().sum()
dfg['weight'] = symbol_weight * dfg['pos']
results.append(dfg)
dfh = pd.concat(results, ignore_index=True)
return dfh
[docs] def cal_turnover(self):
"""计算换手率"""
dfh = self.dfh.copy()
dft = pd.pivot_table(dfh, index='dt', columns='symbol', values='weight', aggfunc='sum')
dft = dft.fillna(0)
dft1 = dft.diff().abs().sum(axis=1)
# 由于是 diff 计算,第一个时刻的仓位变化被忽视了,修改一下
dft1.iloc[0] = dfh[dfh['dt'] == dfh['dt'].min()]['pos'].sum()
dft2 = dft.apply(lambda x: x[x != 0].count(), axis=1).fillna(0)
dft = pd.concat([dft1, dft2], axis=1)
dft.columns = ['换手率', '持仓数量']
return dft
[docs] def cross_net_value(self, by='dt', values='edge'):
"""计算截面等权净值
:param by: 按什么字段计算截面等权净值,默认按交易时间
:param values: 计算截面等权净值时,使用的字段,默认使用 edge,计算策略收益,可选值:edge, n1b
输入 edge,计算策略收益
输入 n1b,计算基准收益
:return:
"""
assert values in ['edge', 'n1b']
dfh = self.dfh.copy()
dfe = pd.pivot_table(dfh, index=by, columns='symbol', values=values, aggfunc='sum')
if values == 'edge':
dfe['截面收益'] = dfe.sum(axis=1).fillna(0)
else:
dfe['截面收益'] = dfe.mean(axis=1).fillna(0)
dfe['累计净值'] = dfe['截面收益'].cumsum()
dfe['动态回撤'] = ((dfe['累计净值'] + 10000) / (dfe['累计净值'] + 10000).cummax() - 1) * 10000 + 10000
dfe['dt'] = dfe.index.values
return dfe[['dt', '截面收益', '累计净值', '动态回撤']].fillna(0).reset_index(drop=True)
[docs] def report(self, file_docx):
if os.path.exists(file_docx):
os.remove(file_docx)
logger.warning(f"删除已存在的文件:{file_docx}")
writer = WordWriter(file_docx)
writer.add_title("截面绩效分析报告")
writer.add_paragraph("本文档由 czsc 编写,用于分析截面绩效。"
"截面绩效分析,是指在某个时间点,对所有标的收益进行等权汇总,计算出截面等权收益。")
dft = self.cal_turnover()
writer.add_heading("换手率", level=1)
writer.add_paragraph("换手率,是指在某个时间点,所有标的权重变化的绝对值之和。", first_line_indent=0)
writer.add_paragraph(f"平均换手率:{round(dft['换手率'].mean(), 4)}; "
f"累计换手率:{round(dft['换手率'].sum(), 4)}", first_line_indent=0)
for dt_col in ['dt', 'date']:
writer.add_heading(f"按 {dt_col} 截面进行评价", level=1)
dfe = self.cross_net_value(by=dt_col)
nv = dfe[['dt', '截面收益']].copy()
nv.columns = ['dt', 'edge']
stats = net_value_stats(nv, sub_cost=False)
stats_info = "\n".join([f"{k}: {v}" for k, v in stats.items()])
writer.add_paragraph(f"截面等权净值,按 {dt_col} 截面进行评价,统计结果如下:\n{stats_info}", first_line_indent=0)
# 绘制净值曲线
writer.add_paragraph(f"按 {dt_col} 截面的策略净值曲线如下:", first_line_indent=0)
dfe = self.cross_net_value(by=dt_col, values='edge')
file_png = f"{time.time_ns()}.png"
plot_net_value(dfe, file_png=file_png, figsize=(9, 5))
writer.add_picture(file_png, width=15, height=9)
os.remove(file_png)
writer.add_paragraph(f"按 {dt_col} 截面的基准净值曲线如下:", first_line_indent=0)
dfe = self.cross_net_value(by=dt_col, values='n1b')
file_png = f"{time.time_ns()}.png"
plot_net_value(dfe, file_png=file_png, figsize=(9, 5))
writer.add_picture(file_png, width=15, height=9)
os.remove(file_png)
# 计算每个月的累计收益
nv['month'] = nv['dt'].apply(lambda x: x.strftime("%Y-%m"))
nvm = nv.groupby('month')['edge'].apply(sum).reset_index(drop=False)
nvm[['year', 'month']] = nvm['month'].apply(lambda x: (x[:4], x[-2:])).values.tolist()
ymr = pd.pivot_table(nvm, index='year', columns='month', values='edge', aggfunc='sum').fillna(0).round(1)
writer.add_heading(f"按月进行收益汇总:月胜率 = {len(nvm[nvm['edge'] > 0]) / len(nvm) * 100:.2f}%", level=2)
writer.add_df_table(ymr.reset_index(drop=False), style='Medium List 1 Accent 2', font_size=8)
writer.save()
logger.info(f"报告生成成功:{file_docx}")
[docs]def cross_sectional_ranker(df, x_cols, y_col, **kwargs):
"""截面打分排序
:param df: 因子数据,必须包含日期、品种、因子值、预测列,且按日期升序排列,样例数据如下:
:param x_cols: 因子列名
:param y_col: 预测列名
:param kwargs: 其他参数
- model_params: dict, 模型参数,默认{'n_estimators': 40, 'learning_rate': 0.01},可调整,参考lightgbm文档
- n_splits: int, 时间拆分次数,默认5,即5段时间
- rank_ascending: bool, 打分排序是否升序,默认False-降序
- copy: bool, 是否拷贝df,True-拷贝,False-不拷贝
:return: df, 包含预测分数和排序列
"""
from lightgbm import LGBMRanker
from sklearn.model_selection import TimeSeriesSplit
assert "symbol" in df.columns, "df must have column 'symbol'"
assert "dt" in df.columns, "df must have column 'dt'"
if kwargs.get('copy', True):
df = df.copy()
df['dt'] = pd.to_datetime(df['dt'])
df = df.sort_values(['dt', y_col], ascending=[True, False])
model_params = kwargs.get('model_params', {'n_estimators': 40, 'learning_rate': 0.01})
model = LGBMRanker(**model_params)
dfd = pd.DataFrame({'dt': sorted(df['dt'].unique())}).values
tss = TimeSeriesSplit(n_splits=kwargs.get('n_splits', 5))
for train_index, test_index in tss.split(dfd):
train_dts = dfd[train_index][:, 0]
test_dts = dfd[test_index][:, 0]
# 拆分训练集和测试集
train, test = df[df['dt'].isin(train_dts)], df[df['dt'].isin(test_dts)]
X_train, X_test, y_train = train[x_cols], test[x_cols], train[y_col]
query_train = train.groupby('dt')['symbol'].count().values
# 训练模型 & 预测
model.fit(X_train, y_train, group=query_train)
df.loc[X_test.index, 'score'] = model.predict(X_test)
df['rank'] = df.groupby('dt')['score'].rank(ascending=kwargs.get('rank_ascending', False))
return df