Source code for czsc.features.utils

# 工具函数
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import minmax_scale, scale, maxabs_scale, robust_scale
from loguru import logger


def is_event_feature(df, col, **kwargs):
    """Tell whether ``df[col]`` looks like an event-type factor.

    An event factor only takes the values 1 (long event), -1 (short event) and 0
    (no event). Any other distinct value, including NaN, disqualifies the column.
    An empty column is accepted (vacuous truth).

    :param df: pd.DataFrame, data holding the factor column
    :param col: str, name of the factor column
    :param kwargs: unused, accepted for interface compatibility
    :return: bool, True when every distinct value of the column is 0, 1 or -1
    """
    event_values = (0, 1, -1)
    distinct = df[col].unique()
    return all(value in event_values for value in distinct)
def rolling_corr(df, col1, col2, window=300, min_periods=100, **kwargs):
    """Rolling correlation coefficient between two columns.

    :param df: pd.DataFrame
    :param col1: str, first column
    :param col2: str, second column
    :param window: int or None, default 300, rolling window size; ``None`` uses an
        expanding window (every row up to and including the current one)
    :param min_periods: int, default 100, minimum number of observations needed to
        compute a correlation; rows below it get 0
    :param kwargs:
        - copy: bool, copy ``df`` before writing the new column (default False, i.e.
          ``df`` is modified in place)
        - new_col: str, output column name, default ``f"{col1}_corr_{col2}"``
    :return: pd.DataFrame with the correlation column added (NaN replaced by 0)
    """
    if kwargs.get("copy", False):
        df = df.copy()
    assert isinstance(df, pd.DataFrame), "df must be pd.DataFrame"
    assert col1 in df.columns, f"{col1} not in df.columns"
    assert col2 in df.columns, f"{col2} not in df.columns"
    new_col = kwargs.get("new_col", f"{col1}_corr_{col2}")

    if window is None:
        # Expanding window as documented; rolling(window=None) is not valid in pandas.
        roller = df[col1].expanding(min_periods=min_periods)
    else:
        # pandas accepts min_periods == window, so only reject a strictly larger value.
        assert min_periods <= window, "min_periods must be less than or equal to window"
        roller = df[col1].rolling(window=window, min_periods=min_periods)

    df[new_col] = roller.corr(df[col2]).fillna(0)
    return df
def rolling_rank(df: pd.DataFrame, col, window=300, min_periods=100, new_col=None, **kwargs):
    """Rolling percentile rank of the latest value of a column.

    :param df: pd.DataFrame, input data
    :param col: str, column to rank
    :param window: int, rolling window size, default 300
    :param min_periods: int, minimum number of observations, default 100; values larger
        than an integer ``window`` are clamped to ``window``
    :param new_col: str, output column, default ``f"{col}_rank"``
    :param kwargs:
        - copy: bool, copy ``df`` before writing (default False, modifies ``df`` in place)
    :return: pd.DataFrame with the rank column added (values in (0, 1], 0 where undefined)
    """
    if kwargs.get("copy", False):
        df = df.copy()
    # min_periods is a named parameter and therefore never appears in **kwargs; honour it.
    # Clamp it to window because pandas raises when min_periods > window.
    if min_periods is not None and isinstance(window, (int, np.integer)):
        min_periods = min(min_periods, window)
    new_col = new_col if new_col else f"{col}_rank"
    ranks = df[col].rolling(window=window, min_periods=min_periods).rank(pct=True)
    df[new_col] = ranks.fillna(0)
    return df
