Source code for czsc.signals.zdy

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2023/3/13 22:45
describe: ZDY 信号函数集合
"""
import numpy as np
from loguru import logger
from collections import OrderedDict
from czsc.traders.base import CZSC, CzscTrader
from czsc.signals.tas import update_macd_cache
from czsc.utils import get_sub_elements, create_single_signal, sorted_freqs
from czsc.objects import Direction, Mark, Operate, ZS


[docs]def zdy_bi_end_V230406(c: CZSC, **kwargs) -> OrderedDict: """分型停顿判断K线结束 参数模板:"{freq}_D0停顿分型_BE辅助V230406" **信号逻辑:** 1. 当分型形成后,等待后续出现某一根 K 线的收盘价站住第三根 K 线(此处的第三根 K 线是指形成分型的第三根 K 线) 的极值,即可认为满足了停顿法的要求。 2. 如果是两次停顿确认笔结束,要求内部至少还有一个相应的分型停顿。 **信号列表:** - Signal('15分钟_D0停顿分型_BE辅助V230406_看空_任意_任意_0') - Signal('15分钟_D0停顿分型_BE辅助V230406_看多_任意_任意_0') - Signal('15分钟_D0停顿分型_BE辅助V230406_看多_内部底停顿_任意_0') - Signal('15分钟_D0停顿分型_BE辅助V230406_看空_内部顶停顿_任意_0') - Signal('15分钟_D0停顿分型_BE辅助V230406_看空_任意_顶分区间_0') - Signal('15分钟_D0停顿分型_BE辅助V230406_看多_任意_底分区间_0') - Signal('15分钟_D0停顿分型_BE辅助V230406_看空_内部顶停顿_顶分区间_0') - Signal('15分钟_D0停顿分型_BE辅助V230406_看多_内部底停顿_底分区间_0') **相关信号:** - :func:`czsc.signals.byi_bi_end_V230106` :param c: CZSC对象 :return: """ freq = c.freq.value k1, k2, k3 = f"{freq}_D0停顿分型_BE辅助V230406".split("_") v1 = "其他" if len(c.bi_list) < 3 or len(c.bars_ubi) > 6 or len(c.bars_ubi) < 4: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) last_bi = c.bi_list[-1] last_high = max([x.high for x in last_bi.fx_b.elements[-1].raw_bars]) last_low = min([x.low for x in last_bi.fx_b.elements[-1].raw_bars]) last_bar = c.bars_raw[-1] if last_bi.fx_b.elements[-1].dt >= last_bar.dt or last_bi.length < 7: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) last_bars = [y for x in c.bars_ubi[3:] for y in x.raw_bars] max_close = max([x.close for x in last_bars]) min_close = min([x.close for x in last_bars]) if last_bi.direction == Direction.Down and max_close > last_high: v1 = "看多" elif last_bi.direction == Direction.Up and min_close < last_low: v1 = "看空" if v1 == "其他": return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 笔内部再次查找停顿分型 v2 = "任意" if v1 == "看多" and len(last_bi.fxs) >= 4: for fx1, fx2 in zip(last_bi.fxs[:-1], last_bi.fxs[1:]): if ( fx1.mark == Mark.D and fx2.mark == Mark.G and max([x.close for x in fx2.raw_bars]) > fx1.elements[-1].high ): v2 = "内部底停顿" if v1 == "看空" and len(last_bi.fxs) >= 4: for fx1, fx2 in zip(last_bi.fxs[:-1], last_bi.fxs[1:]): if ( fx1.mark == Mark.G and fx2.mark == Mark.D and min([x.close for x in fx2.raw_bars]) < fx1.elements[-1].low ): v2 = "内部顶停顿" # 价格回到笔结束分型内部 v3 = "任意" if v1 == "看多": assert last_bi.fx_b.mark == Mark.D if last_bar.close < last_bi.fx_b.high: v3 = "底分区间" if v1 == "看空": assert last_bi.fx_b.mark == Mark.G if last_bar.close > last_bi.fx_b.low: v3 = "顶分区间" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2, v3=v3)
[docs]def zdy_bi_end_V230407(c: CZSC, **kwargs) -> OrderedDict: """分型停顿判断K线结束 参数模板:"{freq}_D0停顿分型_BE辅助V230407" **信号逻辑:** 1. 当分型形成后,等待后续出现某一根 K 线的收盘价站住第三根 K 线(此处的第三根 K 线是指形成分型的第三根 K 线) 的极值,且后续的所有K线收盘价都满足,即可认为满足了停顿法的要求。 2. 如果是两次停顿确认笔结束,要求内部至少还有一个相应的分型停顿。 **信号列表:** - Signal('15分钟_D0停顿分型_BE辅助V230407_看空_任意_任意_0') - Signal('15分钟_D0停顿分型_BE辅助V230407_看多_任意_任意_0') - Signal('15分钟_D0停顿分型_BE辅助V230407_看多_内部底停顿_任意_0') - Signal('15分钟_D0停顿分型_BE辅助V230407_看空_内部顶停顿_任意_0') :param c: CZSC对象 :return: """ freq = c.freq.value k1, k2, k3 = f"{freq}_D0停顿分型_BE辅助V230407".split("_") v1 = "其他" if len(c.bi_list) < 3 or len(c.bars_ubi) > 6 or len(c.bars_ubi) < 4: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) last_bi = c.bi_list[-1] last_high = max([x.high for x in last_bi.fx_b.elements[-1].raw_bars]) last_low = min([x.low for x in last_bi.fx_b.elements[-1].raw_bars]) last_bar = c.bars_raw[-1] if last_bi.fx_b.elements[-1].dt >= last_bar.dt or last_bi.length < 7: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) last_bars = [y for x in c.bars_ubi for y in x.raw_bars] last_bars = [x for x in last_bars if x.dt >= last_bi.fx_b.elements[-1].dt] if last_bi.direction == Direction.Down and last_bars[-1].close > last_high: _temp = [i for i, x in enumerate(last_bars) if x.close > last_high] if len(_temp) == 1 or (len(_temp) > 1 and _temp[-1] - _temp[0] == len(_temp) - 1): v1 = "看多" elif last_bi.direction == Direction.Up and last_bars[-1].close < last_low: _temp = [i for i, x in enumerate(last_bars) if x.close < last_low] if len(_temp) == 1 or (len(_temp) > 1 and _temp[-1] - _temp[0] == len(_temp) - 1): v1 = "看空" if v1 == "其他": return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 笔内部再次查找停顿分型 v2 = "任意" if v1 == "看多" and len(last_bi.fxs) >= 4: for fx1, fx2 in zip(last_bi.fxs[:-1], last_bi.fxs[1:]): if ( fx1.mark == Mark.D and fx2.mark == Mark.G and max([x.close for x in fx2.raw_bars]) > fx1.elements[-1].high ): v2 = "内部底停顿" if v1 == "看空" and len(last_bi.fxs) >= 4: for fx1, fx2 in zip(last_bi.fxs[:-1], last_bi.fxs[1:]): if ( fx1.mark == Mark.G and fx2.mark == Mark.D and min([x.close for x in fx2.raw_bars]) < fx1.elements[-1].low ): v2 = "内部顶停顿" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2)
[docs]def zdy_vibrate_V230406(cat: CzscTrader, freq1, freq2, **kwargs) -> OrderedDict: """中枢震荡短差操作 参数模板:"中枢震荡_{freq1}#{freq2}_BS辅助V230406" **信号逻辑:** 1、中枢笔数必须大于等于3笔。(排除掉盘背构成的中枢) 2、中枢上沿做空,中枢下沿做多(不区分中枢方向) 以开空仓为例(中枢上沿),当向上一笔出现次级别分型停顿后,定义次级别收盘价为 P,次级别顶分型高点为 H ,则需满足: 1. H>=本级别中枢上沿价格 2.(H-本级别中枢上沿价格)< 本级别中枢高度 3.(H-P)*3 < 中枢高度 4. 本级别MACD黄白线死叉确立(已经走出并列向下走势) 稍微解释一下几个条件的内在逻辑: 1. H>=本级别中枢上沿价格 : 确保是在中枢的上沿 2.(H-本级别中枢上沿价格)< 本级别中枢高度 : 确保中枢上沿不要太远的距离,太远了可能下跌就形成三买上去了。这种点不要。 3. (H-P)*3 < 中枢高度 : H-P,其实就是一个次级别分型停顿的高度了,这里就是为了排除掉奔走型中枢,区间特别小的中枢,没必要做。 **信号列表:** - Signal('中枢震荡_5分钟#60分钟_BS辅助V230406_看多_任意_任意_0') - Signal('中枢震荡_5分钟#60分钟_BS辅助V230406_看空_任意_任意_0') :param cat: 交易员对象 :param freq1: 次级别 :param freq2: 本级别 :param kwargs: :return: """ k1, k2, k3 = f"中枢震荡_{freq1}#{freq2}_BS辅助V230406".split("_") v1 = "其他" assert sorted_freqs.index(freq1) < sorted_freqs.index(freq2), "freq1 必须小于 freq2" c1: CZSC = cat.kas[freq1] c2: CZSC = cat.kas[freq2] cache_key = update_macd_cache(c2, fastperiod=12, slowperiod=26, signalperiod=9) if len(c2.bi_list) < 5: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) b1, b2, b3, b4 = c2.bi_list[-4:] zg = min(b1.high, b2.high, b3.high) zd = max(b1.low, b2.low, b3.low) if zd > zg: # 本级别中枢不成立,不做 return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 判断次级别分型是否停顿 c1_lbi = c1.bi_list[-1] c1_bar = c1.bars_raw[-1] if c1_bar.dt == c1_lbi.fx_b.raw_bars[-1].dt or len(c1.bars_ubi) > 6: # 次级别不具备分型停顿的条件,不做 return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) if ( c1_lbi.direction == Direction.Down and c1_bar.close > c1_bar.open and c1_bar.close > c1_lbi.fx_b.raw_bars[-1].high ): temp = "底分停顿" elif ( c1_lbi.direction == Direction.Up and c1_bar.close < c1_bar.open and c1_bar.close < c1_lbi.fx_b.raw_bars[-1].low ): temp = "顶分停顿" else: # 次级别分型没有停顿,不做 return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 判断中枢震荡买卖点 c2_bar = c2.bars_raw[-1] if temp == "顶分停顿" and c2_bar.cache[cache_key]["macd"] < 0: P = c1_bar.close H = c1_lbi.high if H >= zg and (H - zg) < (zg - zd) and (H - P) * 3 < (zg - zd): v1 = "看空" if temp == "底分停顿" and c2_bar.cache[cache_key]["macd"] > 0: P = c1_bar.close L = c1_lbi.low if L <= zd and (zd - L) < (zg - zd) and (P - L) * 3 < (zg - zd): v1 = "看多" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_stop_loss_V230406(cat: CzscTrader, **kwargs) -> OrderedDict: """笔操作止损逻辑 参数模板:"{freq1}_{pos_name}F{first_stop}_止损V230406" **信号逻辑:** 多头止损逻辑如下,反之为空头止损逻辑: 1. 进场后设定止损为固定BP止损。 2. 跌破开仓前最后一个分型的低点,止损。 3. 任何一笔在持仓状态下的上升笔结束后,若没有判断止盈走人,则将止损设定在该笔起始点的位置(或适当放低一些固定BP) **信号列表:** - Signal('日线_5日线多头F300_止损V230406_多头止损_任意_任意_0') - Signal('日线_5日线多头F300_止损V230406_空头止损_任意_任意_0') :param cat: CzscTrader对象 :return: """ pos_name = kwargs["pos_name"] freq1 = kwargs["freq1"] # 笔的观察周期 first_stop = int(kwargs.get("first_stop", 300)) # 进场点止损设置 k1, k2, k3 = f"{freq1}_{pos_name}F{first_stop}_止损V230406".split("_") v1, v2 = "其他", "其他" if hasattr(cat, "positions") is False: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) pos = [x for x in cat.positions if x.name == pos_name][0] if len(pos.operates) == 0 or pos.operates[-1]["op"] in [Operate.SE, Operate.LE]: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) c = cat.kas[freq1] op = pos.operates[-1] d3bi = c.bi_list[-3] bis = [x for x in c.bi_list if x.fx_b.dt >= op["dt"]] last_bar = c.bars_raw[-1] # 多头止损逻辑 if op["op"] == Operate.LO: open_base_fx = [x for x in c.fx_list if x.mark == Mark.D and x.dt < op["dt"]][-1] assert open_base_fx.mark == Mark.D, "开仓前最后一个分型必须是底分型" if last_bar.close < open_base_fx.low: v1 = "多头止损" v2 = "跌破分型低点" if (last_bar.close / op["price"] - 1) * 10000 <= -first_stop: v1 = "多头止损" v2 = "进场点止损" if ( len(bis) > 0 and bis[-1].direction == Direction.Up and bis[-1].high > d3bi.high and last_bar.close < op["price"] ): assert bis[-1].direction == Direction.Up v1 = "多头止损" v2 = "跌破成本价" if len(bis) > 1 and bis[-1].direction == Direction.Up and last_bar.close < bis[-2].fx_b.low: v1 = "多头止损" v2 = "跌破上个向下笔底" # 空头止损逻辑 if op["op"] == Operate.SO: open_base_fx = [x for x in c.fx_list if x.mark == Mark.G and x.dt < op["dt"]][-1] assert open_base_fx.mark == Mark.G, "开仓前最后一个分型必须是底分型" if last_bar.close > open_base_fx.high: v1 = "空头止损" v2 = "升破分型高点" if (1 - last_bar.close / op["price"]) * 10000 <= -first_stop: v1 = "空头止损" v2 = "进场点止损" if ( len(bis) > 0 and bis[-1].direction == Direction.Down and bis[-1].low < d3bi.low and last_bar.close > op["price"] ): assert d3bi.direction == Direction.Down v1 = "空头止损" v2 = "升破成本价" if len(bis) > 1 and bis[-1].direction == Direction.Down and last_bar.close > bis[-2].fx_b.high: v1 = "空头止损" v2 = "升破上个向上笔顶" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2)
[docs]def zdy_take_profit_V230406(cat: CzscTrader, **kwargs) -> OrderedDict: """笔操作止盈逻辑 参数模板:"{freq1}_{pos_name}_止盈V230406" **信号逻辑:** 多头止盈逻辑如下,反之为空头止盈逻辑: 1. 任何一笔在持仓状态下的上升笔结束,若升破前一下跌笔高点,继续持仓,如没有升破前一下跌笔高点,止盈走人。需要用两次停顿分型判断。 **信号列表:** - Signal('日线_5日线多头_止盈V230406_多头止盈_任意_任意_0') - Signal('日线_5日线多头_止盈V230406_空头止盈_任意_任意_0') :param cat: CzscTrader对象 :return: """ pos_name = kwargs["pos_name"] freq1 = kwargs["freq1"] # 笔的观察周期 k1, k2, k3 = f"{freq1}_{pos_name}_止盈V230406".split("_") v1, v2 = "其他", "其他" if not hasattr(cat, "positions"): return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) pos = [x for x in cat.positions if x.name == pos_name][0] if len(pos.operates) == 0 or pos.operates[-1]["op"] in [Operate.SE, Operate.LE]: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) op = pos.operates[-1] c = cat.kas[freq1] bis = [x for x in c.bi_list if x.fx_b.dt >= op["dt"]] # 多头止盈逻辑 if op["op"] == Operate.LO: if len(bis) > 1 and bis[-1].direction == Direction.Up and bis[-1].high < bis[-2].high: v1 = "多头止盈" v2 = "向上笔不创新高" # 空头止盈逻辑 if op["op"] == Operate.SO: if len(bis) > 1 and bis[-1].direction == Direction.Down and bis[-1].low > bis[-2].low: v1 = "空头止盈" v2 = "向下笔不创新低" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2)
[docs]def zdy_take_profit_V230407(cat: CzscTrader, **kwargs) -> OrderedDict: """笔操作止盈逻辑 参数模板:"{freq1}_{pos_name}_止盈V230407" **信号逻辑:** 多头止盈逻辑如下,反之为空头止盈逻辑: 1. 根据K线个数判断力度,提前止盈。(三买之后的上升笔中,如果笔数已经大于前一下跌笔的K线个数的1.5倍,并且还没有新高,直接止盈走人。 **信号列表:** - Signal('日线_5日线多头_止盈V230407_多头止盈_任意_任意_0') - Signal('日线_5日线多头_止盈V230407_空头止盈_任意_任意_0') :param cat: CzscTrader对象 :return: 信号字典 """ pos_name = kwargs["pos_name"] freq1 = kwargs["freq1"] # 笔的观察周期 k1, k2, k3 = f"{freq1}_{pos_name}_止盈V230407".split("_") v1, v2 = "其他", "其他" if not hasattr(cat, "positions"): return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) pos = [x for x in cat.positions if x.name == pos_name][0] if len(pos.operates) == 0 or pos.operates[-1]["op"] in [Operate.SE, Operate.LE]: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) op = pos.operates[-1] c = cat.kas[freq1] bis = [x for x in c.bi_list if x.fx_b.dt >= op["dt"]] d2bi = c.bi_list[-2] if len(bis) <= 0 or bis[-1].length < 1.5 * d2bi.length: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 多头止盈逻辑 if op["op"] == Operate.LO and bis[-1].direction == Direction.Up and bis[-1].high < d2bi.high: assert d2bi.direction == Direction.Down v1 = "多头止盈" v2 = "向上笔不创新高" # 空头止盈逻辑 if op["op"] == Operate.SO and bis[-1].direction == Direction.Down and bis[-1].low > d2bi.low: assert d2bi.direction == Direction.Up v1 = "空头止盈" v2 = "向下笔不创新低" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2)
[docs]def zdy_zs_V230423(c: CZSC, **kwargs): """约束中枢的形态和高度 参数模板:"{freq}_D{di}中枢形态_BS辅助V230423" **信号逻辑:** 以上涨中枢为例,反之为下跌中枢: 1. 进入笔和离开笔分别是高低点,笔的数量大于等于5 2. 中枢高度 >= 进入中枢那一笔高度 / 3 **信号列表:** - Signal('15分钟_D1中枢形态_BS辅助V230423_上涨_5笔_任意_0') - Signal('15分钟_D1中枢形态_BS辅助V230423_上涨_7笔_任意_0') - Signal('15分钟_D1中枢形态_BS辅助V230423_下跌_5笔_任意_0') - Signal('15分钟_D1中枢形态_BS辅助V230423_下跌_7笔_任意_0') - Signal('15分钟_D1中枢形态_BS辅助V230423_下跌_9笔_任意_0') - Signal('15分钟_D1中枢形态_BS辅助V230423_上涨_9笔_任意_0') :param c: 基础周期的 CZSC 对象 :param kwargs: 其他参数 - di: 倒数第 di 根 K 线 :return: 信号字典 """ di = int(kwargs.get("di", 1)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}中枢形态_BS辅助V230423".split("_") v1 = "其他" if len(c.bi_list) < 7 or len(c.bars_ubi) > 7: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) for n in (9, 7, 5): bis = get_sub_elements(c.bi_list, di=di, n=n) if len(bis) != n: continue bi1 = bis[0] # 假定离开中枢的都是一笔 zs = ZS(bis[1:-1]) if not (zs.is_valid and zs.zg - zs.zd > (bi1.high - bi1.low) / 3): continue min_low = min(x.low for x in bis) max_high = max(x.high for x in bis) if bi1.direction == Direction.Up and bi1.low == min_low and bis[-1].high == max_high: return create_single_signal(k1=k1, k2=k2, k3=k3, v1="上涨", v2=f"{n}笔") if bi1.direction == Direction.Down and bi1.high == max_high and bis[-1].low == min_low: return create_single_signal(k1=k1, k2=k2, k3=k3, v1="下跌", v2=f"{n}笔") return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_macd_bc_V230422(c: CZSC, **kwargs): """MACD面积背驰 参数模板:"{freq}_D{di}T{th}MACD面积背驰_BS辅助V230422" **信号逻辑:** 以上涨背驰为例,反之为下跌背驰: 1. 背驰段的相应macd面积之和 <= 进入中枢段的相应面积之和 * th / 100 2. 中枢把黄白线拉到0轴附近, 3. 离开中枢的一笔,黄白线大于0且不新高 **信号列表:** - Signal('15分钟_D1T50MACD面积背驰_BS辅助V230422_上涨_9笔_任意_0') - Signal('15分钟_D1T50MACD面积背驰_BS辅助V230422_上涨_7笔_任意_0') - Signal('15分钟_D1T50MACD面积背驰_BS辅助V230422_下跌_5笔_任意_0') - Signal('15分钟_D1T50MACD面积背驰_BS辅助V230422_上涨_5笔_任意_0') - Signal('15分钟_D1T50MACD面积背驰_BS辅助V230422_下跌_7笔_任意_0') - Signal('15分钟_D1T50MACD面积背驰_BS辅助V230422_下跌_9笔_任意_0') :param c: 基础周期的 CZSC 对象 :param kwargs: 其他参数 - di: 倒数第 di 根 K 线 - th: 背驰段的相应macd面积之和 <= 进入中枢段的相应面积之和 * th / 100 :return: 信号字典 """ cache_key = update_macd_cache(c, fastperiod=26, slowperiod=12, signalperiod=9) di = int(kwargs.get("di", 1)) th = int(kwargs.get("th", 50)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}T{th}MACD面积背驰_BS辅助V230422".split("_") v1 = "其他" if len(c.bi_list) < 7 or len(c.bars_ubi) > 7: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) for n in (9, 7, 5): bis = get_sub_elements(c.bi_list, di=di, n=n) if len(bis) != n: continue # 假定离开中枢的都是一笔 zs = ZS(bis[1:-1]) if not zs.is_valid: # 如果中枢不成立,往下进行 continue bi1, bi2 = bis[0], bis[-1] bi1_macd = [x.cache[cache_key]["macd"] for x in bi1.raw_bars[1:-1]] bi2_macd = [x.cache[cache_key]["macd"] for x in bi2.raw_bars[1:-1]] bi1_dif = bi1.raw_bars[-2].cache[cache_key]["dif"] bi2_dif = bi2.raw_bars[-2].cache[cache_key]["dif"] zs_fxb_raw = [y for x in zs.bis for y in x.fx_b.raw_bars] if bi1.direction == Direction.Up: bi1_area = sum([x for x in bi1_macd if x > 0]) bi2_area = sum([x for x in bi2_macd if x > 0]) dif_zero = min([x.cache[cache_key]["dif"] for x in zs_fxb_raw]) else: bi1_area = sum([x for x in bi1_macd if x < 0]) bi2_area = sum([x for x in bi2_macd if x < 0]) dif_zero = max([x.cache[cache_key]["dif"] for x in zs_fxb_raw]) if bi2_area > bi1_area * th / 100: # 如果面积背驰不成立,往下进行 continue min_low = min(x.low for x in bis) max_high = max(x.high for x in bis) if ( bi1.direction == Direction.Up and bi1.low == min_low and bi2.high == max_high and dif_zero < 0 and bi1_dif > bi2_dif > 0 ): return create_single_signal(k1=k1, k2=k2, k3=k3, v1="上涨", v2=f"{n}笔") if ( bi1.direction == Direction.Down and bi1.high == max_high and bi2.low == min_low and dif_zero > 0 and bi1_dif < bi2_dif < 0 ): return create_single_signal(k1=k1, k2=k2, k3=k3, v1="下跌", v2=f"{n}笔") return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_macd_bs1_V230422(c: CZSC, **kwargs): """MACD辅助判断第一类买卖点 参数模板:"{freq}_D{di}T{th}MACD_BS1辅助V230422" **信号逻辑:** 以上涨背驰为例,反之为下跌背驰: 1. 背驰段的相应macd面积之和 <= 进入中枢段的相应面积之和 * th / 100 2. 离开中枢的一笔,起点的黄白线在零轴附近,终点的黄白线在中枢黄白线的上方 **信号列表:** - Signal('5分钟_D1T50MACD_BS1辅助V230422_看空_上涨5笔_任意_0') - Signal('5分钟_D1T50MACD_BS1辅助V230422_看多_下跌7笔_任意_0') - Signal('5分钟_D1T50MACD_BS1辅助V230422_看多_下跌5笔_任意_0') - Signal('5分钟_D1T50MACD_BS1辅助V230422_看空_上涨9笔_任意_0') - Signal('5分钟_D1T50MACD_BS1辅助V230422_看空_上涨7笔_任意_0') - Signal('5分钟_D1T50MACD_BS1辅助V230422_看多_下跌9笔_任意_0') :param c: 基础周期的 CZSC 对象 :param kwargs: 其他参数 - di: 倒数第 di 根 K 线 - th: 背驰段的相应macd面积之和 <= 进入中枢段的相应面积之和 * th / 100 :return: 信号字典 """ cache_key = update_macd_cache(c, fastperiod=26, slowperiod=12, signalperiod=9) di = int(kwargs.get("di", 1)) th = int(kwargs.get("th", 50)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}T{th}MACD_BS1辅助V230422".split("_") v1 = "其他" if len(c.bi_list) < 7 or len(c.bars_ubi) > 9: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) for n in (13, 11, 9, 7, 5): bis = get_sub_elements(c.bi_list, di=di, n=n) if len(bis) != n: continue # 假定离开中枢的都是一笔 zs = ZS(bis[1:-1]) if not zs.is_valid: # 如果中枢不成立,往下进行 continue bi1, bi2 = bis[0], bis[-1] bi1_area = sum([abs(x.cache[cache_key]["macd"]) for x in bi1.raw_bars[1:-1]]) bi2_area = sum([abs(x.cache[cache_key]["macd"]) for x in bi2.raw_bars[1:-1]]) bi1_dif = bi1.raw_bars[-2].cache[cache_key]["dif"] bi2_dif = bi2.raw_bars[-2].cache[cache_key]["dif"] bi2_start_dif = bi2.raw_bars[1].cache[cache_key]["dif"] if bi1.direction == Direction.Up: # 计算 zs 中向上笔的 dif 最大值 up_bis = [x for x in zs.bis if x.direction == Direction.Up] zs_dif = max([y.cache[cache_key]["dif"] for x in up_bis for y in x.fx_b.raw_bars]) else: # 计算 zs 中向下笔的 dif 最小值 down_bis = [x for x in zs.bis if x.direction == Direction.Down] zs_dif = min([y.cache[cache_key]["dif"] for x in down_bis for y in x.fx_b.raw_bars]) if bi2_area > bi1_area * th / 100: # 如果面积背驰不成立,往下进行 continue min_low = min(x.low for x in bis) max_high = max(x.high for x in bis) if ( bi1.direction == Direction.Up and bi1.low == min_low and bi2.high == max_high and bi2_start_dif < abs(zs_dif) * 0.5 and bi1_dif > bi2_dif > zs_dif > 0 ): return create_single_signal(k1=k1, k2=k2, k3=k3, v1="看空", v2=f"上涨{n}笔") if ( bi1.direction == Direction.Down and bi1.high == max_high and bi2.low == min_low and bi2_start_dif > abs(zs_dif) * 0.5 and 0 > zs_dif > bi2_dif > bi1_dif ): return create_single_signal(k1=k1, k2=k2, k3=k3, v1="看多", v2=f"下跌{n}笔") return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_zs_space_V230421(c: CZSC, **kwargs): """中枢空间形态约束 参数模板:"{freq}_D{di}中枢空间_BS辅助V230421" **信号逻辑:** 以顶背驰为例:要求【离开笔】距离中枢上沿的高度,大于等于【进入笔】距离中枢下沿的高度 **信号列表:** - Signal('15分钟_D1中枢空间_BS辅助V230421_上涨_5笔_任意_0') - Signal('15分钟_D1中枢空间_BS辅助V230421_下跌_5笔_任意_0') - Signal('15分钟_D1中枢空间_BS辅助V230421_上涨_9笔_任意_0') - Signal('15分钟_D1中枢空间_BS辅助V230421_上涨_7笔_任意_0') - Signal('15分钟_D1中枢空间_BS辅助V230421_下跌_7笔_任意_0') - Signal('15分钟_D1中枢空间_BS辅助V230421_下跌_9笔_任意_0') :param c: 基础周期的 CZSC 对象 :param kwargs: 其他参数 - di: 倒数第 di 根 K 线 :return: 信号字典 """ di = int(kwargs.get("di", 1)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}中枢空间_BS辅助V230421".split("_") v1 = "其他" if len(c.bi_list) < 7 or len(c.bars_ubi) > 7: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) for n in (9, 7, 5): bis = get_sub_elements(c.bi_list, di=di, n=n) if len(bis) != n: continue # 假定离开中枢的都是一笔 zs = ZS(bis[1:-1]) if not zs.is_valid: continue bi1, bi2 = bis[0], bis[-1] min_low = min(x.low for x in bis) max_high = max(x.high for x in bis) if ( bi1.direction == Direction.Up and bi1.low == min_low and bi2.high == max_high and bi2.high - zs.zg >= zs.zd - bi1.low ): return create_single_signal(k1=k1, k2=k2, k3=k3, v1="上涨", v2=f"{n}笔") if ( bi1.direction == Direction.Down and bi1.high == max_high and bi2.low == min_low and zs.zd - bi2.low >= bi1.high - zs.zg ): return create_single_signal(k1=k1, k2=k2, k3=k3, v1="下跌", v2=f"{n}笔") return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_macd_dif_V230516(c: CZSC, **kwargs) -> OrderedDict: """MACD柱子与DIF的关系 参数模板:"{freq}_D{di}DIF走平_BS辅助V230516" **信号逻辑:** * DIF走平看多:1) DIF小于MACD柱子;2)DIF相比前一周期下跌小于阈值 * DIF走平看空:1) DIF大于MACD柱子;2)DIF相比前一周期上涨小于阈值 **信号列表:** - Signal('60分钟_D1DIF走平_BS辅助V230516_看空_红柱远离_任意_0') - Signal('60分钟_D1DIF走平_BS辅助V230516_看空_柱子否定_任意_0') - Signal('60分钟_D1DIF走平_BS辅助V230516_看多_绿柱远离_任意_0') - Signal('60分钟_D1DIF走平_BS辅助V230516_看多_柱子否定_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 :return: 返回信号结果 """ di = int(kwargs.get("di", 1)) freq = c.freq.value cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) k1, k2, k3 = f"{freq}_D{di}DIF走平_BS辅助V230516".split("_") v1, v2 = "其他", "其他" if len(c.bars_raw) < 12 + di: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) bars = get_sub_elements(c.bars_raw, di=di, n=10) # macd = bars[-1].cache[cache_key]['macd'] * 2 dif = [x.cache[cache_key]["dif"] for x in bars] dif_th = sum([abs(x - y) for x, y in zip(dif, dif[1:])]) / len(dif) * 0.2 if dif[-1] - dif[-2] > -dif_th: v1 = "看多" min_macd = min([x.cache[cache_key]["macd"] for x in bars]) v2 = "绿柱远离" if dif[-1] < min_macd * 2.5 else "柱子否定" if dif[-1] - dif[-2] < dif_th: v1 = "看空" max_macd = max([x.cache[cache_key]["macd"] for x in bars]) v2 = "红柱远离" if dif[-1] > max_macd * 2.5 else "柱子否定" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2)
[docs]def zdy_macd_dif_V230517(c: CZSC, **kwargs) -> OrderedDict: """MACD三次开仓条件 参数模板:"{freq}_D{di}MACD开仓_BS辅助V230517" **信号逻辑:** 以多头开仓为例: 1. DIF 在零轴下方运行很长时间后首次升破零轴 2. DIF 在零轴上方首次出现与 DEA 的飞吻 3. DIF 在零轴上方首次出现与 MACD 金叉 **信号列表:** - Signal('60分钟_D1MACD开仓_BS辅助V230517_看空_DIF破零轴_任意_0') - Signal('60分钟_D1MACD开仓_BS辅助V230517_看空_MACD飞吻_任意_0') - Signal('60分钟_D1MACD开仓_BS辅助V230517_看空_MACD死叉_任意_0') - Signal('60分钟_D1MACD开仓_BS辅助V230517_看多_DIF破零轴_任意_0') - Signal('60分钟_D1MACD开仓_BS辅助V230517_看多_MACD飞吻_任意_0') - Signal('60分钟_D1MACD开仓_BS辅助V230517_看多_MACD金叉_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 :return: 返回信号结果 """ di = int(kwargs.get("di", 1)) freq = c.freq.value cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) k1, k2, k3 = f"{freq}_D{di}MACD开仓_BS辅助V230517".split("_") if len(c.bars_raw) < 50: return create_single_signal(k1=k1, k2=k2, k3=k3, v1="其他") bars = get_sub_elements(c.bars_raw, di=di, n=20) macd = [x.cache[cache_key]["macd"] for x in bars] dif = [x.cache[cache_key]["dif"] for x in bars] if dif[-1] > 0: v1 = "看多" v2 = None if all([x < 0 for x in dif[:-1]]): v2 = "DIF破零轴" if macd[-1] > 0 and macd[-2] < 0: v2 = "MACD金叉" if macd[-5] > macd[-4] > macd[-3] > macd[-2] < macd[-1] and macd[-2] > 0: v2 = "MACD飞吻" if v2: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) if dif[-1] < 0: v1 = "看空" v2 = None if all([x > 0 for x in dif[:-1]]): v2 = "DIF破零轴" if macd[-1] < 0 and macd[-2] > 0: v2 = "MACD死叉" if macd[-5] < macd[-4] < macd[-3] < macd[-2] > macd[-1] and macd[-2] < 0: v2 = "MACD飞吻" if v2: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) return create_single_signal(k1=k1, k2=k2, k3=k3, v1="其他")
[docs]def zdy_macd_V230518(c: CZSC, **kwargs) -> OrderedDict: """MACD交叉次数 参数模板:"{freq}_D{di}MACD交叉N{n}_BS辅助V230518" **信号逻辑:** 1. MACD 大于0,金叉;MACD 小于0,死叉 2. 计算 MACD 连续大于0或者小于0的次数 **信号列表:** - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第1次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第2次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第3次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第4次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第5次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第6次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第7次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第8次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_死叉_第9次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第1次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第2次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第3次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第4次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第5次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第6次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第7次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第8次_任意_0') - Signal('60分钟_D1MACD交叉N9_BS辅助V230518_金叉_第9次_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 :return: 返回信号结果 """ di = int(kwargs.get("di", 1)) n = int(kwargs.get("n", 9)) freq = c.freq.value cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) k1, k2, k3 = f"{freq}_D{di}MACD交叉N{n}_BS辅助V230518".split("_") if len(c.bars_raw) < 50: return create_single_signal(k1=k1, k2=k2, k3=k3, v1="其他") bars = get_sub_elements(c.bars_raw, di=di, n=n + 1) macd = [x.cache[cache_key]["macd"] for x in bars] v1 = "金叉" if macd[-1] > 0 else "死叉" count = 0 for m in macd[::-1]: if (m > 0 and macd[-1] > 0) or (m < 0 and macd[-1] < 0): count += 1 else: break if count == n + 1: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2="超计数范围") return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=f"第{count}次")
[docs]def zdy_macd_V230519(c: CZSC, **kwargs) -> OrderedDict: """MACD连续缩柱 参数模板:"{freq}_D{di}N{n}MACD缩柱_BS辅助V230519" **信号逻辑:** 1. 取 N 跟 K 线,如果 N 跟 K 线的 MACD 都在零轴上方,且 N 跟 K 线的 MACD 都比 N-1 跟 K-1 线的 MACD 小,则认为是多头连续缩柱 2. 反之为空头连续缩柱 **信号列表:** - Signal('60分钟_D1N3MACD缩柱_BS辅助V230519_多头连续缩柱_任意_任意_0') - Signal('60分钟_D1N3MACD缩柱_BS辅助V230519_空头连续缩柱_任意_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 :return: 返回信号结果 """ di = int(kwargs.get("di", 1)) n = int(kwargs.get("n", 3)) freq = c.freq.value cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) k1, k2, k3 = f"{freq}_D{di}N{n}MACD缩柱_BS辅助V230519".split("_") v1 = "其他" if len(c.bars_raw) < 50: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) bars = get_sub_elements(c.bars_raw, di=di, n=n) macd = [x.cache[cache_key]["macd"] for x in bars] if all([x > 0 for x in macd]) and all([macd[i] < macd[i - 1] for i in range(1, n)]): v1 = "多头连续缩柱" if all([x < 0 for x in macd]) and all([macd[i] > macd[i - 1] for i in range(1, n)]): v1 = "空头连续缩柱" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_macd_dif_iqr_V230521(c: CZSC, **kwargs) -> OrderedDict: """MACD柱子与DIF的关系 参数模板:"{freq}_D{di}DIF走平IQR_BS辅助V230521" **信号逻辑:** * DIF走平看多:1) DIF小于MACD柱子;2)最近3个周期DIF相比前一周期变化在四分位距内 * DIF走平看空:1) DIF大于MACD柱子;2)最近3个周期DIF相比前一周期变化在四分位距内 **信号列表:** - Signal('60分钟_D1DIF走平IQR_BS辅助V230521_看空_红柱远离_任意_0') - Signal('60分钟_D1DIF走平IQR_BS辅助V230521_看空_柱子否定_任意_0') - Signal('60分钟_D1DIF走平IQR_BS辅助V230521_看多_绿柱远离_任意_0') - Signal('60分钟_D1DIF走平IQR_BS辅助V230521_看多_柱子否定_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 :return: 返回信号结果 """ di = int(kwargs.get("di", 1)) freq = c.freq.value cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) k1, k2, k3 = f"{freq}_D{di}DIF走平IQR_BS辅助V230521".split("_") v1, v2 = "其他", "其他" if len(c.bars_raw) < 50: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) bars = get_sub_elements(c.bars_raw, di=di, n=100) macd = bars[-1].cache[cache_key]["macd"] * 2 dif = [x.cache[cache_key]["dif"] for x in bars] # 计算四分位距 Q3, Q1 = np.percentile(dif, [75, 25]) IQR = Q3 - Q1 if max(dif[-3:]) - min(dif[-3:]) < IQR and macd < 0: v1 = "看多" v2 = "绿柱远离" if dif[-1] < macd else "柱子否定" if max(dif[-3:]) - min(dif[-3:]) < IQR and macd > 0: v1 = "看空" v2 = "红柱远离" if dif[-1] > macd else "柱子否定" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2)
[docs]def zdy_macd_V230527(c: CZSC, **kwargs) -> OrderedDict: """DIF/DEA/MACD 远离零轴辅助判断买卖点 参数模板:"{freq}_{key}远离W{w}N{n}T{t}_BS辅助V230527" **信号逻辑:** 1. 获取最近 w 根K线,计算 DIF/DEA/MACD,计算绝对值的中位数和标准差 2. 如果最新的 n 个值的最大绝对值大于中位数 + t / 10 * 标准差,则认为远离零轴 **信号列表:** - Signal('60分钟_DIF远离W100N10T10_BS辅助V230527_多头远离_任意_任意_0') - Signal('60分钟_DIF远离W100N10T10_BS辅助V230527_空头远离_任意_任意_0') :param c: CZSC对象 :return: 信号识别结果 """ cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) n = int(kwargs.get("n", 10)) w = int(kwargs.get("w", 100)) t = int(kwargs.get("t", 20)) # 远离零轴的阈值,越大越远离零轴,比较的基准是窗口内的中位数和标准差;20表示2倍 key = kwargs.get("key", "dif").upper() assert key in ["DIF", "DEA", "MACD"] freq = c.freq.value k1, k2, k3 = f"{freq}_{key}远离W{w}N{n}T{t}_BS辅助V230527".split("_") v1 = "其他" if len(c.bi_list) < 3: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) bars = get_sub_elements(c.bars_raw, di=1, n=w) factors = [x.cache[cache_key][key.lower()] for x in bars] median = np.median(np.abs(factors)) std = np.std(np.abs(factors)) last_n_factors = factors[-n:] max_abs_factor = max(last_n_factors, key=abs) if abs(max_abs_factor) > median + t / 10 * std: v1 = f"{'多头' if max_abs_factor > 0 else '空头'}远离" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_dif_V230527(c: CZSC, **kwargs) -> OrderedDict: """DIF远离零轴辅助判断买卖点 参数模板:"{freq}_N{n}T{t}_DIF远离V230527" **信号逻辑:** 以多头远离为例:回溯N根K线,找到DIF最大值,然后绿柱回溯10跟找到MACD最大值,然后看最大值之间的距离判断原理。 **信号列表:** - Signal('5分钟_N10T30_DIF远离V230527_空头远离_任意_任意_0') - Signal('5分钟_N10T30_DIF远离V230527_多头远离_任意_任意_0') :param c: CZSC对象 :return: 信号识别结果 """ cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) n = int(kwargs.get("n", 10)) t = int(kwargs.get("t", 30)) # 远离零轴的阈值,越大越远离零轴;30表示3倍 freq = c.freq.value k1, k2, k3 = f"{freq}_N{n}T{t}_DIF远离V230527".split("_") v1 = "其他" if len(c.bars_raw) < 30 + n * 8: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) bars = get_sub_elements(c.bars_raw, di=1, n=n * 8) max_abs_dif_bar = max(bars[-n:], key=lambda x: abs(x.cache[cache_key]["dif"])) max_abs_dif = max_abs_dif_bar.cache[cache_key]["dif"] if max_abs_dif > 0: macd_seq = [x.cache[cache_key]["macd"] for x in bars if x.cache[cache_key]["macd"] > 0] if len(macd_seq) > n and abs(max_abs_dif) > max(macd_seq) * t / 10: v1 = "多头远离" elif max_abs_dif < 0: macd_seq = [abs(x.cache[cache_key]["macd"]) for x in bars if x.cache[cache_key]["macd"] < 0] if len(macd_seq) > n and abs(max_abs_dif) > max(macd_seq) * t / 10: v1 = "空头远离" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def zdy_dif_V230528(c: CZSC, **kwargs) -> OrderedDict: """DIF远离零轴辅助判断买卖点 参数模板:"{freq}_N{n}T{t}_DIF远离V230528" **信号逻辑:** 以多头远离为例:回溯1000根K线,找到DIF的所有峰谷值,如最近一个是峰值,且大于所有峰值的 T% 分位数。 **信号列表:** - Signal('5分钟_N20T70_DIF远离V230528_空头远离_任意_任意_0') - Signal('5分钟_N20T70_DIF远离V230528_多头远离_任意_任意_0') :param c: CZSC对象 :return: 信号识别结果 """ cache_key = update_macd_cache(c, fastperiod=12, slowperiod=26, signalperiod=9) n = int(kwargs.get("n", 20)) # 峰谷值的数量最小值 t = int(kwargs.get("t", 80)) # 峰谷排序的分位数 freq = c.freq.value k1, k2, k3 = f"{freq}_N{n}T{t}_DIF远离V230528".split("_") v1 = "其他" def _find_peaks_valleys(data): """在给定的一维数据中找到所有的峰值和谷值。 :param data: 输入的一维数据,应为一个列表或者类似列表的数据结构 :return: 峰值和谷值的索引和对应的值 """ peaks = {} valleys = {} # 检查数据是否至少有5个元素 if len(data) < 5: return {"peaks": peaks, "valleys": valleys} # 遍历数据,找到所有的峰值和谷值 for i in range(2, len(data) - 2): # 如果当前元素比它的左右两侧的元素都大,则它是一个峰值 if data[i - 2] < data[i - 1] < data[i] > data[i + 1] > data[i + 2]: peaks[i] = data[i] # 如果当前元素比它的左右两侧的元素都小,则它是一个谷值 if data[i - 2] > data[i - 1] > data[i] < data[i + 1] < data[i + 2]: valleys[i] = data[i] return peaks, valleys dif_values = [x.cache[cache_key]["dif"] for x in c.bars_raw[-1000:]] peaks, valleys = _find_peaks_valleys(dif_values) if len(peaks) < n or len(valleys) < n: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) peaks_n = np.percentile(list(peaks.values()), t) valleys_n = np.percentile(list(valleys.values()), 100 - t) if max(peaks.keys()) > max(valleys.keys()) and peaks[max(peaks.keys())] > peaks_n and dif_values[-1] > 0: v1 = "多头远离" if max(valleys.keys()) > max(peaks.keys()) and valleys[max(valleys.keys())] < valleys_n and dif_values[-1] < 0: v1 = "空头远离" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def pressure_support_V240222(c: CZSC, **kwargs) -> OrderedDict: """支撑压力线辅助V240222 参数模板:"{freq}_D{di}W{w}高低点验证_支撑压力V240222" **信号逻辑:** 给定窗口内,当前价格与前高前低的关系,判断当前价格的压力和支撑。以高点验证压力位为例: 1. 当前高点与前高的差值在 x 个标准差以内 2. 当前高点与前高分别在窗口的两端 3. 中间的最低价与高点的差值在 y 个标准差以外 **信号列表:** - Signal('60分钟_D1W20高低点验证_支撑压力V240222_支撑位_任意_任意_0') - Signal('60分钟_D1W20高低点验证_支撑压力V240222_压力位_任意_任意_0') :param c: CZSC对象 :param kwargs: 无 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) w = int(kwargs.get("w", 20)) assert w > 10, "参数 w 必须大于10" freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}W{w}高低点验证_支撑压力V240222".split("_") v1 = "其他" if len(c.bars_raw) < w + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) bars = get_sub_elements(c.bars_raw, di=di, n=w) max_high = max([x.high for x in bars]) min_low = min([x.low for x in bars]) n = int(len(bars) * 0.2) left_bars = bars[:n] right_bars = bars[-n:] gap = np.std([abs(x.high - x.low) for x in bars]) if max_high - min_low < gap * 0.3 * w: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) left_high = max([x.high for x in left_bars]) right_high = max([x.high for x in right_bars]) if max_high == max(left_high, right_high) and max_high - min(left_high, right_high) < gap: v1 = "压力位" left_low = min([x.low for x in left_bars]) right_low = min([x.low for x in right_bars]) if min_low == min(left_low, right_low) and max(left_low, right_low) - min_low < gap: v1 = "支撑位" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def pressure_support_V240402(c: CZSC, **kwargs) -> OrderedDict: """支撑压力线辅助V240402 参数模板:"{freq}_D{di}W{w}_支撑压力V240402" **信号逻辑:** 对于给定K线,判断是否存在支撑压力线,判断逻辑如下: 1. 当前收盘价落在5个以上的分型高低点区间内; 2. 当前收盘价在最近20根K线的最高价附近,认为是压力位;反之,认为是支撑位 **信号列表:** - Signal('60分钟_D1W60_支撑压力V240402_压力位_任意_任意_0') - Signal('60分钟_D1W60_支撑压力V240402_支撑位_任意_任意_0') :param c: CZSC对象 :param kwargs: 无 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) w = int(kwargs.get("w", 60)) assert w > 10, "参数 w 必须大于10" freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}W{w}_支撑压力V240402".split("_") v1 = "其他" if len(c.bars_raw) < w + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) fxs = c.fx_list[-50:] bars = get_sub_elements(c.bars_raw, di=di, n=w) gap = np.std([abs(x.high - x.low) for x in bars]) max_high = max([x.high for x in bars]) min_low = min([x.low for x in bars]) # 当前收盘价落在5个以上的分型高低点 near_fx = [fx for fx in fxs if fx.low <= bars[-1].close <= fx.high] if len(near_fx) < 5 or max_high - min_low < gap * 3: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) hl_gap = max_high - min_low if bars[-1].close > max_high - hl_gap * 0.2: v1 = "压力位" if bars[-1].close < min_low + hl_gap * 0.3: v1 = "支撑位" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def pressure_support_V240406(c: CZSC, **kwargs) -> OrderedDict: """支撑压力线辅助V240406 参数模板:"{freq}_D{di}W{w}_支撑压力V240406" **信号逻辑:** 对于给定K线,判断是否存在支撑压力线,判断逻辑如下: 1. 当前收盘价在最近20根K线的最高价附近,认为是压力位;反之,认为是支撑位 2. 辅助判断:窗口内最高价附近有3个以上的分型 **信号列表:** - Signal('60分钟_D1W60_支撑压力V240406_压力位_任意_任意_0') - Signal('60分钟_D1W60_支撑压力V240406_支撑位_任意_任意_0') :param c: CZSC对象 :param kwargs: 无 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) w = int(kwargs.get("w", 60)) assert w > 10, "参数 w 必须大于10" freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}W{w}_支撑压力V240406".split("_") v1 = "其他" if len(c.bars_raw) < w + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) fxs = c.fx_list[-50:] bars = get_sub_elements(c.bars_raw, di=di, n=w) gap = np.std([abs(x.high - x.low) for x in bars]) max_high = max([x.high for x in bars]) min_low = min([x.low for x in bars]) if max_high - min_low < gap * 3: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 当前收盘价 +- 0.5倍波动率范围内有5个以上的分型高低点 near_high_fx = [fx for fx in fxs if fx.low < max_high < fx.high] near_low_fx = [fx for fx in fxs if fx.low < min_low < fx.high] hl_gap = max_high - min_low if len(near_high_fx) >= 3 and bars[-1].close > max_high - hl_gap * 0.2: v1 = "压力位" if len(near_low_fx) >= 3 and bars[-1].close < min_low + hl_gap * 0.3: v1 = "支撑位" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def pressure_support_V240530(c: CZSC, **kwargs) -> OrderedDict: """支撑压力线辅助V240530 参数模板:"{freq}_D{di}W{w}N{n}_支撑压力V240530" **信号逻辑:** 对于给定K线,判断是否存在支撑压力线,判断逻辑如下: 1. 寻找关键K线的高低点,关键K线为最近w根K线中与其他K线重叠次数最多的K线 2. 当前K线收盘价在关键K线最高价的正负5个价位左右,认为是压力位;反之,认为是支撑位 **信号列表:** - Signal('60分钟_D1W20N5_支撑压力V240530_支撑位_任意_任意_0') - Signal('60分钟_D1W20N5_支撑压力V240530_压力位_任意_任意_0') :param c: CZSC对象 :param kwargs: 无 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) w = int(kwargs.get("w", 20)) n = int(kwargs.get("n", 5)) assert w > 10, "参数 w 必须大于10" freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}W{w}N{n}_支撑压力V240530".split("_") v1 = "其他" if len(c.bars_raw) < w + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) bars = get_sub_elements(c.bars_raw, di=di, n=w) # 获取关键K线的高低点 bar_overlap_count = {} for i in range(len(bars)): bar = bars[i] overlap_count = 0 for j in range(len(bars)): if i == j: continue bar2 = bars[j] # 判断两根K线是否重叠 if max(bar.low, bar2.low) < min(bar.high, bar2.high): overlap_count += 1 bar_overlap_count[i] = overlap_count # 如果最大重叠次数小于总数的一半,认为没有关键K线 if max(bar_overlap_count.values()) < 0.5 * w: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) key_bar = bars[max(bar_overlap_count, key=bar_overlap_count.get)] # 获取窗口内的 unique price 列表 prices = [y for x in c.bars_raw for y in [x.open, x.close, x.high, x.low]] prices = sorted(list(set(prices))) high_idx = prices.index(key_bar.high) low_idx = prices.index(key_bar.low) # 处理边界情况 if high_idx < n or low_idx < n: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) if high_idx + n >= len(prices) or low_idx + n >= len(prices): return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 判断压力位:当前收盘价在关键K线最高价的正负5个价位左右 pressure_h = prices[high_idx + n] pressure_l = prices[high_idx - n] if pressure_h > bars[-1].close > pressure_l: v1 = "压力位" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) # 判断支撑位:当前收盘价在关键K线最低价的正负5个价位左右 support_h = prices[low_idx + n] support_l = prices[low_idx - n] if support_h > bars[-1].close > support_l: v1 = "支撑位" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)