Source code for czsc.signals.ang

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2023/5/11 18:11
describe: 琅盎的信号函数
"""
from loguru import logger
try:
    import talib as ta
except:
    logger.warning("ta-lib 没有正确安装,相关信号函数无法正常执行。"
                   "请参考安装教程 https://blog.csdn.net/qaz2134560/article/details/98484091")
import numpy as np
from typing import List
from collections import OrderedDict
from czsc.analyze import CZSC, RawBar
from czsc.utils.sig import get_sub_elements, create_single_signal


[docs]def adtm_up_dw_line_V230603(c: CZSC, **kwargs) -> OrderedDict: """ADTM能量异动,贡献者:琅盎 参数模板:"{freq}_D{di}N{n}M{m}TH{th}_ADTMV230603" **信号逻辑:** 1. 如果今天的开盘价大于昨天的开盘价,取最高价 - 开盘价、开盘价 - 昨天的开盘价这二者中最大值, 再将取出的最大值求和;反之取0,形成up_sum 2. 如果今天的开盘价小于昨天的开盘价,取开盘价 - 最低价、昨天的开盘价 -开盘价这二者中最大值, 再将取出的最大值求和;么之取0,形成dw_sum 3. 当 up_sum > dw_sum 或 最大值的差值之商小于TH 看多,反之看空 **信号列表:** - Signal('日线_D1N30M20TH5_ADTMV230603_看空_任意_任意_0') - Signal('日线_D1N30M20TH5_ADTMV230603_看多_任意_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 - :param di: 信号计算截止倒数第i根K线 - :param n: 获取K线的根数,默认为30 - :param m: 获取K线的根数,默认为20 - :param th: adtm阈值,默认为5,代表 5 / 10 = 0.5 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) n = int(kwargs.get("n", 30)) m = int(kwargs.get("m", 20)) th = int(kwargs.get("th", 5)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}N{n}M{m}TH{th}_ADTMV230603".split('_') v1 = "其他" if len(c.bars_raw) < di + max(n, m) + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) n_bars = get_sub_elements(c.bars_raw, di=di, n=n) m_bars = get_sub_elements(c.bars_raw, di=di, n=m) up_sum = np.sum([max(n_bars[i].high - n_bars[i].open, n_bars[i].open - n_bars[i - 1].open) for i in range(1, len(n_bars)) if n_bars[i].open > n_bars[i - 1].open]) dw_sum = np.sum([max(m_bars[i].open - m_bars[i].low, m_bars[i - 1].open - m_bars[i].open) for i in range(1, len(m_bars)) if m_bars[i].open < m_bars[i - 1].open]) adtm = (up_sum - dw_sum) / max(up_sum, dw_sum) if up_sum > dw_sum or adtm > th / 10: v1 = "看多" if up_sum < dw_sum or adtm < th / 10: v1 = "看空" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def amv_up_dw_line_V230603(c: CZSC, **kwargs) -> OrderedDict: """AMV能量异动,贡献者:琅盎 参数模板:"{freq}_D{di}N{n}M{m}_AMV能量V230603" **信号逻辑:** 用成交量作为权重对开盘价和收盘价的均值进行加权移动平均。成交量越大的价格对移动平均结果的影响越大, AMV 指标减小了成交量小的价格波动的影响。当短期 AMV 线上穿/下穿长期 AMV线时,产生买入/卖出信号。 **信号列表:** - Signal('日线_D1N30M120_AMV能量V230603_看多_任意_任意_0') - Signal('日线_D1N30M120_AMV能量V230603_看空_任意_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 - :param di: 信号计算截止倒数第i根K线 - :param n: 获取K线的根数,默认为30 - :param m: 获取K线的根数,默认为20 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) n = int(kwargs.get("n", 30)) m = int(kwargs.get("m", 120)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}N{n}M{m}_AMV能量V230603".split('_') if n > m or len(c.bars_raw) < di + m + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1="其他") n_bars = get_sub_elements(c.bars_raw, di=di, n=n) m_bars = get_sub_elements(c.bars_raw, di=di, n=m) amov1 = np.sum([(n_bars[i].amount * (n_bars[i].open + n_bars[i].close) / 2) for i in range(len(n_bars))]) amov2 = np.sum([(m_bars[i].amount * (m_bars[i].open + m_bars[i].close) / 2) for i in range(len(m_bars))]) vol_sum1 = np.sum([n_bars[i].amount for i in range(len(n_bars))]) vol_sum2 = np.sum([m_bars[i].amount for i in range(len(m_bars))]) amv1 = amov1 / vol_sum1 amv2 = amov2 / vol_sum2 v1 = "看多" if amv1 > amv2 else "看空" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def asi_up_dw_line_V230603(c: CZSC, **kwargs) -> OrderedDict: """ASI多空分类,贡献者:琅盎 参数模板:"{freq}_D{di}N{n}P{p}_ASI多空V230603" **信号逻辑:** 由于 SI 的波动性比较大,所以我们一般对 SI 累计求和得到 ASI 并捕 捉 ASI 的变化趋势。一般我们不会直接看 ASI 的数值(对 SI 累计求 和的求和起点不同会导致求出 ASI 的值不同),而是会观察 ASI 的变 化方向。我们利用 ASI 与其均线的交叉来产生交易信号,上穿/下穿均 线时买入/卖出 **信号列表:** - Signal('日线_D1N30P120_ASI多空V230603_看多_任意_任意_0') - Signal('日线_D1N30P120_ASI多空V230603_看空_任意_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 - :param di: 信号计算截止倒数第i根K线 - :param n: 获取K线的根数,默认为30 - :param p: 获取K线的根数,默认为20 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) n = int(kwargs.get("n", 30)) p = int(kwargs.get("p", 120)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}N{n}P{p}_ASI多空V230603".split('_') v1 = "其他" if len(c.bars_raw) < di + p + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) _bars = get_sub_elements(c.bars_raw, di=di, n=p) close_prices = np.array([bar.close for bar in _bars]) open_prices = np.array([bar.open for bar in _bars]) high_prices = np.array([bar.high for bar in _bars]) low_prices = np.array([bar.low for bar in _bars]) o = np.concatenate([[close_prices[0]], close_prices[:-1]]) a = np.abs(high_prices - o) b = np.abs(low_prices - o) c = np.abs(high_prices - np.concatenate([[low_prices[0]], low_prices[:-1]])) # type: ignore d = np.abs(o - np.concatenate([[open_prices[0]], open_prices[:-1]])) k = np.maximum(a, b) m = np.maximum(high_prices - low_prices, n) r1 = a + 0.5 * b + 0.25 * d r2 = b + 0.5 * a + 0.25 * d r3 = c + 0.25 * d r4 = np.where((a >= b) & (a >= c), r1, r2) r = np.where((c >= a) & (c >= b), r3, r4) if (r * k / m != 0).all(): si = 50 * (close_prices - c + (c - open_prices) + 0.5 * (close_prices - open_prices)) / (r * k / m) else: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) asi = np.cumsum(si) v1 = "看多" if asi[-1] > np.mean(asi[-p:]) else "看空" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def clv_up_dw_line_V230605(c: CZSC, **kwargs) -> OrderedDict: """CLV多空分类,贡献者:琅盎 参数模板:"{freq}_D{di}N{n}_CLV多空V230605" **信号逻辑:** CLV 用来衡量收盘价在最低价和最高价之间的位置。 当CLOSE=HIGH 时,CLV=1; 当 CLOSE=LOW 时,CLV=-1;当 CLOSE位于 HIGH 和 LOW 的中点时, CLV=0。CLV>0(<0),说明收盘价离最高(低)价更近。我们用 CLVMA 上穿/下穿 0 来产生买入/卖出信号 **信号列表:** - Signal('日线_D1N70_CLV多空V230605_看多_任意_任意_0') - Signal('日线_D1N70_CLV多空V230605_看空_任意_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 - :param di: 信号计算截止倒数第i根K线 - :param n: 获取K线的根数,默认为60 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) n = int(kwargs.get("n", 70)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}N{n}_CLV多空V230605".split('_') if len(c.bars_raw) < di + 100: return create_single_signal(k1=k1, k2=k2, k3=k3, v1="其他") _bars = get_sub_elements(c.bars_raw, di=di, n=n) close = np.array([bar.close for bar in _bars]) low = np.array([bar.low for bar in _bars]) high = np.array([bar.high for bar in _bars]) clv_ma = np.mean((2 * close - low - high) / (high - low)) v1 = "看多" if clv_ma > 0 else "看空" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
[docs]def cmo_up_dw_line_V230605(c: CZSC, **kwargs) -> OrderedDict: """CMO能量异动,贡献者:琅盎 参数模板:"{freq}_D{di}N{n}M{m}_CMO能量V230605" 信号逻辑:** CMO指标用过去N天的价格上涨量和价格下跌量得到,CMO>(<)0 表示当前处于上涨(下跌)趋势,CMO 越 大(小)则当前上涨(下跌)趋势越强。我们用 CMO 上穿 30/下穿-30来产生买入/卖出信号。 信号列表: - Signal('30分钟_D1N70M30_CMO能量V230605_看空_任意_任意_0') - Signal('30分钟_D1N70M30_CMO能量V230605_看多_任意_任意_0') :param c: CZSC对象 :param kwargs: 参数字典 - :param di: 信号计算截止倒数第i根K线 - :param n: 获取K线的根数,默认为60 - :param m: 信号预警轴,默认为30 :return: 信号识别结果 """ di = int(kwargs.get("di", 1)) n = int(kwargs.get("n", 70)) m = int(kwargs.get("m", 30)) freq = c.freq.value k1, k2, k3 = f"{freq}_D{di}N{n}M{m}_CMO能量V230605".split('_') v1 = "其他" if len(c.bars_raw) < di + n + 10: return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) _bars = get_sub_elements(c.bars_raw, di=di, n=n) up_sum = np.sum([_bars[i].close - _bars[i - 1].close for i in range(1, len(_bars)) if (_bars[i].close - _bars[i - 1].close) > 0]) dw_sum = np.sum([_bars[i - 1].close - _bars[i].close for i in range(1, len(_bars)) if (_bars[i - 1].close - _bars[i].close) > 0]) cmo = (up_sum - dw_sum) / (up_sum + dw_sum) * 100 if cmo > m: v1 = "看多" if cmo < -m: v1 = "看空" return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)