def rolling_norm(df: pd.DataFrame, col, window=300, min_periods=100, new_col=None, **kwargs):
    """Rolling z-score of the latest value of a column.

    Each row gets ``(x[-1] - mean(x)) / std(x)`` computed over the window ending at that
    row, with the population standard deviation (numpy default, ``ddof=0``). Rows without
    enough observations, and windows whose values are all equal, get 0.

    :param df: pd.DataFrame, input data
    :param col: str, column to normalise
    :param window: int, rolling window size, default 300
    :param min_periods: int, minimum number of observations, default 100; values larger
        than an integer ``window`` are clamped to ``window``
    :param new_col: str, output column, default ``f"{col}_norm"``
    :param kwargs:
        - copy: bool, copy ``df`` before writing (default False, modifies ``df`` in place)
    :return: pd.DataFrame with the normalised column added
    """
    if kwargs.get("copy", False):
        df = df.copy()
    # min_periods is a named parameter and therefore never appears in **kwargs; honour it.
    if min_periods is not None and isinstance(window, (int, np.integer)):
        min_periods = min(min_periods, window)
    new_col = new_col if new_col else f"{col}_norm"

    def _last_zscore(x):
        std = x.std()
        if std == 0:
            # Flat window: the z-score is undefined; return 0 instead of NaN/inf.
            return 0.0
        return (x[-1] - x.mean()) / std

    zscores = df[col].rolling(window=window, min_periods=min_periods).apply(_last_zscore, raw=True)
    df[new_col] = zscores.fillna(0)
    return df
def rolling_qcut(df: pd.DataFrame, col, window=300, min_periods=100, new_col=None, **kwargs):
    """Rolling quantile bucket of the latest value of a column.

    Each window is cut into ``q`` quantile buckets with ``pd.qcut`` (duplicate edges
    dropped). The row gets the 0-based bucket index of the window's last value.

    :param df: pd.DataFrame, input data
    :param col: str, column to bucket
    :param window: int, rolling window size, default 300
    :param min_periods: int, minimum number of observations, default 100; values larger
        than an integer ``window`` are clamped to ``window``
    :param new_col: str, output column, default ``f"{col}_qcut"``
    :param kwargs:
        - q: int, number of quantile buckets, default 10
        - copy: bool, copy ``df`` before writing (default False, modifies ``df`` in place)
    :return: pd.DataFrame with the bucket column added; -1 where it could not be computed
    """
    if kwargs.get("copy", False):
        df = df.copy()
    q = kwargs.get("q", 10)
    # min_periods is a named parameter and therefore never appears in **kwargs; honour it.
    if min_periods is not None and isinstance(window, (int, np.integer)):
        min_periods = min(min_periods, window)
    new_col = new_col if new_col else f"{col}_qcut"

    def _last_bucket(x):
        return pd.qcut(x, q=q, labels=False, duplicates="drop")[-1]

    buckets = df[col].rolling(window=window, min_periods=min_periods).apply(_last_bucket, raw=True)
    df[new_col] = buckets.fillna(-1)
    return df
def rolling_compare(df, col1, col2, window=300, min_periods=100, new_col=None, **kwargs):
    """Compare two columns over a rolling window.

    For every row ``i`` the window is the (at most) ``window`` rows ending at row ``i``,
    selected by position. Rows whose window holds fewer than ``min_periods`` rows get 0.

    :param df: pd.DataFrame, input data
    :param col1: str, first column (the ``y`` side for the regression methods)
    :param col2: str, second column (the ``x`` side for the regression methods)
    :param window: int, rolling window size, default 300
    :param min_periods: int, minimum number of rows in a window, default 100
    :param new_col: str, output column, default ``f"compare_{col1}_{col2}"``
    :param kwargs:
        - method: str, default ``sub``; one of
            - ``sub``: mean of ``col1 - col2`` in the window
            - ``divide``: mean of ``col1 / col2`` in the window
            - ``lr_intercept``: intercept of a linear regression of ``col1`` on ``col2``
            - ``lr_coef``: slope of a linear regression of ``col1`` on ``col2``
        - copy: bool, copy ``df`` before writing (default False, modifies ``df`` in place)
    :return: pd.DataFrame with ``new_col`` added
    """
    if kwargs.get("copy", False):
        df = df.copy()
    # window and min_periods are named parameters; they never appear in **kwargs, so they
    # are used directly rather than re-read from kwargs.
    new_col = new_col if new_col else f"compare_{col1}_{col2}"
    method = kwargs.get("method", "sub")
    assert method in [
        "sub",
        "divide",
        "lr_intercept",
        "lr_coef",
    ], "method 必须为 sub, divide, lr_intercept, lr_coef 中的一种"

    def _fit(x_win, y_win):
        x = x_win.values.reshape(-1, 1)
        y = y_win.values.reshape(-1, 1)
        return LinearRegression().fit(x, y)

    s1, s2 = df[col1], df[col2]
    values = []
    for i in range(len(df)):
        # Positional slicing works for any index, not only a 0..n-1 RangeIndex.
        start = max(0, i - window + 1)
        y_win = s1.iloc[start : i + 1]
        x_win = s2.iloc[start : i + 1]
        if len(y_win) < min_periods:
            values.append(0.0)
        elif method == "sub":
            values.append(y_win.sub(x_win).mean())
        elif method == "divide":
            values.append(y_win.divide(x_win).mean())
        elif method == "lr_intercept":
            values.append(_fit(x_win, y_win).intercept_[0])
        else:  # lr_coef
            values.append(_fit(x_win, y_win).coef_[0][0])

    df[new_col] = values
    return df
