gitbook/Python核心技术与实战/docs/111814.md
2022-09-03 22:05:03 +08:00

13 KiB
Raw Permalink Blame History

37 | Kafka & ZMQ自动化交易流水线

你好,我是景霄。

在进行这节课的学习前,我们先来回顾一下,前面三节课,我们学了些什么。

第 34 讲,我们介绍了如何通过 RESTful API 在交易所下单;第 35 讲,我们讲解了如何通过 Websocket ,来获取交易所的 orderbook 数据;第 36 讲,我们介绍了如何实现一个策略,以及如何对策略进行历史回测。

事实上,到这里,一个简单的、可以运作的量化交易系统已经成型了。你可以对策略进行反复修改,期待能得到不错的 PnL。但是对于一个完善的量化交易系统来说只有基本骨架还是不够的。

在大型量化交易公司,系统一般是分布式运行的,各个模块独立在不同的机器上,然后互相连接来实现。即使是个人的交易系统,在进行诸如高频套利等算法时,也需要将执行层布置在靠近交易所的机器节点上。

所以,从今天这节课开始,我们继续回到 Python 的技术栈,从量化交易系统这个角度切入,为你讲解如何实现分布式系统之间的复杂协作。

中间件

我们先来介绍一下中间件这个概念。中间件,是将技术底层工具和应用层进行连接的组件。它要实现的效果则是,让我们这些需要利用服务的工程师,不必去关心底层的具体实现。我们只需要拿着中间件的接口来用就好了。

这个概念听起来并不难理解,我们再举个例子让你彻底明白。比如拿数据库来说,底层数据库有很多很多种,从关系型数据库 MySQL 到非关系型数据库 NoSQL从分布式数据库 Spanner 到内存数据库 Redis不同的数据库有不同的使用场景也有着不同的优缺点更有着不同的调用方式。那么中间件起什么作用呢

中间件,等于在这些不同的数据库上加了一层逻辑,这一层逻辑专门用来和数据库打交道,而对外只需要暴露同一个接口即可。这样一来,上层的程序员调用中间件接口时,只需要让中间件指定好数据库即可,其他参数完全一致,极大地方便了上层的开发;同时,下层技术栈在更新换代的时候,也可以做到和上层完全分离,不影响程序员的使用。

它们之间的逻辑关系,你可以参照下面我画的这张图。我习惯性把中间件的作用调侃为:没有什么事情是加一层解决不了的;如果有,那就加两层。

当然,这只是其中一个例子,也只是中间件的一种形式。事实上,比如在阿里,中间件主要有分布式关系型数据库 DRDS、消息队列和分布式服务这么三种形式。而我们今天主要会用到消息队列因为它非常符合量化交易系统的应用场景即事件驱动模型。

消息队列

那么,什么是消息队列呢?一如其名,消息,即互联网信息传递的个体;而队列,学过算法和数据结构的你,应该很清楚这个 FIFO先进先出的数据结构吧。如果算法基础不太牢建议你可以学习极客时间平台上王争老师的“数据结构与算法之美”专栏第 09讲即为队列知识)

简而言之,消息队列就是一个临时存放消息的容器,有人向消息队列中推送消息;有人则监听消息队列,发现新消息就会取走。根据我们刚刚对中间件的解释,清晰可见,消息队列也是一种中间件。

目前,市面上使用较多的消息队列有 RabbitMQ、Kafka、RocketMQ、ZMQ 等。不过今天,我只介绍最常用的 ZMQ 和 Kafka。

我们先来想想,消息队列作为中间件有什么特点呢?

首先是严格的时序性。刚刚说了,队列是一种先进先出的数据结构,你丢给它 1, 2, 3,然后另一个人从里面取数据,那么取出来的一定也是 1, 2, 3,严格保证了先进去的数据先出去,后进去的数据后出去。显然,这也是消息机制中必须要保证的一点,不然颠三倒四的结果一定不是我们想要的。

说到队列的特点,简单提一句,与“先进先出“相对的是栈这种数据结构,它是先进后出的,你丢给它 1, 2, 3,再从里面取出来的时候,拿到的就是3, 2, 1了,这一点一定要区分清楚。

其次,是分布式网络系统的老生常谈问题。如何保证消息不丢失?如何保证消息不重复?这一切,消息队列在设计的时候都已经考虑好了,你只需要拿来用就可以,不必过多深究。

不过,很重要的一点,消息队列是如何降低系统复杂度,起到中间件的解耦作用呢?我们来看下面这张图。

消息队列的模式是发布和订阅,一个或多个消息发布者可以发布消息,一个或多个消息接受者可以订阅消息。 从图中你可以看到,消息发布者和消息接受者之间没有直接耦合,其中,

  • 消息发布者将消息发送到分布式消息队列后,就结束了对消息的处理;
  • 消息接受者从分布式消息队列获取该消息后,即可进行后续处理,并不需要探寻这个消息从何而来。

至于新增业务的问题,只要你对这类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,所以也就实现了业务的可扩展性设计。

讲了这么多概念层的东西,想必你迫不及待地想看具体代码了吧。接下来,我们来看一下 ZMQ 的实现。

ZMQ

先来看 ZMQ这是一个非常轻量级的消息队列实现。

作者 Pieter Hintjens 是一位大牛他本人的经历也很传奇2010年诊断出胆管癌并成功做了手术切除。但2016年4月却发现癌症大面积扩散到了肺部已经无法治疗。他写的最后一篇通信模式是关于死亡协议的之后在比利时选择接受安乐死。

ZMQ 是一个简单好用的传输层,它有三种使用模式:

  • Request - Reply 模式;
  • Publish - Subscribe 模式;
  • Parallel Pipeline 模式。

