# 工具函数
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import minmax_scale, scale, maxabs_scale, robust_scale
[docs]def is_event_feature(df, col, **kwargs):
"""事件类因子的判断函数
事件因子的特征:多头事件发生时,因子值为1;空头事件发生时,因子值为-1;其他情况,因子值为0。
:param df: DataFrame
:param col: str, 因子字段名称
"""
unique_values = df[col].unique()
return all([x in [0, 1, -1] for x in unique_values])
[docs]def rolling_corr(df, col1, col2, window=300, min_periods=100, **kwargs):
"""滚动计算两个序列的相关系数
:param df: pd.DataFrame
:param col1: str
:param col2: str
:param window: int, default 300, 滚动窗口大小, None表示扩展窗口
:param min_periods: int, default 100, 最小观测数量, 用于计算相关系数的最小观测数量
"""
if kwargs.get("copy", False):
df = df.copy()
assert isinstance(df, pd.DataFrame), "df must be pd.DataFrame"
assert col1 in df.columns, f"{col1} not in df.columns"
assert col2 in df.columns, f"{col2} not in df.columns"
new_col = kwargs.get("new_col", f"{col1}_corr_{col2}")
assert min_periods < window, "min_periods must be less than window"
df[new_col] = df[col1].rolling(window=window, min_periods=min_periods).corr(df[col2]).fillna(0)
return df
[docs]def rolling_rank(df: pd.DataFrame, col, window=300, min_periods=100, new_col=None, **kwargs):
"""计算序列的滚动排名
:param df: pd.DataFrame, 待计算的数据
:param col: str, 待计算的列
:param window: int, 滚动窗口大小, 默认为300
:param min_periods: int, 最小计算周期, 默认为100
:param new_col: str, 新列名,默认为 None, 表示使用 f'{col}_rank' 作为新列名
"""
if kwargs.get("copy", False):
df = df.copy()
min_periods = kwargs.get('min_periods', 2)
new_col = new_col if new_col else f'{col}_rank'
df[new_col] = df[col].rolling(window=window, min_periods=min_periods).rank(pct=True)
df[new_col] = df[new_col].fillna(0)
return df
[docs]def rolling_norm(df: pd.DataFrame, col, window=300, min_periods=100, new_col=None, **kwargs):
"""计算序列的滚动归一化值
:param df: pd.DataFrame, 待计算的数据
:param col: str, 待计算的列
:param window: int, 滚动窗口大小, 默认为300
:param min_periods: int, 最小计算周期, 默认为100
:param new_col: str, 新列名,默认为 None, 表示使用 f'{col}_norm' 作为新列名
"""
if kwargs.get("copy", False):
df = df.copy()
min_periods = kwargs.get('min_periods', 2)
new_col = new_col if new_col else f'{col}_norm'
df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: (x[-1] - x.mean()) / x.std(), raw=True)
df[new_col] = df[new_col].fillna(0)
return df
[docs]def rolling_qcut(df: pd.DataFrame, col, window=300, min_periods=100, new_col=None, **kwargs):
"""计算序列的滚动分位数
:param df: pd.DataFrame, 待计算的数据
:param col: str, 待计算的列
:param window: int, 滚动窗口大小, 默认为300
:param min_periods: int, 最小计算周期, 默认为100
:param new_col: str, 新列名,默认为 None, 表示使用 f'{col}_qcut' 作为新列名
"""
if kwargs.get("copy", False):
df = df.copy()
q = kwargs.get('q', 10)
min_periods = kwargs.get('min_periods', q)
new_col = new_col if new_col else f'{col}_qcut'
def __qcut_func(x):
return pd.qcut(x, q=q, labels=False, duplicates='drop')[-1]
df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(__qcut_func, raw=True)
df[new_col] = df[new_col].fillna(-1)
return df
[docs]def rolling_compare(df, col1, col2, window=300, min_periods=100, new_col=None, **kwargs):
"""计算序列的滚动归一化值
:param df: pd.DataFrame
待计算的数据
:param col1: str
第一个列名
:param col2: str
第二个列名
:param window: int
滚动窗口大小, 默认为300
:param new_col: str
新列名,默认为 None, 表示使用 f'{col}_norm' 作为新列名
:param kwargs:
min_periods: int
最小计算周期
"""
window = kwargs.get('window', 300)
min_periods = kwargs.get('min_periods', 2)
new_col = new_col if new_col else f'compare_{col1}_{col2}'
method = kwargs.get('method', 'sub')
assert method in ['sub', 'divide', 'lr_intercept', 'lr_coef'], "method 必须为 sub, divide, lr_intercept, lr_coef 中的一种"
for i in range(len(df)):
dfi = df.loc[i - window + 1:i, [col1, col2]]
dfi = dfi.copy()
if i < min_periods:
df.loc[i, new_col] = 0
continue
if method == 'sub':
df.loc[i, new_col] = dfi[col1].sub(dfi[col2]).mean()
elif method == 'divide':
df.loc[i, new_col] = dfi[col1].divide(dfi[col2]).mean()
elif method == 'lr_intercept':
x = dfi[col2].values.reshape(-1, 1)
y = dfi[col1].values.reshape(-1, 1)
reg = LinearRegression().fit(x, y)
df.loc[i, new_col] = reg.intercept_[0]
elif method == 'lr_coef':
x = dfi[col2].values.reshape(-1, 1)
y = dfi[col1].values.reshape(-1, 1)
reg = LinearRegression().fit(x, y)
df.loc[i, new_col] = reg.coef_[0][0]
else:
raise ValueError(f"method {method} not support")
[docs]def rolling_scale(df: pd.DataFrame, col: str, window=300, min_periods=100, new_col=None, **kwargs):
"""对序列进行滚动归一化
:param df: pd.DataFrame, 待计算的数据
:param col: str, 待计算的列
:param window: int, 滚动窗口大小, 默认为300
:param min_periods: int, 最小计算周期, 默认为100
:param new_col: str, 新列名,默认为 None, 表示使用 f'{col}_scale' 作为新列名
"""
if kwargs.get("copy", False):
df = df.copy()
df = df.sort_values("dt", ascending=True).reset_index(drop=True)
new_col = new_col if new_col else f'{col}_scale'
method = kwargs.get("method", "scale")
method_map = {
"scale": scale,
"minmax_scale": minmax_scale,
"maxabs_scale": maxabs_scale,
"robust_scale": robust_scale
}
assert method in method_map, f"method must be one of {list(method_map.keys())}"
scale_method = method_map[method]
if method == "minmax_scale":
df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: minmax_scale(x, feature_range=(-1, 1))[-1])
else:
df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: scale_method(x)[-1])
df[new_col] = df[new_col].fillna(0)
return df
[docs]def rolling_tanh(df: pd.DataFrame, col: str, window=300, min_periods=100, new_col=None, **kwargs):
"""对序列进行滚动 tanh 变换
双曲正切函数:https://baike.baidu.com/item/%E5%8F%8C%E6%9B%B2%E6%AD%A3%E5%88%87%E5%87%BD%E6%95%B0/15469414
:param df: pd.DataFrame, 待计算的数据
:param col: str, 待计算的列
:param window: int, 滚动窗口大小, 默认为300
:param min_periods: int, 最小计算周期, 默认为100
:param new_col: str, 新列名,默认为 None, 表示使用 f'{col}_scale' 作为新列名
"""
if kwargs.get("copy", False):
df = df.copy()
new_col = new_col if new_col else f'{col}_tanh'
df = df.sort_values("dt", ascending=True).reset_index(drop=True)
df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(lambda x: np.tanh(scale(x))[-1]) # type: ignore
df[new_col] = df[new_col].fillna(0)
return df
[docs]def rolling_slope(df: pd.DataFrame, col: str, window=300, min_periods=100, new_col=None, **kwargs):
"""计算序列的滚动斜率
大于0表示序列的斜率向上,小于0表示序列的斜率向下,绝对值越大表示斜率越陡峭
:param df: pd.DataFrame, 待计算的数据
:param col: str, 待计算的列
:param window: int, 滚动窗口大小, 默认为300
:param min_periods: int, 最小计算周期, 默认为100
:param new_col: str, 新列名,默认为 None, 表示使用 f'{col}_slope' 作为新列名
:param kwargs:
- min_periods: int, 最小计算周期
- method: str, 计算方法
- linear: 使用线性回归计算斜率
- std/mean: 使用序列的 std/mean 计算斜率
- snr: 使用序列的 snr 计算斜率
"""
method = kwargs.get('method', 'linear')
new_col = new_col if new_col else f'{col}_slope_{method}'
if method == 'linear':
# 使用线性回归计算斜率
def __lr_slope(x):
return LinearRegression().fit(list(range(len(x))), x).coef_[0]
df[new_col] = df[col].rolling(window=window, min_periods=min_periods).apply(__lr_slope, raw=True)
elif method == 'std/mean':
# 用 window 内 std 的变化率除以 mean 的变化率,来衡量序列的斜率
# 如果 std/mean > 0, 则表示序列的斜率在变大,反之则表示序列的斜率在变小
df['temp_std'] = df[col].rolling(window=window, min_periods=min_periods).std().pct_change(window)
df['temp_mean'] = df[col].rolling(window=window, min_periods=min_periods).mean().pct_change(window)
df[new_col] = np.where(df['temp_mean'] != 0, df['temp_std'] / df['temp_mean'], 0)
# 加入变化率的正负号
df[new_col] = df[new_col] * np.sign(df[col].pct_change(window))
df.drop(['temp_std', 'temp_mean'], axis=1, inplace=True)
elif method == 'snr':
# 用 window 内的信噪比变化率来衡量序列的斜率
df[new_col] = df[col].diff(window) / df[col].diff().abs().rolling(window=window, min_periods=min_periods).sum()
else:
raise ValueError(f'Unknown method: {method}')
df[new_col] = df[new_col].fillna(0)
return df
def feature_adjust_V230101(df: pd.DataFrame, fcol, **kwargs):
"""特征调整函数:对特征进行调整,使其符合持仓权重的定义
方法说明:对因子进行滚动相关系数计算,然后对因子值用 maxabs_scale 进行归一化,最后乘以滚动相关系数的符号
:param df: pd.DataFrame, 必须包含 dt、symbol、price 列,以及因子列
:param fcol: str 因子列名
:param kwargs: dict
"""
window = kwargs.get("window", 1000)
min_periods = kwargs.get("min_periods", 200)
df = df.copy().sort_values("dt", ascending=True).reset_index(drop=True)
df['n1b'] = df['price'].shift(-1) / df['price'] - 1
df['corr'] = df[fcol].rolling(window=window, min_periods=min_periods).corr(df['n1b'])
df['corr'] = df['corr'].shift(5).fillna(0)
df = rolling_scale(df, col=fcol, window=window, min_periods=min_periods,
new_col='weight', method='maxabs_scale', copy=True)
df['weight'] = df['weight'] * np.sign(df['corr'])
df.drop(['n1b', 'corr'], axis=1, inplace=True)
return df
def feature_adjust_V240323(df: pd.DataFrame, fcol, **kwargs):
"""特征调整函数:对特征进行调整,使其符合持仓权重的定义
方法说明:对因子进行滚动相关系数计算,然后对因子值用 scale + tanh 进行归一化,最后乘以滚动相关系数的符号
:param df: pd.DataFrame, 必须包含 dt、symbol、price 列,以及因子列
:param fcol: str 因子列名
:param kwargs: dict
"""
window = kwargs.get("window", 1000)
min_periods = kwargs.get("min_periods", 200)
df = df.copy().sort_values("dt", ascending=True).reset_index(drop=True)
df['n1b'] = df['price'].shift(-1) / df['price'] - 1
df['corr'] = df[fcol].rolling(window=window, min_periods=min_periods).corr(df['n1b'])
df['corr'] = df['corr'].shift(5).fillna(0)
df = rolling_tanh(df, col=fcol, window=window, min_periods=min_periods, new_col='weight')
df['weight'] = df['weight'] * np.sign(df['corr'])
df.drop(['n1b', 'corr'], axis=1, inplace=True)
return df
[docs]def feature_adjust(df: pd.DataFrame, fcol, method, **kwargs):
"""特征调整函数:对特征进行调整,使其符合持仓权重的定义
:param df: pd.DataFrame, 待调整的数据
:param fcol: str, 因子列名
:param method: str, 调整方法
- V230101: 对因子进行滚动相关系数计算,然后对因子值用 maxabs_scale 进行归一化,最后乘以滚动相关系数的符号
- V240323: 对因子进行滚动相关系数计算,然后对因子值用 scale + tanh 进行归一化,最后乘以滚动相关系数的符号
:param kwargs: dict
- window: int, 滚动窗口大小
- min_periods: int, 最小计算周期
:return: pd.DataFrame, 新增 weight 列
"""
if method == "V230101":
return feature_adjust_V230101(df, fcol, **kwargs)
elif method == "V240323":
return feature_adjust_V240323(df, fcol, **kwargs)
else:
raise ValueError(f"Unknown method: {method}")