def rolling_scale(df: pd.DataFrame, col: str, window=300, min_periods=100, new_col=None, **kwargs):
    """Rolling normalisation of a column with a scikit-learn scaling function.

    The frame is sorted by ``dt`` (ascending) and re-indexed from 0 first. Each row then
    receives the scaled value of the last element of its window.

    :param df: pd.DataFrame, input data; must contain a ``dt`` column used for sorting
    :param col: str, column to scale
    :param window: int, rolling window size, default 300
    :param min_periods: int, minimum number of observations, default 100
    :param new_col: str, output column, default ``f"{col}_scale"``
    :param kwargs:
        - copy: bool, copy ``df`` before processing
        - method: str, one of ``scale`` (default), ``minmax_scale`` (scaled into [-1, 1]),
          ``maxabs_scale``, ``robust_scale``
    :return: pd.DataFrame sorted by ``dt`` with a fresh RangeIndex; rows without enough
        observations get 0
    """
    if kwargs.get("copy", False):
        df = df.copy()
    df = df.sort_values("dt", ascending=True).reset_index(drop=True)
    target_col = new_col or f"{col}_scale"

    method = kwargs.get("method", "scale")
    scalers = {
        "scale": scale,
        "minmax_scale": minmax_scale,
        "maxabs_scale": maxabs_scale,
        "robust_scale": robust_scale,
    }
    assert method in scalers, f"method must be one of {list(scalers.keys())}"

    if method == "minmax_scale":
        # Use the signed range [-1, 1] instead of the function's default range.
        def _scaled_last(values):
            return minmax_scale(values, feature_range=(-1, 1))[-1]

    else:
        scaler = scalers[method]

        def _scaled_last(values):
            return scaler(values)[-1]

    rolled = df[col].rolling(window=window, min_periods=min_periods).apply(_scaled_last)
    df[target_col] = rolled.fillna(0)
    return df
def rolling_tanh(df: pd.DataFrame, col: str, window=300, min_periods=100, new_col=None, **kwargs):
    """Rolling tanh transform of a column.

    Each window is standardised with ``scale`` and passed through the hyperbolic tangent
    (https://baike.baidu.com/item/%E5%8F%8C%E6%9B%B2%E6%AD%A3%E5%88%87%E5%87%BD%E6%95%B0/15469414).
    The row receives the transformed value of the window's last element, bounded by [-1, 1].

    :param df: pd.DataFrame, input data; must contain a ``dt`` column used for sorting
    :param col: str, column to transform
    :param window: int, rolling window size, default 300
    :param min_periods: int, minimum number of observations, default 100
    :param new_col: str, output column, default ``f"{col}_tanh"``
    :param kwargs:
        - copy: bool, copy ``df`` before processing
    :return: pd.DataFrame sorted by ``dt`` with a fresh RangeIndex; rows without enough
        observations get 0
    """
    if kwargs.get("copy", False):
        df = df.copy()
    target_col = new_col or f"{col}_tanh"
    df = df.sort_values("dt", ascending=True).reset_index(drop=True)

    def _tanh_last(values):
        return np.tanh(scale(values))[-1]  # type: ignore

    windows = df[col].rolling(window=window, min_periods=min_periods)
    df[target_col] = windows.apply(_tanh_last).fillna(0)
    return df
