# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2021/11/17 18:50
"""
import pandas as pd
import numpy as np
from tqdm import tqdm
from collections import Counter
from typing import List
from sklearn.preprocessing import KBinsDiscretizer
from ..data import TsDataCache
[docs]def discretizer(df: pd.DataFrame, col: str, n_bins=20, encode='ordinal', strategy='quantile'):
"""使用 KBinsDiscretizer 对连续变量在时间截面上进行离散化
:param df: 数据对象
:param col: 连续变量列名
:param n_bins: 参见 KBinsDiscretizer 文档
https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.KBinsDiscretizer.html
:param encode: 参见 KBinsDiscretizer 文档
:param strategy: 参见 KBinsDiscretizer 文档
:return:
"""
assert col in df.columns, f'{col} not in {df.columns}'
assert 'dt' in df.columns
new_col = f'{col}_bins{n_bins}'
results = []
for dt, dfg in tqdm(df.groupby('dt'), desc=f"{col}_bins{n_bins}"):
kb = KBinsDiscretizer(n_bins=n_bins, encode=encode, strategy=strategy)
# 加1,使分组从1开始
dfg[new_col] = kb.fit_transform(dfg[col].values.reshape(-1, 1)).ravel() + 1
results.append(dfg)
df = pd.concat(results, ignore_index=True)
return df
[docs]def get_index_beta(dc: TsDataCache, sdt: str, edt: str, freq='D', file_xlsx=None, indices=None):
"""获取基准指数的Beta
:param dc: 数据缓存对象
:param sdt: 开始日期
:param edt: 结束日期
:param freq: K线周期,D 日线,W 周线,M 月线
:param file_xlsx: 结果保存文件
:param indices: 定义指数列表
:return:
"""
if not indices:
indices = ['000001.SH', '000016.SH', '000905.SH', '000300.SH', '399001.SZ', '399006.SZ']
beta = {}
p = []
for ts_code in indices:
df = dc.pro_bar(ts_code=ts_code, start_date=sdt, end_date=edt, freq=freq, asset="I", raw_bar=False)
beta[ts_code] = df
df = df.fillna(0)
start_i, end_i, mdd = max_draw_down(df['n1b'].to_list())
start_dt = df.iloc[start_i]['trade_date']
end_dt = df.iloc[end_i]['trade_date']
row = {
'标的': ts_code,
"开始日期": sdt,
"结束日期": edt,
"最大回撤": mdd,
"回撤开始": start_dt,
"回撤结束": end_dt,
"交易次数": len(df),
"交易胜率": round(len(df[df.n1b > 0]) / len(df), 4),
"累计收益": round(df.n1b.sum(), 4),
}
cols = [x for x in df.columns if x[0] == 'n' and x[-1] == 'b']
row.update({x: round(df[x].mean(), 4) for x in cols})
p.append(row)
dfp = pd.DataFrame(p)
if file_xlsx:
f = pd.ExcelWriter(file_xlsx)
dfp.to_excel(f, index=False, sheet_name="指数表现")
for name, df_ in beta.items():
df_.to_excel(f, index=False, sheet_name=name)
f.close()
else:
beta['dfp'] = dfp
return beta
def max_draw_down(n1b: List):
"""最大回撤
参考:https://blog.csdn.net/weixin_38997425/article/details/82915386
:param n1b: 逐个结算周期的收益列表,单位:BP,换算关系是 10000BP = 100%
如,n1b = [100.1, -90.5, 212.6],表示第一个结算周期收益为100.1BP,也就是1.001%,以此类推。
:return: 最大回撤起止位置和最大回撤
"""
curve = np.cumsum(n1b)
curve += 10000
# 获取结束位置
i = np.argmax((np.maximum.accumulate(curve) - curve) / np.maximum.accumulate(curve))
if i == 0:
return 0, 0, 0
# 获取开始位置
j = np.argmax(curve[:i])
mdd = int((curve[j] - curve[i]) / curve[j] * 10000) / 10000
return j, i, mdd
[docs]def turn_over_rate(df_holds: pd.DataFrame) -> [pd.DataFrame, float]:
"""计算持仓明细对应的组合换手率
:param df_holds: 每个交易日的持仓明细,数据样例如下
证券代码 成分日期 持仓权重
0 000576.SZ 2020-01-02 0.0099
1 000639.SZ 2020-01-02 0.0099
2 000803.SZ 2020-01-02 0.0099
3 000811.SZ 2020-01-02 0.0099
4 000829.SZ 2020-01-02 0.0099
:return: 组合换手率
"""
dft = pd.pivot_table(df_holds, index='成分日期', columns='证券代码', values='持仓权重', aggfunc='sum')
dft = dft.fillna(0)
df_turns = dft.diff().abs().sum(axis=1).reset_index()
df_turns.columns = ['date', 'change']
# 由于是 diff 计算,第一个时刻的仓位变化被忽视了,修改一下
sdt = df_holds['成分日期'].min()
df_turns.loc[(df_turns['date'] == sdt), 'change'] = df_holds[df_holds['成分日期'] == sdt]['持仓权重'].sum()
return df_turns, round(df_turns.change.sum() / 2, 4)
[docs]def holds_concepts_effect(holds: pd.DataFrame, concepts: dict, top_n=20, min_n=3, **kwargs):
"""股票持仓列表的板块效应
原理概述:在选股时,如果股票的概念板块与组合中的其他股票的概念板块有重合,那么这个股票的表现会更好。
函数计算逻辑:
1. 如果kwargs中存在'copy'键且对应值为True,则将holds进行复制。
2. 为holds添加'概念板块'列,该列的值是holds中'symbol'列对应的股票的概念板块列表,如果没有对应的概念板块则填充为空。
3. 添加'概念数量'列,该列的值是每个股票的概念板块数量。
4. 从holds中筛选出概念数量大于0的行,赋值给holds。
5. 创建空列表new_holds和空字典dt_key_concepts。
6. 对holds按照'dt'进行分组,遍历每个分组,计算板块效应。
a. 计算密集出现的概念,选取出现次数最多的前top_n个概念,赋值给key_concepts列表。
b. 将日期dt和对应的key_concepts存入dt_key_concepts字典。
c. 计算在密集概念中出现次数超过min_n的股票,将符合条件的股票添加到new_holds列表中。
7. 使用pd.concat将new_holds中的DataFrame进行合并,忽略索引,赋值给dfh。
8. 创建DataFrame dfk,其中包含日期(dt)和对应的强势概念(key_concepts)。
9. 返回dfh和dfk。
:param holds: 组合股票池数据,样例:
=================== ========= ==========
dt symbol weight
=================== ========= ==========
2023-05-09 00:00:00 601858.SH 0.00333333
2023-05-09 00:00:00 300502.SZ 0.00333333
2023-05-09 00:00:00 603258.SH 0.00333333
2023-05-09 00:00:00 300499.SZ 0.00333333
2023-05-09 00:00:00 300624.SZ 0.00333333
=================== ========= ==========
:param concepts: 股票的概念板块,样例:
{
'002507.SZ': ['电子商务', '超级品牌', '国企改革'],
'002508.SZ': ['家用电器', '杭州亚运会', '恒大概念']
}
:param top_n: 选取前 n 个密集概念
:param min_n: 单股票至少要有 n 个概念在 top_n 中
:return: 过滤后的选股结果,每个时间点的 top_n 概念
"""
if kwargs.get('copy', True):
holds = holds.copy()
holds['概念板块'] = holds['symbol'].map(concepts).fillna('')
holds['概念数量'] = holds['概念板块'].apply(len)
holds = holds[holds['概念数量'] > 0]
new_holds = []
dt_key_concepts = {}
for dt, dfg in tqdm(holds.groupby('dt'), desc='计算板块效应'):
# 计算密集出现的概念
key_concepts = [k for k, v in Counter([x for y in dfg['概念板块'] for x in y]).most_common(top_n)]
dt_key_concepts[dt] = key_concepts
# 计算在密集概念中出现次数超过min_n的股票
dfg['强势概念'] = dfg['概念板块'].apply(lambda x: ','.join(set(x) & set(key_concepts)))
sel = dfg[dfg['强势概念'].apply(lambda x: len(x.split(',')) >= min_n)]
new_holds.append(sel)
dfh = pd.concat(new_holds, ignore_index=True)
dfk = pd.DataFrame([{"dt": k, '强势概念': v} for k, v in dt_key_concepts.items()])
return dfh, dfk