# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2023/2/7 13:17
describe: 用于探索性分析的函数
"""
import loguru
import pandas as pd
import numpy as np
from sklearn.linear_model import Ridge, LinearRegression, Lasso
[docs]def vwap(price: np.array, volume: np.array, **kwargs) -> float:
"""计算成交量加权平均价
:param price: 价格序列
:param volume: 成交量序列
:return: 平均价
"""
return np.average(price, weights=volume)
[docs]def twap(price: np.array, **kwargs) -> float:
"""计算时间加权平均价
:param price: 价格序列
:return: 平均价
"""
return np.average(price)
[docs]def remove_beta_effects(df, **kwargs):
"""去除 beta 对因子的影响
:param df: DataFrame, 数据, 必须包含 dt、symbol、factor 和 betas 列
:param kwargs:
- factor: str, 因子列名
- betas: list, beta 列名列表
- linear_model: str, 线性模型,可选 ridge、linear 或 lasso
- linear_model_params: dict, 线性模型参数, 默认为空, 需要传入字典,根据模型不同参数不同
:return: DataFrame
"""
linear_model = kwargs.get("linear_model", "ridge")
linear_model_params = kwargs.get("linear_model_params", {})
linear = {
"ridge": Ridge,
"linear": LinearRegression,
"lasso": Lasso,
}
assert linear_model in linear.keys(), "linear_model 参数必须为 ridge、linear 或 lasso"
Model = linear[linear_model]
factor = kwargs.get("factor")
betas = kwargs.get("betas")
logger = kwargs.get("logger", loguru.logger)
assert factor is not None and betas is not None, "factor 和 betas 参数必须指定"
assert isinstance(betas, list), "betas 参数必须为列表"
assert factor in df.columns, f"数据中不包含因子 {factor}"
assert all([x in df.columns for x in betas]), f"数据中不包含全部 beta {betas}"
logger.info(f"去除 beta 对因子 {factor} 的影响, 使用 {linear_model} 模型, betas: {betas}")
rows = []
for dt, dfg in df.groupby("dt"):
dfg = dfg.copy().dropna(subset=[factor] + betas)
if dfg.empty:
continue
x = dfg[betas].values
y = dfg[factor].values
model = Model(**linear_model_params).fit(x, y)
dfg[factor] = y - model.predict(x)
rows.append(dfg)
dfr = pd.concat(rows, ignore_index=True)
return dfr
[docs]def cross_sectional_strategy(df, factor, **kwargs):
"""根据截面因子值构建多空组合
:param df: pd.DataFrame, 包含因子列的数据, 必须包含 dt, symbol, factor 列
:param factor: str, 因子列名称
:param kwargs:
- factor_direction: str, 因子方向,positive 或 negative
- long_num: int, 多头持仓数量
- short_num: int, 空头持仓数量
- logger: loguru.logger, 日志记录器
:return: pd.DataFrame, 包含 weight 列的数据
"""
factor_direction = kwargs.get("factor_direction", "positive")
long_num = kwargs.get("long_num", 5)
short_num = kwargs.get("short_num", 5)
logger = kwargs.get("logger", loguru.logger)
assert factor in df.columns, f"{factor} 不在 df 中"
assert factor_direction in ["positive", "negative"], f"factor_direction 参数错误"
df = df.copy()
if factor_direction == "negative":
df[factor] = -df[factor]
df['weight'] = 0
for dt, dfg in df.groupby("dt"):
if len(dfg) < long_num + short_num:
logger.warning(f"{dt} 截面数据量过小,跳过;仅有 {len(dfg)} 条数据,需要 {long_num + short_num} 条数据")
continue
dfa = dfg.sort_values(factor, ascending=False).head(long_num)
dfb = dfg.sort_values(factor, ascending=True).head(short_num)
if long_num > 0:
df.loc[dfa.index, "weight"] = 1 / long_num
if short_num > 0:
df.loc[dfb.index, "weight"] = -1 / short_num
return df
[docs]def judge_factor_direction(df: pd.DataFrame, factor, target='n1b', by='symbol', **kwargs):
"""判断因子的方向,正向还是反向
:param df: pd.DataFrame, 数据源,必须包含 symbol, dt, target, factor 列
:param factor: str, 因子名称
:param target: str, 目标名称,默认为 n1b,表示下一根K线的涨跌幅
:param by: str, 分组字段,默认为 symbol,表示按品种分组(时序);也可以按 dt 分组,表示按时间分组(截面)
:param kwargs: dict, 其他参数
- method: str, 相关系数计算方法,默认为 pearson,可选 pearson, kendall, spearman
:return: str, positive or negative
"""
assert by in df.columns, f"数据中不存在 {by} 字段"
assert factor in df.columns, f"数据中不存在 {factor} 字段"
assert target in df.columns, f"数据中不存在 {target} 字段"
if by == "dt" and df['symbol'].nunique() < 2:
raise ValueError("品种数量过少,无法在时间截面上计算因子有效性方向")
if by == "symbol" and df['dt'].nunique() < 2:
raise ValueError("时间序列数据量过少,无法在品种上计算因子有效性方向")
method = kwargs.get("method", "pearson")
dfc = df.groupby(by)[[factor, target]].corr(method=method).unstack().iloc[:, 1].reset_index()
return "positive" if dfc[factor].mean().iloc[0] >= 0 else "negative"
[docs]def monotonicity(sequence):
"""计算序列的单调性
原理:计算序列与自然数序列的相关系数,系数越接近1,表示单调递增;系数越接近-1,表示单调递减;接近0表示无序
:param sequence: list, tuple 序列
:return: float, 单调性系数
"""
from scipy.stats import spearmanr
return spearmanr(sequence, range(len(sequence)))[0]
[docs]def min_max_limit(x, min_val, max_val, digits=4):
"""限制 x 的取值范围在 min_val 和 max_val 之间
:param x: float, 输入值
:param min_val: float, 最小值
:param max_val: float, 最大值
:param digits: int, 保留小数位数
:return: float
"""
return round(max(min_val, min(max_val, x)), digits)
[docs]def rolling_layers(df, factor, n=5, **kwargs):
"""对时间序列数据进行分层
:param df: 因子数据,必须包含 dt, factor 列,其中 dt 为日期,factor 为因子值
:param factor: 因子列名
:param n: 分层数量,默认为10
:param kwargs:
- window: 窗口大小,默认为2000
- min_periods: 最小样本数量,默认为300
- mode: str, {'loose', 'strict'}, 分层模式,默认为 'loose';
loose 表示使用 rolling + rank 的方式分层,有一点点未来信息,存在一定的数据穿越问题;
strict 表示使用 rolling + qcut 的方式分层,无未来信息,但是执行速度较慢。
:return: df, 添加了 factor分层 列
"""
assert df[factor].nunique() > n * 2, "因子值的取值数量必须大于分层数量"
assert df[factor].isna().sum() == 0, "因子有缺失值,缺失数量为:{}".format(df[factor].isna().sum())
assert df['dt'].duplicated().sum() == 0, f"dt 列不能有重复值,存在重复值数量:{df['dt'].duplicated().sum()}"
window = kwargs.get("window", 600)
min_periods = kwargs.get("min_periods", 300)
# 不能有 inf 和 -inf
if df.loc[df[factor].isin([float("inf"), float("-inf")]), factor].shape[0] > 0:
raise ValueError(f"存在 {factor} 为 inf / -inf 的数据")
if kwargs.get('mode', 'loose') == 'loose':
# loose 模式,可能存在一点点未来信息
df['pct_rank'] = df[factor].rolling(window=window, min_periods=min_periods).rank(pct=True, ascending=True)
bins = [i/n for i in range(n+1)]
df['pct_rank_cut'] = pd.cut(df['pct_rank'], bins=bins, labels=False)
df['pct_rank_cut'] = df['pct_rank_cut'].fillna(-1)
# 第00层表示缺失值
df[f"{factor}分层"] = df['pct_rank_cut'].apply(lambda x: f"第{str(int(x+1)).zfill(2)}层")
df.drop(['pct_rank', 'pct_rank_cut'], axis=1, inplace=True)
else:
assert kwargs.get('mode', 'strict') == 'strict'
df[f"{factor}_qcut"] = (
df[factor].rolling(window=window, min_periods=min_periods)
.apply(lambda x: pd.qcut(x, q=n, labels=False, duplicates="drop", retbins=False).values[-1], raw=False)
)
df[f"{factor}_qcut"] = df[f"{factor}_qcut"].fillna(-1)
# 第00层表示缺失值
df[f"{factor}分层"] = df[f"{factor}_qcut"].apply(lambda x: f"第{str(int(x+1)).zfill(2)}层")
df.drop([f"{factor}_qcut"], axis=1, inplace=True)
return df