def rolling_slope(df: pd.DataFrame, col: str, window=300, min_periods=100, new_col=None, **kwargs):
    """Rolling slope of a column.

    A positive value means the series is trending up and a negative value means it is
    trending down. A larger magnitude means a steeper trend.

    :param df: pd.DataFrame, input data
    :param col: str, column to measure
    :param window: int, rolling window size, default 300
    :param min_periods: int, minimum number of observations, default 100
    :param new_col: str, output column, default ``f"{col}_slope_{method}"``
    :param kwargs:
        - method: str, default ``linear``
            - ``linear``: ordinary least-squares slope of the window values against
              their position 0..n-1 (an intercept is fitted)
            - ``std/mean``: ratio of the window-std change rate to the window-mean
              change rate, signed by the series' own change over ``window``
            - ``snr``: ``window``-step change divided by the sum of absolute one-step
              changes in the window (signal-to-noise ratio)
        - copy: bool, copy ``df`` before writing (default False, modifies ``df`` in place)
    :return: pd.DataFrame with the slope column added (NaN replaced by 0)
    :raises ValueError: for an unknown method
    """
    if kwargs.get("copy", False):
        df = df.copy()
    method = kwargs.get("method", "linear")
    new_col = new_col if new_col else f"{col}_slope_{method}"

    if method == "linear":

        def _ols_slope(y):
            # Closed-form OLS slope of y on t = 0..n-1. This is equivalent to
            # LinearRegression().fit(t.reshape(-1, 1), y).coef_[0], which the previous code
            # called with a 1-D X and therefore always raised.
            n = len(y)
            if n < 2:
                return 0.0
            t = np.arange(n, dtype=float)
            t -= t.mean()
            return float(np.dot(t, y - y.mean()) / np.dot(t, t))

        df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(_ols_slope, raw=True)
    elif method == "std/mean":
        # Rate of change of the window std divided by that of the window mean; > 0 means the
        # dispersion grows faster than the level. Kept in local Series so that user columns
        # are never overwritten or dropped.
        rolling = df[col].rolling(window=window, min_periods=min_periods)
        std_change = rolling.std().pct_change(window)
        mean_change = rolling.mean().pct_change(window)
        ratio = np.where(mean_change != 0, std_change / mean_change, 0)
        # Carry the sign of the series' own change over the window.
        df[new_col] = ratio * np.sign(df[col].pct_change(window))
    elif method == "snr":
        # Net move over the window relative to the total path length inside it.
        path = df[col].diff().abs().rolling(window=window, min_periods=min_periods).sum()
        df[new_col] = df[col].diff(window) / path
    else:
        raise ValueError(f"Unknown method: {method}")

    df[new_col] = df[new_col].fillna(0)
    return df
def normalize_corr(df: pd.DataFrame, fcol, ycol=None, **kwargs):
    """Flip a factor's sign per symbol so that its correlation with returns is positive.

    For each symbol the factor is multiplied by the sign of its correlation with the
    target return.

    Notes:
        1. ``simple`` mode uses the full-sample correlation, which leaks future
           information; take care when using it in backtests.
        2. ``rolling`` mode depends on ``window``; the adjusted correlation may still
           turn out negative.

    :param df: pd.DataFrame, must contain ``dt``, ``symbol`` and the factor column, plus
        ``price`` when ``ycol`` is None
    :param fcol: str, factor column name
    :param ycol: str or None, column holding the return to correlate against. Row t should
        hold the return the factor at row t is meant to predict. When None, it is computed
        per symbol as the next-bar return ``price.pct_change().shift(-1)``.
    :param kwargs:
        - window: int, rolling window size, default 1000
        - min_periods: int, minimum number of observations, default 5
        - mode: str, ``rolling`` (default, lagged rolling-correlation sign) or ``simple``
          (full-sample correlation sign)
        - copy: bool, copy ``df`` first
    :return: pd.DataFrame sorted by ``symbol`` and ``dt`` with a fresh RangeIndex and the
        factor column adjusted (integer/bool factors become float)
    :raises ValueError: for an unknown mode
    """
    window = kwargs.get("window", 1000)
    min_periods = kwargs.get("min_periods", 5)
    mode = kwargs.get("mode", "rolling")
    if kwargs.get("copy", False):
        df = df.copy()
    df = df.sort_values(["symbol", "dt"], ascending=True).reset_index(drop=True)

    # The adjusted factor is float; upcast integer/bool factors up front so the per-symbol
    # write-back below never sets floats into an integer column.
    if df[fcol].dtype.kind in "iub":
        df[fcol] = df[fcol].astype("float64")

    for _, dfg in df.groupby("symbol"):
        target = dfg["price"].pct_change().shift(-1) if ycol is None else dfg[ycol]
        factor = dfg[fcol]
        if mode.lower() == "rolling":
            corr_sign = np.sign(factor.rolling(window=window, min_periods=min_periods).corr(target))
            # Lag the sign by 3 bars: the default target already looks one bar ahead, so the
            # lag keeps the sign based on correlations known before the current bar.
            adjusted = (corr_sign.shift(3) * factor).fillna(0)
        elif mode.lower() == "simple":
            adjusted = np.sign(factor.corr(target)) * factor
        else:
            raise ValueError(f"Unknown mode: {mode}")
        # dfg keeps df's index labels, so write back by label instead of re-scanning the
        # whole symbol column for every group.
        df.loc[dfg.index, fcol] = adjusted
    return df
