上文「部署基于 Webhook 的 Telegram Bot」实现了一个最为简单的命令和消息响应的 Bot。事实上,Telegram Bot 一个很实用的功能是作为实时消息推送源。原生 APP 通常在消息推送上存在着较大的延迟,对于一些实时性要求较高的消息推送往往很难起到实际的作用。

一种解决方案是使用基于微信公众号的消息推送机制,但不仅限制较多,可定制功能也相对比较少。相比之下,Telegram Bot API 则要强大很多(每秒 30 条的常规消息和每分钟 20 条群组消息的上限已经可以满足很多基本的需求)。本文将阐述如何利用 Telegram Bot 来进行实时消息推送,如行情价格的预警、自动化交易信息等,以及交互式的查询和设置功能。

服务器环境:香港 VPS;CentOS 7.4;Nginx v1.14.0;python-telegram-bot v10.1.0。

交互会话模块

  • 会话模块主要基于 telegram.ext 子模块中的会话句柄 ConversationHandler 类,其中需要定义以下几个参数:1)会话的入口点 entry_points,如接收 /start 命令来开启一个会话;2)会话的不同状态 states,与各自对应的回调函数;3)会话的回退函数 fallback,用于处理一些非预期的消息。

为了在一个会话中的不同状态之间进行切换,其回调函数必须返回一个值用来指向下一个状态。如果值为空,则不改变当前状态;如果值为 END-1 则表示结束当前会话。

  • 市场深度(Depth of Market,DOM)为例,我们定义了入口点命令为 /depth,其回调函数为 depth_start。它会列出所有可用的交易所供用户选择,这里通过动态改变键盘的方式来实现。然后会话进入「选择交易所」状态。
def depth_start(bot, update):
    reply = 'Select an *exchange* to start.\n'
    reply += '`{}`\nMore: /done'.format(SEP)
    keyboard = [EX_LIST[i:i + COLS] for i in range(0, len(EX_LIST), COLS)]
    markup = ReplyKeyboardMarkup(keyboard, one_time_keyboard=True)
    update.message.reply_text(reply, reply_markup=markup, quote=True,
                              parse_mode=ParseMode.MARKDOWN)
    return EXCHANGE

byBANu0EAljc7ZbN2.png

  • 在「选择交易所」状态下,Bot 接受用户通过键盘输入的交易所名称,其回调函数 depth_exchange 将列出该交易所下所有可用的交易代码(同样以改变键盘的方式),并进入「选择交易代码」状态。
def depth_exchange(bot, update, user_data):
    global g_exchanges
    user = update.message.from_user
    exchange = g_exchanges[update.message.text.lower()]
    user_data['exchange'] = exchange
    symbols = exchange.mypairs
    keyboard = [symbols[i:i + COLS] for i in range(0, len(symbols), COLS)]
    reply = 'Exchange: `{}`\n`{}`\n'.format(exchange.exid, SEP)
    reply += 'Select a *symbol* to retrieve.\n'
    reply += '`{}`\nMore: /back, /done'.format(SEP)
    markup = ReplyKeyboardMarkup(keyboard)
    update.message.reply_text(reply, reply_markup=markup, quote=True,
                              parse_mode=ParseMode.MARKDOWN)
    return SYMBOL

PyFWnbxNfkdzIr5U.png

由于这里我们需要在整个会话过程中维护一个用户选择的交易所和交易代码的信息,因此可以通过 pass_user_data 参数来传递用户数据,它指明了是否把维护的用户数据传递给回调函数。

  • 对于用户选择的交易代码,Bot 后端会返回当前市场深度中一些可能的支撑位和阻力位,并推送给用户。此时,用户可以停留在该状态下继续选择其他交易代码对,也可以通过 /back 命令返回上一个状态,即「选择交易所」状态。
def depth_fetch(bot, update, user_data):
    global g_exchanges
    user = update.message.from_user
    exchange = user_data['exchange']
    symbol = update.message.text
    # result = ... 统计市场深度信息
    reply = 'Exchange: `{}`\n'.format(exchange.exid)
    reply += 'Symbol: `{}`\n`{}`\n'.format(symbol, SEP)
    reply += result
    reply += '`{}`\nMore: /back, /done'.format(SEP)
    update.message.reply_text(reply, quote=True,
                              parse_mode=ParseMode.MARKDOWN)
    return SYMBOL

RYR8Av3KCn24T6aF.png

  • 最后,利用这些回调函数来实例化 ConversationHandler 类,即可实现了一个完整的查询市场深度信息的会话功能。
EXCHANGE, SYMBOL = range(2)
depth_conv = ConversationHandler(
    entry_points=[CommandHandler('depth', depth_start)],
    states={
        EXCHANGE: [MessageHandler(Filters.text, depth_exchange,
                                  pass_user_data=True)],
        SYMBOL: [MessageHandler(Filters.text, depth_fetch,
                                pass_user_data=True),
                 CommandHandler('back', depth_start)]
    },
    fallbacks=[CommandHandler('done', done)],
    # conversation_timeout=300
)

消息推送模块

  • 基于上一节同样的原理,我们创建了另一个设置消息预警的会话,其入口点命令为 /alarm。在该会话中,用户可以:1)查看所有开启和关闭的通知;2)为某个交易代码开启通知,当行情价格达到设置的价格区间后,会向用户推送预警信息;3)关闭某个交易代码的通知。效果图如下:

5Phl60lGDllKrJ2N.png

  • 为了实现实时预警功能,Bot 需要定时(比如每分钟)查询行情数据,然后根据用户的提醒设置,决定是否进行推送。因此,这里需要使用到 telegram.ext 子模块的另一个重要的类:JobQueue 类,它可以用来实现延迟或定期执行某项操作的功能。

类似 Dispatcher 类,实例化 Updater 类时同样也自动实例化了 JobQueue 类,并作为 Updater 实例化对象的 job_queue 属性。

  • 利用 run_repeating 方法,可以按指定的时间间隔周期性地向用户推送消息。这里我们每分钟检测账户余额和行情数据。当余额有较大变动,或者行情价格触发预警时,便进行消息推送。
updater = Updater(TOKEN)
jq = updater.job_queue

jq.run_repeating(send_balance, interval=60, first=0)
jq.run_repeating(send_alarm, interval=60, first=0)
def send_balance(bot, job):
    # when portfolio varies a lot
    msg = 'Exchange: `{}`\n'.format(exchange.exid)
    msg += '`{}` `{:.4f}`\n'.format(trend, portfolio)
    bot.send_message(
        chat_id=CHATID, text=msg, parse_mode=ParseMode.MARKDOWN)

另外两个常用的方法是:run_once()run_daily,分别对应不同的任务频率。

  • 至此,就构建了一个完整的包含预警消息推送和信息交互查询和设置的 Telegram Bot,可以满足个人或者小范围的实时性需求。其他主题,比如处理不同的消息类型,以及性能优化等,将不再赘述。

VZxMDtlq9utjWjD7.png

发表新评论

沪ICP备17018959号-3