# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2023/7/15 13:42
describe: 特征分析相关的传感器
"""
import os
import pandas as pd
from tqdm import tqdm
from loguru import logger
from czsc.utils.corr import cross_sectional_ic
from czsc.utils.stats import daily_performance
from czsc.utils.trade import update_nbars
from concurrent.futures import ProcessPoolExecutor, as_completed
[docs]class FeatureAnalyzeBase:
"""【基类】特征计算与分析,适用于时序量价类因子"""
def __init__(self, symbols, read_bars, **kwargs) -> None:
"""初始化函数
:param symbols: list,需要获取特征的品种列表
:param read_bars: function,获取品种K线数据的函数
:param kwargs: dict,其他参数
- freq: str,K线周期,可选值:日线、60分钟、30分钟、15分钟、5分钟、1分钟
- sdt: str,开始日期
- edt: str,结束日期
- fq: str,复权方式,可选值:前复权、后复权,默认为后复权
- max_workers: int,多进程获取特征的最大进程数
"""
self.symbols = symbols
self.read_bars = read_bars
self.kwargs = kwargs
self.dfs = self.get_features()
self.report()
@property
def new_features(self):
"""list,新增的特征列表"""
raise NotImplementedError
[docs] def add_features(self, df):
"""向df中添加特征
df 包含以下列:
- dt: 日期
- open: 开盘价
- close: 收盘价
- high: 最高价
- low: 最低价
- vol: 成交量
- amount: 成交额
"""
raise NotImplementedError
def _one_symbol_features(self, symbol):
"""获取单个品种的特征"""
freq = self.kwargs.get('freq', '日线')
sdt = self.kwargs.get('sdt', '2010-01-01')
edt = self.kwargs.get('edt', '2023-01-01')
fq = self.kwargs.get('fq', "后复权")
nbars_seq = self.kwargs.get('nbars_seq', (1, 2, 5))
try:
bars = self.read_bars(symbol=symbol, freq=freq, sdt=sdt, edt=edt, fq=fq)
_df = pd.DataFrame(bars)
update_nbars(_df, numbers=nbars_seq, price_col='open', move=1)
_df = self.add_features(_df)
return _df
except Exception as e:
logger.error(f"{symbol} error: {e}")
return pd.DataFrame()
[docs] def get_features(self) -> pd.DataFrame:
"""
此方法检索self.symbols中每个符号的K线数据,
使用add_features方法向数据添加特征
:return: 包含所有K线和特征的DataFrame
"""
max_workers = self.kwargs.get('max_workers', 1)
if max_workers == 1:
res = []
for symbol in tqdm(self.symbols, desc="获取特征"):
df = self._one_symbol_features(symbol)
res.append(df)
else:
res = []
with ProcessPoolExecutor(max_workers) as executor:
tasks = [executor.submit(self._one_symbol_features, symbol) for symbol in self.symbols]
for future in tqdm(as_completed(tasks), desc="获取特征", total=len(tasks)):
df = future.result()
res.append(df)
return pd.concat(res, ignore_index=True)
[docs] def layering(self, feature, min_q, max_q):
"""分层回测"""
df = self.dfs.copy()
# q 是截面排序,按照特征值从大到小排序,q的取值范围是[0, 1]
df['q'] = df.groupby('dt')[feature].transform(lambda x: x.rank(pct=True))
df1 = df[(df['q'] >= min_q) & (df['q'] <= max_q)]
dfm = df1.groupby('dt').agg({'n1b': 'mean'}).fillna(0)
logger.info(f"分层累计收益 {feature} {min_q}-{max_q}:{dfm['n1b'].sum():.4f}; {daily_performance((dfm['n1b'] / 10000).to_list())}")
return dfm
[docs] def report(self):
"""打印特征分析报告"""
results_path = self.kwargs.get('results_path', None)
if results_path:
os.makedirs(results_path, exist_ok=True)
dfs = self.dfs.copy()
dfs['freq'] = dfs['freq'].apply(lambda x: x.value)
dfs.drop(['cache'], axis=1, inplace=True)
logger.add(os.path.join(results_path, "feature.log"), rotation="1 week", encoding='utf-8')
dfs.to_feather(os.path.join(results_path, "features.feather"))
logger.info(f"新增特征:{self.new_features}")
corr_method = self.kwargs.get('corr_method', 'pearson')
logger.info(f"相关系数计算方法:{corr_method}")
for feature in self.new_features:
logger.info(f"特征 {feature} 的取值范围:{self.dfs[feature].describe().round(4).to_dict()}")
df1, res1 = cross_sectional_ic(self.dfs, x_col=feature, y_col='n1b', method=corr_method, dt_col='dt')
logger.info(f"特征 {feature} 与未来1日收益的相关系数:{res1}")
_ = self.layering(feature, 0.95, 1)
_ = self.layering(feature, 0.9, 1)
_ = self.layering(feature, 0, 0.1)
_ = self.layering(feature, 0, 0.05)
df1['年月'] = df1['dt'].apply(lambda x: x.strftime("%Y年%m月"))
dfm = df1.groupby('年月').agg({'ic': 'mean'})
logger.info(f"特征 {feature} 与未来1日收益的相关系数月度描述:{dfm.describe().round(4).to_dict()}")
logger.info(f"特征 {feature} 与未来1日收益的相关系数月度胜率:{len(dfm[dfm['ic'] > 0])/len(dfm):.4f}")
[docs]class FixedNumberSelector:
"""选择固定数量(等权)的交易品种
可优化项:
1. 传入 res_path, 将分析过程和分析结果保存下来
2. 支持传入大盘择时信号,例如:大盘择时信号为空头时,多头只平不开
"""
def __init__(self, dfs, k, d, **kwargs):
"""
:param dfs: pd.DataFrame,所有交易品种的特征打分数据,必须包含以下列:dt, open, close, high, low, vol, amount, score;数据样例:
=================== ========= ======= ======= ======= ======= ======== ========= ======== =============
dt symbol open close high low vol amount n1b score
=================== ========= ======= ======= ======= ======= ======== ========= ======== =============
2017-01-03 00:00:00 000001.SZ 954.345 959.583 961.678 952.25 45984049 420595176 21.8583 nan
2017-01-04 00:00:00 000001.SZ 958.536 959.583 961.678 957.488 44932953 411503444 0 0
2017-01-05 00:00:00 000001.SZ 960.631 960.631 961.678 958.536 34437291 315769693 -43.6213 3.17018e-11
2017-01-06 00:00:00 000001.SZ 960.631 956.441 960.631 954.345 35815420 327176433 21.9062 -1.21795e-10
2017-01-09 00:00:00 000001.SZ 956.441 958.536 960.631 954.345 36108157 329994604 -10.9292 6.06684e-11
2017-01-10 00:00:00 000001.SZ 958.536 958.536 959.583 957.488 24105395 220575131 -10.9411 0
2017-01-11 00:00:00 000001.SZ 957.488 957.488 960.631 956.441 30343089 277553207 10.9531 -3.60186e-11
2017-01-12 00:00:00 000001.SZ 956.441 958.536 960.631 956.441 42800677 391869402 10.9411 2.5563e-11
2017-01-13 00:00:00 000001.SZ 957.488 959.583 962.726 955.393 43430137 397601906 -32.7865 2.51649e-11
2017-01-16 00:00:00 000001.SZ 958.536 957.488 959.583 950.155 68316586 623025820 21.9292 -3.19607e-11
=================== ========= ======= ======= ======= ======= ======== ========= ======== =============
:param k: int,每期固定选择的数量
:param d: int,每期允许变动的数量
"""
logger.info(f"选择固定数量的交易品种,k={k},d={d}, dfs.shape={dfs.shape}, kwargs={kwargs}")
self.dfs = dfs # 所有交易品种的特征打分数据,必须包含以下列:dt, open, close, high, low, vol, amount, score
self.k = k # 每期固定选择的数量
self.d = d # 每期允许变动的数量
self.kwargs = kwargs
self.is_stocks = kwargs.get('is_stocks', False) # 是否是A股,如果是A股,需要考虑涨跌停的情况
self.__preprocess()
self.operate_fee = kwargs.get('operate_fee', 15) # 单边手续费+交易滑点,单位:BP
self.holds = {} # 每期持有的品种
self.operates = {} # 每期操作的品种
for dt in self.dts:
self.__deal_one_time(dt)
def __preprocess(self):
assert 'dt' in self.dfs.columns, "必须包含dt列"
assert 'n1b' in self.dfs.columns, "必须包含n1b列"
assert 'symbol' in self.dfs.columns, "必须包含symbol列"
assert 'score' in self.dfs.columns, "必须包含score列, 这是选择交易品种的依据"
self.dfs['dt'] = pd.to_datetime(self.dfs['dt']).dt.strftime("%Y-%m-%d %H:%M:%S")
dts = sorted(self.dfs['dt'].unique())
last_dt_map = {dt: dts[i-1] for i, dt in enumerate(dts)}
self.dts, self.last_dt_map = dts, last_dt_map
self.score_map = {dt: dfg[['symbol', 'dt', 'open', 'close', 'high', 'low', 'score', 'n1b']].copy() for dt, dfg in self.dfs.groupby('dt')}
def __deal_one_time(self, dt):
"""单次调整记录"""
k, d, is_stocks = self.k, self.d, self.is_stocks
score = self.score_map[dt]
if is_stocks:
zt_symbols = [x['symbol'] for _, x in score.iterrows() if x['close'] == x['high'] >= x['open']]
dt_symbols = [x['symbol'] for _, x in score.iterrows() if x['close'] == x['low'] <= x['open']]
score_a = score[~score.symbol.isin(zt_symbols + dt_symbols)].copy()
logger.info(f"A股今日{dt}涨停{len(zt_symbols)}个品种,跌停{len(dt_symbols)}个品种,已跳过")
else:
score_a = score.copy()
if not self.holds:
logger.info(f"当前持仓为空,选择前{k}个品种")
assert not self.operates, "当holds是空的时候,操作记录必须为空"
_df = score_a.sort_values(by='score', ascending=False).head(k)
_df['edge'] = _df['n1b'] - self.operate_fee
self.holds[dt] = _df
_df_operates = [{'symbol': row['symbol'], 'dt': dt, 'action': 'buy', 'price': row['close']} for _, row in _df.iterrows()]
self.operates[dt] = pd.DataFrame(_df_operates)
return
# 有持仓的情况
score = self.score_map[dt]
last_dt = self.last_dt_map[dt]
last_holds = self.holds[last_dt].copy()
last_symbols = last_holds['symbol'].tolist()
skip_symbols = [x for x in last_symbols if x not in score['symbol'].tolist()]
if skip_symbols:
logger.warning(f"【数据缺陷提示】上一期持仓中,有{len(skip_symbols)}个品种,本期{dt}不在交易品种中,已跳过: {skip_symbols}")
topk_symbols = score_a.sort_values(by='score', ascending=False).head(k)['symbol'].tolist()
sell_symbols = score_a[score_a.symbol.isin(last_symbols)].sort_values(by='score', ascending=False).tail(d)['symbol'].tolist()
sell_symbols = [x for x in sell_symbols if x not in topk_symbols] + skip_symbols
keep_symbols = [x for x in last_symbols if x not in sell_symbols]
if len(keep_symbols) != k - len(sell_symbols):
logger.warning(f"保持品种数量不对,当前只有{len(keep_symbols)}个品种")
buy_symbols = score_a[~score_a.symbol.isin(keep_symbols)].sort_values(by='score', ascending=False).head(len(sell_symbols))['symbol'].tolist()
assert len(buy_symbols) == len(sell_symbols), "买入品种数量必须等于卖出品种数量"
assert len(keep_symbols + buy_symbols) == k, "保持品种数量+买入品种数量必须等于k"
_df = score[score.symbol.isin(keep_symbols + buy_symbols)].sort_values(by='score', ascending=False)
if len(_df) != k:
logger.warning(f"选择的品种数量不等于{k},当前只有{len(_df)}个品种")
_df['edge'] = _df.apply(lambda row: row['n1b'] - self.operate_fee if row['symbol'] in buy_symbols else row['n1b'], axis=1)
self.holds[dt] = _df
# 平仓扣费,在上一期的持仓中,卖出的品种,需要扣除手续费
last_holds['edge'] = last_holds.apply(lambda row: row['edge'] - self.operate_fee if row['symbol'] in sell_symbols else row['edge'], axis=1)
self.holds[last_dt] = last_holds
_sell_operates = [{'symbol': row['symbol'], 'dt': dt, 'action': 'sell', 'price': row['close']}
for _, row in score[score.symbol.isin(sell_symbols)].iterrows()]
_buy_operates = [{'symbol': row['symbol'], 'dt': dt, 'action': 'buy', 'price': row['close']}
for _, row in score[score.symbol.isin(buy_symbols)].iterrows()]
_df_operates = pd.DataFrame(_sell_operates + _buy_operates)
self.operates[dt] = _df_operates