# Temporary column holding the lagged correlation sign; removed before returning.
_CORR_SIGN_COL = "__feature_adjust_corr_sign__"


def _attach_corr_sign(df, fcol, window, min_periods):
    """Add a temporary column with the lagged sign of the factor/next-bar-return correlation.

    The next-bar return is kept in a local Series so that any existing ``n1b`` or ``corr``
    columns of the caller are neither overwritten nor dropped.
    """
    n1b = df["price"].shift(-1) / df["price"] - 1
    corr = df[fcol].rolling(window=window, min_periods=min_periods).corr(n1b)
    # n1b at row t depends on the price at t+1, so the correlation must be lagged by at
    # least one bar to avoid look-ahead; a lag of 5 bars is used here.
    df[_CORR_SIGN_COL] = np.sign(corr.shift(5).fillna(0))
    return df


def feature_adjust_V230101(df: pd.DataFrame, fcol, **kwargs):
    """Feature adjustment: turn a factor into a position weight (maxabs_scale variant).

    The factor is scaled with a rolling ``maxabs_scale`` and then multiplied by the sign of
    its lagged rolling correlation with the next-bar return.

    :param df: pd.DataFrame, must contain ``dt``, ``price`` and the factor column. The code
        does not group by ``symbol``: the rolling statistics run over all rows in ``dt``
        order, so presumably a single symbol is expected (confirm with callers).
    :param fcol: str, factor column name
    :param kwargs:
        - window: int, rolling window size, default 1000
        - min_periods: int, minimum number of observations, default 200
    :return: pd.DataFrame (a copy) sorted by ``dt`` with a ``weight`` column added
    """
    window = kwargs.get("window", 1000)
    min_periods = kwargs.get("min_periods", 200)
    df = df.copy().sort_values("dt", ascending=True).reset_index(drop=True)
    df = _attach_corr_sign(df, fcol, window, min_periods)
    # The sign lives in a column (not a local Series) so it stays attached to its row even
    # though rolling_scale sorts by dt and resets the index again.
    df = rolling_scale(
        df, col=fcol, window=window, min_periods=min_periods, new_col="weight", method="maxabs_scale", copy=True
    )
    df["weight"] = df["weight"] * df[_CORR_SIGN_COL]
    df.drop(columns=[_CORR_SIGN_COL], inplace=True)
    return df


