Source code for czsc.sensors.utils

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2021/11/17 18:50
"""
import pandas as pd
import numpy as np
from tqdm import tqdm
from collections import Counter
from typing import List
from deprecated import deprecated
from sklearn.preprocessing import KBinsDiscretizer
from ..data import TsDataCache


[docs]def discretizer(df: pd.DataFrame, col: str, n_bins=20, encode="ordinal", strategy="quantile"): """使用 KBinsDiscretizer 对连续变量在时间截面上进行离散化 :param df: 数据对象 :param col: 连续变量列名 :param n_bins: 参见 KBinsDiscretizer 文档 https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.KBinsDiscretizer.html :param encode: 参见 KBinsDiscretizer 文档 :param strategy: 参见 KBinsDiscretizer 文档 :return: """ assert col in df.columns, f"{col} not in {df.columns}" assert "dt" in df.columns new_col = f"{col}_bins{n_bins}" results = [] for dt, dfg in tqdm(df.groupby("dt"), desc=f"{col}_bins{n_bins}"): kb = KBinsDiscretizer(n_bins=n_bins, encode=encode, strategy=strategy) # 加1,使分组从1开始 dfg[new_col] = kb.fit_transform(dfg[col].values.reshape(-1, 1)).ravel() + 1 results.append(dfg) df = pd.concat(results, ignore_index=True) return df
[docs]def get_index_beta(dc: TsDataCache, sdt: str, edt: str, freq="D", file_xlsx=None, indices=None): """获取基准指数的Beta :param dc: 数据缓存对象 :param sdt: 开始日期 :param edt: 结束日期 :param freq: K线周期,D 日线,W 周线,M 月线 :param file_xlsx: 结果保存文件 :param indices: 定义指数列表 :return: """ if not indices: indices = ["000001.SH", "000016.SH", "000905.SH", "000300.SH", "399001.SZ", "399006.SZ"] beta = {} p = [] for ts_code in indices: df = dc.pro_bar(ts_code=ts_code, start_date=sdt, end_date=edt, freq=freq, asset="I", raw_bar=False) beta[ts_code] = df df = df.fillna(0) start_i, end_i, mdd = max_draw_down(df["n1b"].to_list()) start_dt = df.iloc[start_i]["trade_date"] end_dt = df.iloc[end_i]["trade_date"] row = { "标的": ts_code, "开始日期": sdt, "结束日期": edt, "最大回撤": mdd, "回撤开始": start_dt, "回撤结束": end_dt, "交易次数": len(df), "交易胜率": round(len(df[df.n1b > 0]) / len(df), 4), "累计收益": round(df.n1b.sum(), 4), } cols = [x for x in df.columns if x[0] == "n" and x[-1] == "b"] row.update({x: round(df[x].mean(), 4) for x in cols}) p.append(row) dfp = pd.DataFrame(p) if file_xlsx: f = pd.ExcelWriter(file_xlsx) dfp.to_excel(f, index=False, sheet_name="指数表现") for name, df_ in beta.items(): df_.to_excel(f, index=False, sheet_name=name) f.close() else: beta["dfp"] = dfp return beta
def max_draw_down(n1b: List): """最大回撤 参考:https://blog.csdn.net/weixin_38997425/article/details/82915386 :param n1b: 逐个结算周期的收益列表,单位:BP,换算关系是 10000BP = 100% 如,n1b = [100.1, -90.5, 212.6],表示第一个结算周期收益为100.1BP,也就是1.001%,以此类推。 :return: 最大回撤起止位置和最大回撤 """ curve = np.cumsum(n1b) curve += 10000 # 获取结束位置 i = np.argmax((np.maximum.accumulate(curve) - curve) / np.maximum.accumulate(curve)) if i == 0: return 0, 0, 0 # 获取开始位置 j = np.argmax(curve[:i]) mdd = int((curve[j] - curve[i]) / curve[j] * 10000) / 10000 return j, i, mdd
[docs]def turn_over_rate(df_holds: pd.DataFrame) -> [pd.DataFrame, float]: """计算持仓明细对应的组合换手率 :param df_holds: 每个交易日的持仓明细,数据样例如下 证券代码 成分日期 持仓权重 0 000576.SZ 2020-01-02 0.0099 1 000639.SZ 2020-01-02 0.0099 2 000803.SZ 2020-01-02 0.0099 3 000811.SZ 2020-01-02 0.0099 4 000829.SZ 2020-01-02 0.0099 :return: 组合换手率 """ dft = pd.pivot_table(df_holds, index="成分日期", columns="证券代码", values="持仓权重", aggfunc="sum") dft = dft.fillna(0) df_turns = dft.diff().abs().sum(axis=1).reset_index() df_turns.columns = ["date", "change"] # 由于是 diff 计算,第一个时刻的仓位变化被忽视了,修改一下 sdt = df_holds["成分日期"].min() df_turns.loc[(df_turns["date"] == sdt), "change"] = df_holds[df_holds["成分日期"] == sdt]["持仓权重"].sum() return df_turns, round(df_turns.change.sum() / 2, 4)
[docs]def holds_concepts_effect(holds: pd.DataFrame, concepts: dict, top_n=20, min_n=3, **kwargs): """股票持仓列表的板块效应 原理概述:在选股时,如果股票的概念板块与组合中的其他股票的概念板块有重合,那么这个股票的表现会更好。 函数计算逻辑: 1. 如果kwargs中存在'copy'键且对应值为True,则将holds进行复制。 2. 为holds添加'概念板块'列,该列的值是holds中'symbol'列对应的股票的概念板块列表,如果没有对应的概念板块则填充为空。 3. 添加'概念数量'列,该列的值是每个股票的概念板块数量。 4. 从holds中筛选出概念数量大于0的行,赋值给holds。 5. 创建空列表new_holds和空字典dt_key_concepts。 6. 对holds按照'dt'进行分组,遍历每个分组,计算板块效应。 a. 计算密集出现的概念,选取出现次数最多的前top_n个概念,赋值给key_concepts列表。 b. 将日期dt和对应的key_concepts存入dt_key_concepts字典。 c. 计算在密集概念中出现次数超过min_n的股票,将符合条件的股票添加到new_holds列表中。 7. 使用pd.concat将new_holds中的DataFrame进行合并,忽略索引,赋值给dfh。 8. 创建DataFrame dfk,其中包含日期(dt)和对应的强势概念(key_concepts)。 9. 返回dfh和dfk。 :param holds: 组合股票池数据,样例: =================== ========= ========== dt symbol weight =================== ========= ========== 2023-05-09 00:00:00 601858.SH 0.00333333 2023-05-09 00:00:00 300502.SZ 0.00333333 2023-05-09 00:00:00 603258.SH 0.00333333 2023-05-09 00:00:00 300499.SZ 0.00333333 2023-05-09 00:00:00 300624.SZ 0.00333333 =================== ========= ========== :param concepts: 股票的概念板块,样例: { '002507.SZ': ['电子商务', '超级品牌', '国企改革'], '002508.SZ': ['家用电器', '杭州亚运会', '恒大概念'] } :param top_n: 选取前 n 个密集概念 :param min_n: 单股票至少要有 n 个概念在 top_n 中 :return: 过滤后的选股结果,每个时间点的 top_n 概念 """ if kwargs.get("copy", True): holds = holds.copy() holds["概念板块"] = holds["symbol"].map(concepts).fillna("") holds["概念数量"] = holds["概念板块"].apply(len) holds = holds[holds["概念数量"] > 0] new_holds = [] dt_key_concepts = {} for dt, dfg in tqdm(holds.groupby("dt"), desc="计算板块效应"): # 计算密集出现的概念 key_concepts = [k for k, v in Counter([x for y in dfg["概念板块"] for x in y]).most_common(top_n)] dt_key_concepts[dt] = key_concepts # 计算在密集概念中出现次数超过min_n的股票 dfg["强势概念"] = dfg["概念板块"].apply(lambda x: ",".join(set(x) & set(key_concepts))) sel = dfg[dfg["强势概念"].apply(lambda x: len(x.split(",")) >= min_n)] new_holds.append(sel) dfh = pd.concat(new_holds, ignore_index=True) dfk = pd.DataFrame([{"dt": k, "强势概念": v} for k, v in dt_key_concepts.items()]) return dfh, dfk