第一种模式很简单client 发消息给 serverserver 处理后返回给 client完成一次交互。这个场景你一定很熟悉吧没错和 HTTP 模式非常像,所以这里我就不重点介绍了。至于第三种模式,与今天内容无关,这里我也不做深入讲解。

我们需要详细来看的是第二种即“PubSub”模式。下面是它的具体实现代码很清晰你应该很容易理解

# 订阅者 1
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:6666')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    print('client 1')
    while True:
        msg = socket.recv()
        print("msg: %s" % msg)


if __name__ == '__main__':
    run()

########## 输出 ##########

client 1
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'

# 订阅者 2
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:6666')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    print('client 2')
    while True:
        msg = socket.recv()
        print("msg: %s" % msg)


if __name__ == '__main__':
    run()

########## 输出 ##########

client 2
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'

# 发布者
import time
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind('tcp://*:6666')

    cnt = 1

    while True:
        time.sleep(1)
        socket.send_string('server cnt {}'.format(cnt))
        print('send {}'.format(cnt))
        cnt += 1


if __name__ == '__main__':
    run()

########## 输出 ##########

send 1
send 2
send 3
send 4
send 5

这里要注意的一点是,如果你想要运行代码,请先运行两个订阅者,然后再打开发布者。

接下来,我来简单讲解一下。

对于订阅者,我们要做的是创建一个 zmq Context连接 socket 到指定端口。其中setsockopt_string() 函数用来过滤特定的消息,而下面这行代码:

socket.setsockopt_string(zmq.SUBSCRIBE, '') 

则表示不过滤任何消息。最后,我们调用 socket.recv() 来接受消息就行了,这条语句会阻塞在这里,直到有新消息来临。

对于发布者,我们同样要创建一个 zmq Context绑定到指定端口不过请注意这里用的是 bind 而不是 connect。因为在任何情况下同一个地址端口 bind 只能有一个,但却可以有很多个 connect 链接到这个地方。初始化完成后,再调用 socket.send_string ,即可将我们想要发送的内容发送给 ZMQ。

当然,这里还有几个需要注意的地方。首先,有了 send_string我们其实已经可以通过 JSON 序列化,来传递几乎我们想要的所有数据结构,这里的数据流结构就已经很清楚了。

另外,把发布者的 time.sleep(1) 放在 while 循环的最后,严格来说应该是不影响结果的。这里你可以尝试做个实验,看看会发生什么。

你还可以思考下另一个问题,如果这里是多个发布者,那么 ZMQ 应该怎么做呢?

Kafka

接着我们再来看一下 Kafka。

通过代码实现你也可以发现ZMQ 的优点主要在轻量、开源和方便易用上,但在工业级别的应用中,大部分人还是会转向 Kafka 这样的有充足支持的轮子上。

相比而言Kafka 提供了点对点网络和发布订阅模型的支持,这也是用途最广泛的两种消息队列模型。而且和 ZMQ 一样Kafka 也是完全开源的,因此你也能得到开源社区的充分支持。

Kafka的代码实现和ZMQ大同小异这里我就不专门讲解了。关于Kafka的更多内容极客时间平台也有对 Kafka 的专门详细的介绍,对此有兴趣的同学,可以在极客时间中搜索“Kafka核心技术与实战”,这个专栏里,胡夕老师用详实的篇幅,讲解了 Kafka 的实战和内核,你可以加以学习和使用。

来自极客时间专栏“Kafka核心技术与实战”

基于消息队列的 Orderbook 数据流

最后回到我们的量化交易系统上。

量化交易系统中,获取 orderbook 一般有两种用途:策略端获取实时数据,用来做决策;备份在文件或者数据库中,方便让策略和回测系统将来使用。

如果我们直接单机监听交易所的消息,风险将会变得很大,这在分布式系统中叫做 Single Point Failure。一旦这台机器出了故障或者网络连接突然中断我们的交易系统将立刻暴露于风险中。

于是,一个很自然的想法就是,我们可以在不同地区放置不同的机器,使用不同的网络同时连接到交易所,然后将这些机器收集到的信息汇总、去重,最后生成我们需要的准确数据。相应的拓扑图如下:

当然,这种做法也有很明显的缺点:因为要同时等待多个数据服务器的数据,再加上消息队列的潜在处理延迟和网络延迟,对策略服务器而言,可能要增加几十到数百毫秒的延迟。如果是一些高频或者滑点要求比较高的策略,这种做法需要谨慎考虑。

但是对于低频策略、波段策略这种延迟换来的整个系统的稳定性和架构的解耦性还是非常值得的。不过你仍然需要注意这种情况下消息队列服务器有可能成为瓶颈也就是刚刚所说的Single Point Failure一旦此处断开依然会将系统置于风险之中。

事实上我们可以使用一些很成熟的系统例如阿里的消息队列AWS 的 Simple Queue Service 等等,使用这些非常成熟的消息队列系统,风险也将会最小化。

总结

这节课,我们分析了现代化软件工程领域中的中间件系统,以及其中的主要应用——消息队列。我们讲解了最基础的消息队列的模式,包括点对点模型、发布者订阅者模型,和一些其他消息队列自己支持的模型。

在真实的项目设计中,我们要根据自己的产品需求,来选择使用不同的模型;同时也要在编程实践中,加深对不同技能点的了解,对系统复杂性进行解耦,这才是设计出高质量系统的必经之路。

思考题

今天的思考题文中我也提到过这里再专门列出强调一下。在ZMQ 那里,我提出了两个问题:

  • 如果你试着把发布者的 time.sleep(1) 放在 while 循环的最后,会发生什么?为什么?
  • 如果有多个发布者ZMQ 应该怎么做呢?

欢迎留言写下你的思考和疑惑,也欢迎你把这篇文章分享给更多的人一起学习。