def feature_adjust_V240323(df: pd.DataFrame, fcol, **kwargs):
    """Feature adjustment: turn a factor into a position weight (scale + tanh variant).

    The factor goes through a rolling ``scale`` + ``tanh`` transform and is then multiplied
    by the sign of its lagged rolling correlation with the next-bar return.

    :param df: pd.DataFrame, must contain ``dt``, ``price`` and the factor column. The code
        does not group by ``symbol``: the rolling statistics run over all rows in ``dt``
        order, so presumably a single symbol is expected (confirm with callers).
    :param fcol: str, factor column name
    :param kwargs:
        - window: int, rolling window size, default 1000
        - min_periods: int, minimum number of observations, default 200
    :return: pd.DataFrame (a copy) sorted by ``dt`` with a ``weight`` column added
    """
    window = kwargs.get("window", 1000)
    min_periods = kwargs.get("min_periods", 200)
    df = df.copy().sort_values("dt", ascending=True).reset_index(drop=True)
    df = _attach_corr_sign(df, fcol, window, min_periods)
    # rolling_tanh re-sorts by dt; the sign column travels with its row.
    df = rolling_tanh(df, col=fcol, window=window, min_periods=min_periods, new_col="weight")
    df["weight"] = df["weight"] * df[_CORR_SIGN_COL]
    df.drop(columns=[_CORR_SIGN_COL], inplace=True)
    return df
def feature_adjust(df: pd.DataFrame, fcol, method, **kwargs):
    """Feature adjustment: turn a factor into a position weight.

    :param df: pd.DataFrame, data to adjust
    :param fcol: str, factor column name
    :param method: str, adjustment method
        - KEEP: use the raw factor values as the weight (written into ``df`` in place)
        - V230101: lagged rolling-correlation sign times a rolling ``maxabs_scale`` of the
          factor
        - V240323: lagged rolling-correlation sign times a rolling ``scale`` + ``tanh`` of
          the factor
    :param kwargs: forwarded to the versioned adjusters
        - window: int, rolling window size
        - min_periods: int, minimum number of observations
    :return: pd.DataFrame with a ``weight`` column
    :raises ValueError: for an unknown method
    """
    if method == "KEEP":
        df["weight"] = df[fcol]
        return df

    # Lambdas defer looking up the versioned adjusters until one is actually selected.
    versioned = (
        ("V230101", lambda: feature_adjust_V230101(df, fcol, **kwargs)),
        ("V240323", lambda: feature_adjust_V240323(df, fcol, **kwargs)),
    )
    for name, run in versioned:
        if method == name:
            return run()
    raise ValueError(f"Unknown method: {method}")
def feature_to_weight(df, factor, positive, **kwargs):
    """Convert a time-series factor of a single symbol into a position weight.

    Steps:
        1. Winsorise: clip the factor to its rolling ``[q_threshold, 1 - q_threshold]``
           quantiles (the ``factor`` column of ``df`` is overwritten with the clipped values).
        2. Rolling ``scale`` (zero mean, unit variance) of the clipped factor.
        3. Rolling ``maxabs_scale`` of that result into [-1, 1].

    :param df: pd.DataFrame, contains the factor column and a ``symbol`` column holding a
        single symbol; modified in place
    :param factor: str, factor column name
    :param positive: bool, whether the factor is positive (False flips the weight sign)
    :param kwargs:
        - window: int, rolling window length, default 1000
        - min_periods: int, minimum window length, default 100
        - q_threshold: float, winsorisation quantile, default 0.05
    :return: pd.DataFrame, ``df`` with the clipped factor and a ``weight`` column
    """
    window = kwargs.get("window", 1000)
    min_periods = kwargs.get("min_periods", 100)
    q_threshold = kwargs.get("q_threshold", 0.05)
    assert df["symbol"].nunique() == 1, "必须按品种计算权重"

    # Intermediate results are local Series so that user columns such as "upper", "lower"
    # or "norm" are not overwritten and then dropped.
    rolling_factor = df[factor].rolling(window, min_periods)
    upper = rolling_factor.quantile(1 - q_threshold)
    lower = rolling_factor.quantile(q_threshold)
    df[factor] = df[factor].clip(lower=lower, upper=upper)

    # raw=True hands plain ndarrays to the sklearn scalers (they convert Series anyway).
    norm = df[factor].rolling(window, min_periods).apply(lambda x: scale(x)[-1], raw=True)
    weight = norm.rolling(window, min_periods).apply(lambda x: maxabs_scale(x)[-1], raw=True).fillna(0)
    df["weight"] = weight if positive else -weight
    return df
