Source code for czsc.utils.sig

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2022/10/27 23:23
describe: 用于信号计算函数的各种辅助工具函数
"""
import numpy as np
from deprecated import deprecated
from collections import Counter, OrderedDict
from typing import List, Any, Dict, Union, Tuple
from czsc.enum import Direction
from czsc.objects import BI, RawBar, ZS, Signal


[docs]def create_single_signal(**kwargs) -> OrderedDict: """创建单个信号""" s = OrderedDict() k1, k2, k3 = kwargs.get("k1", "任意"), kwargs.get("k2", "任意"), kwargs.get("k3", "任意") v1, v2, v3 = kwargs.get("v1", "任意"), kwargs.get("v2", "任意"), kwargs.get("v3", "任意") v = Signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2, v3=v3, score=kwargs.get("score", 0)) s[v.key] = v.value return s
[docs]def is_symmetry_zs(bis: List[BI], th: float = 0.3) -> bool: """对称中枢判断:中枢中所有笔的力度序列,标准差小于均值的一定比例 https://pic2.zhimg.com/80/v2-2f55ef49eda01972462531ebb6de4f19_1440w.jpg :param bis: 构成中枢的笔序列 :param th: 标准差小于均值的比例阈值 :return: """ if len(bis) % 2 == 0: return False zs = ZS(bis=bis) if zs.zd > zs.zg or max([x.low for x in bis]) > min([x.high for x in bis]): return False zns = [x.power_price for x in bis] if np.std(zns) / np.mean(zns) <= th: return True else: return False
def check_cross_info(fast: [List, np.array], slow: [List, np.array]): """计算 fast 和 slow 的交叉信息 :param fast: 快线 :param slow: 慢线 :return: """ assert len(fast) == len(slow), "快线和慢线的长度不一样" if isinstance(fast, list): fast = np.array(fast) if isinstance(slow, list): slow = np.array(slow) length = len(fast) delta = fast - slow cross_info = [] last_i = -1 last_v = 0 temp_fast = [] temp_slow = [] for i, v in enumerate(delta): last_i += 1 last_v += abs(v) temp_fast.append(fast[i]) temp_slow.append(slow[i]) if i >= 2 and delta[i - 1] <= 0 < delta[i]: kind = "金叉" elif i >= 2 and delta[i - 1] >= 0 > delta[i]: kind = "死叉" else: continue cross_info.append( { "位置": i, "类型": kind, "快线": fast[i], "慢线": slow[i], "距离": last_i, "距今": length - i, "面积": round(last_v, 4), "价差": round(v, 4), "快线高点": max(temp_fast), "快线低点": min(temp_fast), "慢线高点": max(temp_slow), "慢线低点": min(temp_slow), } ) last_i = 0 last_v = 0 temp_fast = [] temp_slow = [] return cross_info
[docs]@deprecated(version="1.0.0", reason="分析方法不太合理,不再使用") def check_pressure_support(bars: List[RawBar], q_seq: List[float] = None) -> Dict: """检查 bars 中的支撑、压力信息 1. 通过 round 函数对 K 线价格序列进行近似,统计价格出现次数,取出现次数超过5次的价位 2. 在出现次数最多的价格序列上计算分位数序列作为关键价格序列 :param bars: K线序列,按时间升序 :param q_seq: 分位数序列 :return: """ assert len(bars) >= 300, "分析至少需要300根K线" min_low = min(x.low for x in bars) price_seq = [y for x in bars for y in (x.open, x.close, x.high, x.low)] price_seq = [round(x, 0) if min_low > 100 else round(x, 1) for x in price_seq] lines = sorted([x for x, v in Counter(price_seq).most_common() if v >= 5]) q_seq = q_seq if q_seq else [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1] key_price = [np.quantile(lines, i, method="nearest") for i in q_seq] kp_low = [x for x in key_price if x <= bars[-1].close] kp_high = [x for x in key_price if x >= bars[-1].close] info = { "关键位": key_price, "支撑位": kp_low, "压力位": kp_high, "第一支撑": kp_low[-1] if len(kp_low) >= 1 else -1, "第二支撑": kp_low[-2] if len(kp_low) >= 2 else -1, "第一压力": kp_high[0] if len(kp_high) >= 1 else -1, "第二压力": kp_high[1] if len(kp_high) >= 2 else -1, } return info
[docs]def check_gap_info(bars: List[RawBar]): """检查 bars 中的缺口信息 :param bars: K线序列,按时间升序 :return: """ gap_info = [] if len(bars) < 2: return gap_info for i in range(1, len(bars)): bar1, bar2 = bars[i - 1], bars[i] right = bars[i:] gap = None if bar1.high < bar2.low: delta = round(bar2.low / bar1.high - 1, 4) cover = "已补" if min(x.low for x in right) < bar1.high else "未补" gap = { "kind": "向上缺口", "cover": cover, "sdt": bar1.dt, "edt": bar2.dt, "high": bar2.low, "low": bar1.high, "delta": delta, } if bar1.low > bar2.high: delta = round(bar1.low / bar2.high - 1, 4) cover = "已补" if max(x.high for x in right) > bar1.low else "未补" gap = { "kind": "向下缺口", "cover": cover, "sdt": bar1.dt, "edt": bar2.dt, "high": bar1.low, "low": bar2.high, "delta": delta, } if gap: gap_info.append(gap) return gap_info
[docs]def fast_slow_cross(fast, slow): """计算 fast 和 slow 的交叉信息 :param fast: 快线 :param slow: 慢线 :return: """ assert len(fast) == len(slow), "快线和慢线的长度不一样" if isinstance(fast, list): fast = np.array(fast) if isinstance(slow, list): slow = np.array(slow) length = len(fast) delta = fast - slow cross_info = [] last_i = -1 last_v = 0 temp_fast = [] temp_slow = [] for i, v in enumerate(delta): last_i += 1 last_v += abs(v) temp_fast.append(fast[i]) temp_slow.append(slow[i]) if i >= 2 and delta[i - 1] <= 0 < delta[i]: kind = "金叉" elif i >= 2 and delta[i - 1] >= 0 > delta[i]: kind = "死叉" else: continue cross_info.append( { "位置": i, "类型": kind, "快线": fast[i], "慢线": slow[i], "距离": last_i, "距今": length - i, "面积": round(last_v, 4), "价差": round(v, 4), "快线高点": max(temp_fast), "快线低点": min(temp_fast), "慢线高点": max(temp_slow), "慢线低点": min(temp_slow), } ) last_i = 0 last_v = 0 temp_fast = [] temp_slow = [] return cross_info
[docs]def same_dir_counts(seq: [List, np.array]): """计算 seq 中与最后一个数字同向的数字数量 :param seq: 数字序列 :return: example ---------- >>>print(same_dir_counts([-1, -1, -2, -3, 0, 1, 2, 3, -1, -2, 1, 1, 2, 3])) >>>print(same_dir_counts([-1, -1, -2, -3, 0, 1, 2, 3])) """ s = seq[-1] c = 0 for num in seq[::-1]: if (num > 0 and s > 0) or (num < 0 and s < 0): c += 1 else: break return c
[docs]def count_last_same(seq: Union[List, np.array, Tuple]): """统计与seq列表最后一个元素相似的连续元素数量 :param seq: 数字序列 :return: """ s = seq[-1] c = 0 for _s in seq[::-1]: if _s == s: c += 1 else: break return c
[docs]def get_sub_elements(elements: List[Any], di: int = 1, n: int = 10) -> List[Any]: """获取截止到倒数第 di 个元素的前 n 个元素 :param elements: 全部元素列表 :param di: 指定结束元素为倒数第 di 个 :param n: 指定需要的元素个数 :return: 部分元素列表 >>>x = [1, 2, 3, 4, 5, 6, 7, 8, 9] >>>y1 = get_sub_elements(x, di=1, n=3) >>>y2 = get_sub_elements(x, di=2, n=3) """ assert di >= 1 if di == 1: se = elements[-n:] else: se = elements[-n - di + 1 : -di + 1] return se
[docs]def is_bis_down(bis: List[BI]): """判断 bis 中的连续笔是否是向下的""" if not bis or len(bis) < 3 or len(bis) % 2 == 0: return False assert bis[1].fx_b.dt > bis[0].fx_b.dt, "时间由远到近" if ( bis[-1].direction == Direction.Down and bis[0].high == max([x.high for x in bis]) and bis[-1].low == min([x.low for x in bis]) ): return True else: return False
[docs]def is_bis_up(bis: List[BI]): """判断 bis 中的连续笔是否是向上的""" if not bis or len(bis) < 3 and len(bis) % 2 == 0: return False assert bis[1].fx_b.dt > bis[0].fx_b.dt, "时间由远到近" if ( bis[-1].direction == Direction.Up and bis[-1].high == max([x.high for x in bis]) and bis[0].low == min([x.low for x in bis]) ): return True else: return False
def get_zs_seq(bis: List[BI]) -> List[ZS]: """获取连续笔中的中枢序列 :param bis: 连续笔对象列表 :return: 中枢序列 """ zs_list = [] if not bis: return [] for bi in bis: if not zs_list: zs_list.append(ZS(bis=[bi])) continue zs = zs_list[-1] if not zs.bis: zs.bis.append(bi) zs_list[-1] = zs else: if (bi.direction == Direction.Up and bi.high < zs.zd) or ( bi.direction == Direction.Down and bi.low > zs.zg ): zs_list.append(ZS(bis=[bi])) else: zs.bis.append(bi) zs_list[-1] = zs return zs_list def cross_zero_axis(n1: Union[List, np.ndarray], n2: Union[List, np.ndarray]) -> int: """判断两个数列的零轴交叉点 :param n1: 数列1 :param n2: 数列2 :return: 交叉点所在的索引位置 """ assert len(n1) == len(n2), "输入两个数列长度不等" axis_0 = np.zeros(len(n1)) n1 = np.flip(n1) n2 = np.flip(n2) x1 = np.where(n1[0] * n1 < axis_0, True, False) x2 = np.where(n2[0] * n2 < axis_0, True, False) num1 = np.argmax(x1[:-1] != x1[1:]) + 2 if np.any(x1) else 0 num2 = np.argmax(x2[:-1] != x2[1:]) + 2 if np.any(x2) else 0 return max(num1, num2) def cal_cross_num(cross: List, distance: int = 1) -> tuple: """使用 distance 过滤掉fast_slow_cross函数返回值cross列表中 不符合要求的交叉点,返回处理后的金叉和死叉数值 :param cross: fast_slow_cross函数返回值 :param distance: 金叉和死叉之间的最小距离 :return: jc金叉值 ,SC死叉值 """ if len(cross) == 0: return 0, 0 elif len(cross) == 1: cross_ = cross elif len(cross) == 2: if cross[-1]["距离"] < distance: cross_ = [] else: cross_ = cross else: if cross[-1]["距离"] < distance: last_cross = cross[-1] del cross[-2] re_cross = [i for i in cross if i["距离"] >= distance] re_cross.append(last_cross) else: re_cross = [i for i in cross if i["距离"] >= distance] cross_ = [] for i in range(0, len(re_cross)): if len(cross_) >= 1 and re_cross[i]["类型"] == re_cross[i - 1]["类型"]: # 不将上一个元素加入cross_ del cross_[-1] cross_.append(re_cross[i]) else: cross_.append(re_cross[i]) jc = len([x for x in cross_ if x["类型"] == "金叉"]) sc = len([x for x in cross_ if x["类型"] == "死叉"]) return jc, sc def down_cross_count(x1: Union[List, np.array], x2: Union[List, np.array]) -> int: """输入两个序列,计算 x1 下穿 x2 的次数 :param x1: list :param x2: list :return: int """ x = np.array(x1) < np.array(x2) num = 0 for i in range(len(x) - 1): b1, b2 = x[i], x[i + 1] if b2 and b1 != b2: num += 1 return num