def feature_returns(df, factor, target="n1b", **kwargs):
    """Cross-sectional factor returns, one value per ``dt``.

    For each ``dt`` the target is regressed on the factor across the rows of that period.
    The regression slope is taken as the period's factor return. Periods with fewer than
    5 complete rows get 0 and a warning is logged.

    :param df: pd.DataFrame, must contain ``dt``, the factor column and the target column
    :param factor: str, factor column name
    :param target: str, target return column name, default ``n1b``
    :param kwargs:
        - fit_intercept: bool, whether to fit an intercept, default False
    :return: pd.DataFrame with columns ``dt`` and ``returns`` (a new frame; ``df`` itself
        is not modified)
    """
    from sklearn.linear_model import LinearRegression

    fit_intercept = kwargs.get("fit_intercept", False)

    def _period_return(dt, group):
        sample = group.dropna(subset=[factor, target])
        if len(sample) < 5:
            logger.warning(f"{dt} has no enough data, only {len(sample)} rows")
            return 0
        x = sample[factor].values.reshape(-1, 1)
        y = sample[target].values.reshape(-1, 1)
        model = LinearRegression(fit_intercept=fit_intercept).fit(x, y)
        return model.coef_[0][0]

    records = [[dt, _period_return(dt, group)] for dt, group in df.groupby("dt")]
    return pd.DataFrame(records, columns=["dt", "returns"])
def feature_sectional_corr(df, factor, target="n1b", method="pearson", **kwargs):
    """Cross-sectional correlation (IC) between a factor and a target, per ``dt``.

    Periods with fewer than 5 complete rows get an IC of 0 and a warning is logged.
    Periods whose correlation is undefined (NaN, e.g. a constant factor) are dropped
    before the summary statistics are computed.

    :param df: pd.DataFrame, must contain ``dt``, the factor column and the target column
    :param factor: str, factor column name, usually starting with ``F#``
    :param target: str, target column name, usually ``n1b``
    :param method: {'pearson', 'kendall', 'spearman'} or callable
        * pearson : standard correlation coefficient
        * kendall : Kendall Tau correlation coefficient
        * spearman : Spearman rank correlation
        * callable: callable with input two 1d ndarrays and returning a float
    :return: (dft, res). ``dft`` has columns ``dt`` and ``ic`` (per-period IC). ``res`` is a
        dict of summary statistics, all 0 when there is no valid IC.
    """
    ic_rows = []
    for dt, dfg in df.groupby("dt"):
        dfg = dfg.dropna(subset=[factor, target])
        if len(dfg) < 5:
            ic_rows.append([dt, 0])
            logger.warning(f"{dt} has no enough data, only {len(dfg)} rows")
        else:
            ic_rows.append([dt, dfg[factor].corr(dfg[target], method=method)])

    # The statistics below read the "ic" column, so the per-period frame uses that name.
    dft = pd.DataFrame(ic_rows, columns=["dt", "ic"])
    res = {
        "factor": factor,
        "target": target,
        "method": method,
        "IC均值": 0,
        "IC标准差": 0,
        "ICIR": 0,
        "IC胜率": 0,
        "累计IC回归R2": 0,
        "累计IC回归斜率": 0,
    }
    dft = dft[~dft["ic"].isnull()].copy()
    if dft.empty:
        # Nothing to summarise; this also avoids dividing by len(dft) == 0 below.
        return dft, res

    # Imported only when there is data to fit.
    from czsc.utils import single_linear

    ic_avg = dft["ic"].mean()
    ic_std = dft["ic"].std()
    res["IC均值"] = round(ic_avg, 4)
    res["IC标准差"] = round(ic_std, 4)
    res["ICIR"] = round(ic_avg / ic_std, 4) if ic_std != 0 else 0
    # Win rate counts periods whose IC has the same sign as the average IC.
    if ic_avg > 0:
        res["IC胜率"] = round(len(dft[dft["ic"] > 0]) / len(dft), 4)
    else:
        res["IC胜率"] = round(len(dft[dft["ic"] < 0]) / len(dft), 4)

    lr_ = single_linear(y=dft["ic"].cumsum().to_list())
    res.update({"累计IC回归R2": lr_["r2"], "累计IC回归斜率": lr_["slope"]})
    return dft, res