使用ray扩展python应用之流式处理应用

article/2025/6/27 9:10:20

流式处理就是数据一来,咱们就得赶紧处理,不能攒批再算。这里的实时不是指瞬间完成,而是要在数据产生的那一刻,或者非常接近那个时间点,就做出响应。这种处理方式,我们称之为流式处理。

流式处理的应用场景

流式处理到底能干啥?它应用场景非常广泛。

  • 日志分析。应用每天产生海量日志,边生产边分析,一旦发现异常,比如某个服务崩溃了,或者有安全事件发生,立刻就能报警,快速定位问题根源,大大缩短故障恢复时间。

  • 金融交易,流式处理就能实时监控每一笔交易,结合用户行为模式、地理位置、交易金额等多维度信息,通过规则引擎或者机器学习模型,秒级识别出异常交易。

  • 网络安全。实时监控网络流量、系统日志、用户登录行为等等。通过建立正常的安全基线,任何偏离这个基线的异常活动,比如大量未授权访问尝试、异常的数据包传输,都能被流式系统迅速捕捉到。

  • 物流行业。GPS信号、传感器数据源源不断地传入系统,通过流式处理,可以实时计算最优路径,避开拥堵路段,动态调整配送计划。这不仅提高了效率,还能降低油耗和运营成本。

  • 物联网IoT。无数的传感器设备,比如工厂里的机器、城市里的路灯、农田里的土壤湿度监测器,它们都在不停地产生数据。

  • 推荐引擎。每一次点击、浏览、搜索,都被实时记录下来,形成你的行为数据流。推荐系统实时分析这些数据,结合协同过滤、深度学习等算法,不断更新你的兴趣画像,然后给你推送最相关的商品或内容。

Ray如何实现流式处理

了解了流式应用的重要性,我们来看看如何在 Ray 中实现它们。目前主要有两种方式:

  1. 利用 Ray 提供的强大底层组件,比如 Actors、Task 并行、共享内存等,自己动手构建一套定制化的流式处理框架。这种方式灵活性高,但开发量也相对较大。

  2. 将 Ray 与现有的成熟流式引擎集成,比如 Apache Flink,通常会借助 Kafka 这样的消息中间件来连接数据源和处理逻辑。

Ray 的定位不是要做一个独立的、功能全面的流式系统,而是提供一个强大的计算平台,让开发者可以更方便地构建自己的流式应用。既然提到了集成,那为什么 Kafka 成为了流式应用中最受欢迎的消息中间件之一呢?Kafka 能够以惊人的吞吐量处理海量数据流,同时保证数据的持久化存储,这意味着你可以随时回溯历史数据进行分析。而且,Kafka 的水平扩展性非常好,可以通过增加 Broker 节点轻松应对数据量的增长。更重要的是,围绕 Kafka 已经形成了一个非常成熟的生态系统,各种工具和库层出不穷。

kafka和ray集成

这里只关注那些kafka与 Ray 集成时最相关的特性。很多人把 Kafka 当作消息队列,比如 RabbitMQ,但其实它本质上是一个分布式日志系统

在这里插入图片描述

它不像传统的队列那样,消息发出去就没了,Kafka 把每一条消息都当作一个记录,按顺序追加写入到日志文件中。每条记录可以包含 Key 和 Value,当然两者都是可选的。生产者总是往日志的末尾写入新消息。而消费者呢,它可以选择从哪个位置开始读取,这个位置叫做 Offset。这意味着,消费者可以读取任意历史消息,也可以只读最新的消息。

这种基于日志的设计,带来了几个关键区别。

  • 消息的生命周期。传统队列里的消息,一旦被消费者成功消费,通常就从队列里删除了,是临时的。而 Kafka 的消息是持久化的,会一直保存在磁盘上,直到达到配置的保留策略。这使得 Kafka 支持消息回溯。

  • 消费者管理。在队列系统里,通常是 Broker 来管理消费者的 Offset,告诉消费者下次该从哪里读。但在 Kafka 里,Offset 是由消费者自己负责管理的。Kafka 可以支持大量的消费者同时读取同一个 Topic,因为每个消费者只需要记录自己的 Offset 即可,互不干扰。

Kafka 也像消息队列一样,用 Topic 来组织数据。但 Kafka 的 Topic 是一个纯粹的逻辑概念,它下面实际上是由多个 Partition 组成的。你可以把 Partition 理解为 Topic 的物理分片。为什么要这样做?主要是为了实现水平扩展和并行处理。每个 Partition 内部的数据是有序的,但不同 Partition 之间的数据是无序的。生产者写入数据时,会根据一定的策略选择写入哪个 Partition。那么,生产者是怎么决定把消息写到哪个 Partition 的呢?主要有两种情况。

  • 如果你没有指定 Key,Kafka 默认会采用轮询的方式,均匀地把消息分配到不同的 Partition。这样可以保证负载均衡。
  • 你给消息指定一个 Key,比如用户的 ID 或者订单号。Kafka 默认会使用 Key 的 Hash 值来决定写入哪个 Partition。这样做的好处是,同一个 Key 的所有消息,都会被写入同一个 Partition,保证了该 Key 下消息的顺序性。
  • 如果有特殊需求,也可以实现自定义的 Partitioning 策略。

记住,Partition 内部消息是有序的,跨 Partition 的消息是无序的。有了 Partition,怎么让消费者高效地读取呢?这就引出了 Consumer Group 的概念。你可以把多个消费者组成一个组,让它们共同消费同一个 Topic 的消息。Kafka 会把这个 Topic 的所有 Partition 分配给这个 Consumer Group 里的消费者。

在这里插入图片描述

比如,一个 Topic 有 10 个 Partition,你在一个 Group 里放了 5 个消费者,那么 Kafka 会把每个消费者分配到 2 个 Partition。这样,每个消费者就可以并行地从自己的 Partition 里读取消息,大大提高了整体的消费速度。所以,想提升消费能力,要么增加消费者数量,要么增加 Partition 数量。Kafka 提供了丰富的 API 来支持各种操作。主要有五大类:

  • Producer API 用来发送消息;
  • Consumer API 用来读取消息;
  • AdminClient API 用来管理 Topic、Broker 等元数据;
  • Streams API 提供了更高级的流处理能力,可以直接在 Kafka 上做转换;
  • Connect API 则是用来连接 Kafka 和外部系统的,比如数据库、搜索引擎等。

Kafka 本身只关心字节数组,所以我们需要把实际的数据结构序列化成字节数组才能发送,这个过程叫做 Marshaling。常用的格式有很多,比如 Avro、Protobuf、JSON、甚至是 Python 的 Pickle。选择哪种格式取决于你的具体需求,比如性能、消息大小、是否需要 Schema 定义、扩展性以及语言兼容性。另外要注意一点,Kafka 本身不保证消息的唯一性,也就是说,可能会出现重复消息。所以,确保消息只被处理一次的责任落在了消费者身上,通常需要消费者自己记录 Offset 并提交。

示例代码

现在我们把 Kafka 和 Ray 结合起来。为什么用 Ray Actors 来封装 Kafka 的 Consumer 和 Producer 呢?

  • 对于 Kafka Consumer,它通常需要在一个无限循环里运行,不断拉取消息,并且需要记住自己已经读到哪里了,也就是维护 Offset。这正好符合 Ray Actor 的特点:一个 Actor 就是一个独立的状态服务。所以,把 Kafka Consumer 实现为一个 Ray Actor,非常自然。
  • 对于 Producer,虽然它本身不需要维护状态,但把它放在一个 Actor 里,我们可以方便地异步调用 produce 方法,向任何 Kafka Topic 发送消息,而无需为每个 Topic 创建一个独立的 Producer 实例,简化了管理。

这是一个简单的 Kafka Producer Actor 的实现。

@ray.remote
class KafkaProducer:def __init__(self, server: str = 'localhost:9092'):from confluent_kafka import Producerconf = {'bootstrap.servers': server}self.producer = Producer(**conf)def produce(self, data: dict, key: str = None, topic: str = 'test'):def delivery_callback(err, msg):if err:print(f'Message failed delivery: {err}')else:print(f'Message delivered to topic {msg.topic()} partition 'f'{msg.partition()} offset {msg.offset()}')binary_key = Noneif key is not None:binary_key = key.encode('UTF8')self.producer.produce(topic=topic, value=json.dumps(data).encode('UTF8'),key=binary_key, callback=delivery_callback)self.producer.poll(0)def destroy(self):self.producer.flush(30)

它使用了 confluent_kafka 库,这是 Python 中常用的 Kafka 客户端。

  • 在 init 方法里,我们根据 broker 地址初始化一个 Kafka Producer 对象。produce 方法就是我们用来发送消息的接口,它接收数据、可选的 key 和 topic 名称。内部,它会调用 Kafka Producer 的 produce 方法,这里我们用了 json.dumps 把 Python 字典序列化成 JSON 字符串,再 encode 成字节。
  • delivery_callback 是一个回调函数,用来处理消息发送成功或失败的情况。
  • destroy 方法在 Actor 销毁前调用,确保所有待发送的消息都被 flush 出去。

这是 Kafka Consumer Actor 的实现。

@ray.remote
class KafkaConsumer:def __init__(self, callback, group: str = 'ray', server: str = 'localhost:9092',topic: str = 'test', restart: str = 'latest'):from confluent_kafka import Consumerfrom uuid import uuid4# Configurationconsumer_conf = {'bootstrap.servers': server,   # bootstrap server'group.id': group,                      # group ID'session.timeout.ms': 6000,            # session tmout'auto.offset.reset': restart}          # restart# Create Consumer instanceself.consumer = Consumer(consumer_conf)self.topic = topicself.callback = callbackself.id = str(uuid4())def start(self):self.run = Truedef print_assignment(consumer, partitions):print(f'Consumer: {self.id}')print(f'Assignment: {partitions}')# Subscribe to topicsself.consumer.subscribe([self.topic], on_assign = print_assignment)while self.run:msg = self.consumer.poll(timeout=1.0)if msg is None:continueif msg.error():print(f'Consumer error: {msg.error()}')continueelse:# Proper messageself.callback(self.id, msg)def stop(self):self.run = Falsedef destroy(self):self.consumer.close()

同样使用了 confluent_kafka 库。

  • init 方法里,除了 broker 地址,还需要配置 group.id、session.timeout.ms、auto.offset.reset 等参数。group.id 是 Consumer Group 的标识,auto.offset.reset 决定了消费者启动时没有 Offset 或者 Offset 不存在时的行为,比如 latest 表示从最新的消息开始读。

  • start 方法启动了一个无限循环,使用 consumer.poll 拉取消息。如果收到消息,就调用传入的 callback 函数进行处理。

  • stop 方法通过设置 run 为 False 来停止循环。

  • destroy 方法则关闭 Kafka Consumer 连接。

测试函数

def print_message(consumer_id: str, msg):print(f"Consumer {consumer_id} new message: topic={msg.topic()}  "f"partition= {msg.partition()}  offset={msg.offset()} "f"key={msg.key().decode('UTF8')}")print(json.loads(msg.value().decode('UTF8')))# Start Ray
ray.init()# Start consumers and producers
n_ = 5     # Number of consumers
consumers = [KafkaConsumer.remote(print_message) for _ in range(n_consumers)]
producer = KafkaProducer.remote()
refs = [c.start.remote() for c in consumers]# publish messages
user_name = 'john'
user_favorite_color = 'blue'try:while True:user = {'name': user_name,'favorite_color': user_favorite_color,'favorite_number': randint(0, 1000)}producer.produce.remote(user, str(randint(0, 100)))sleep(1)# end gracefully
except KeyboardInterrupt:for c in consumers:c.stop.remote()
finally:for c in consumers:c.destroy.remote()producer.destroy.remote()ray.kill(producer)

额外的阅读材料

  • https://www.anyscale.com/blog/serverless-kafka-stream-processing-with-ray

http://www.hkcw.cn/article/RTgtYuHCrq.shtml

相关文章

小米展台发放儿童节福利矿泉水 车展限定好礼相送

2025粤港澳大湾区车展将于5月31日至6月8日在深圳国际会展中心(宝安)举行。展会期间,小米汽车将带来“豪华高性能SUV”YU7的首次亮相,同时展示SU7和SU7 Ultra车型。这些车型将在6号馆06展位展出。观众可以在展台现场体验小米的“人车家全生态”,包括自由体验车辆选配及米家…

端午蕴藏的家国情怀何以愈发鲜活 穿越千年的精神纽带

龙舟竞渡,岁至端午。门前艾蒲青翠,天淡纸鸢漫舞。五月的熏风挟裹着两千年的沉吟,将汨罗江畔的悲怆与荆楚大地的刚烈,酿成中华民族血脉里最醇厚的家国情怀。端午节这个古老节日,在艾香与糯米的缠绵中,跳动着炽热的赤子之心。战国时期的楚国诗人屈原以“长太息以掩涕兮,哀…

科普博主谈安徽山东等地疑现火流星 实为卫星残骸再入大气层

5月30日20:45左右,安徽和山东的许多网友目睹了一道“火光”划过天空。与常见的火流星不同,这道“火光”的移动速度较慢。从一些拍摄较为清晰的视频中可以看到,在这道“火光”旁边还有一道较小的“火光”。烟台莱州的杨先生记录下了这一景象。“尾巴那里碎出来一些小的,十多…

Ubuntu系统入门指南:从操作系统基础到虚拟机安装

知不足而奋进 望远山而前行 目录 前言: ubuntu系统简介 操作系统 1. 操作系统的定义 2.操作系统作用 3. 常见的操作系统 linux内核和发行版本 linux内核 发行版本 vmware安装 1. 准备文件 2. 安装 导入ubuntu虚拟机 1. 准备文件 2. 导入 结尾&#…

增强LangChain交互体验:消息历史(记忆)功能详解

背景 在构建聊天机器人时,将对话状态传入和传出链至关重要。 LangGraph 实现了内置的持久层,允许链状态自动持久化在内存或外部后端(如 SQLite、Postgres 或 Redis)中。在本文我们将演示如何通过将任意 LangChain runnables 包装在最小的 LangGraph 应用程序中来添加持久性…

win10电脑时间同步失败的解决方法

win10电脑时间同步失败 问题如下: 解决方法如下: 搜索里搜索:控制面板,然后选择时钟和区域 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/d734b28553514f6699d559d4218e5e99.png 此处输入:ntp.aliyun.com 然后时间就同步完成了~ 可以…

FastAPI docs接口文档的认证

API接口文档中实现OAuth2认证,OAuth不是一个API或者服务,而是一个认证授权(Authorization)的开放标准。 自定义OAuth2PasswordBearer 在fastapi中,官方提供了OAuth2PasswordBearer依赖。可以在接口文档中出现认证的图标。 接口文档中出现了…

历年复旦大学计算机保研上机真题

2025复旦大学计算机保研上机真题 2024复旦大学计算机保研上机真题 2023复旦大学计算机保研上机真题 在线测评链接:https://pgcode.cn/problem?classification1 最大公共子串 题目描述 输入 3 个子串,输出这 3 个子串的最大公共子串。 输入格式 输入…

田径亚锦赛第四日中国队收获3金2银 多项目刷新个人最佳

第26届亚洲田径锦标赛在韩国龟尾市进入第四个比赛日,共产生了7枚金牌。中国队在这天的比赛中收获了3金2银。男子跳远比赛中,中国队选手舒衡最后一跳跳出8.22米,刷新个人最好成绩,夺得金牌。中国台北队选手林昱堂和日本队选手山浦溪斗分别获得银牌和铜牌。另一位中国队选手张…

白宫办公厅主任手机被入侵 黑客冒充引发调查

继美国政府高官使用非机密聊天软件讨论高度敏感的军事计划,并误将记者拉进群聊的丑闻后,白宫再度被曝存在信息安全隐患。美国总统特朗普最密切的顾问之一、白宫办公厅主任苏茜怀尔斯的私人手机遭黑客入侵,并以她的名义,在最近几周给政府官员、国会议员、大型企业高管和其他…

老人锯断20年香樟树枝干:拿我怎样 私自修剪引发争议

最近,家住浦东新区上南山水苑一期的业主王先生反映,小区内一群平均年龄七十多岁的老人以提高小区绿化环境为由,私自圈占公共绿化变为私人花园已有2年。期间物业居委多次劝阻,但老人们不仅不听劝,花还越种越多。5月27日上午,其中一位老人甚至用自己网购的斧头和电锯砍伐了…

深蓝汽车再回应车机广告争议 承诺不再推送

近日,有车主在微博上投诉深蓝汽车在未提前告知的情况下,在车机系统中强制投放了开机广告,时长约三秒。网友认为这种行为侵犯了他们对车辆的所有权,还可能影响驾驶安全。根据车主提供的照片显示,车机屏幕上出现了“感恩回馈,面向首任车主发放专属购车券”的宣传语。一些车…

Linux进程

进程概念 获取进程pid方式(概念) 1.获取所有的pid信息 部分获取ps ajx | head -1 && ps axj | grep 文件名 ps ajx | head -1 && ps axj | grep 文件名 | grep -v grep可以过滤掉grep中携带进程部分信息的内容 因为我没有运行test.o所…

【通关文件操作(下)】--文件的顺序读写(续),sprintf和sscanf函数,文件的随机读写,文件缓冲区,更新文件

目录 四.文件的顺序读写(续) 4.8--fwrite函数 4.9--fread函数 五.sprintf函数和sscanf函数 5.1--函数对比 5.2--sprintf函数 5.3--sscanf函数 六.文件的随机读写 6.1--fseek函数 6.2--ftell函数 6.3--rewind函数 七.文件缓冲区 7.1--fflush函数 八.更新文件 &…

铁路行业数字化应用建设方案

数字化转型面临的挑战 铁路行业正处于数字化转型的关键时期,铁路行业应用场景复杂,数据量巨大,传统信息化建设模式难以满足日益增长的业务需求。铁路企业亟需引入敏捷高效的数字化工具,加速推进业务创新,实现提质增效…

冬夏女王喜欢蒯铎 复杂情仇揭秘

电视剧《藏海传》中,藏海成功利用天象算计死褚怀明。事实上,褚怀明可以说是自己作死。藏海差点被褚怀明死前反击弄死,是庄芦隐冲入火场救了他。这一段剧情令人匪夷所思。事件之后,庄芦隐在藏海的劝说下,准备让庄之行参军,幻想庄家一文一武,将来可以互相扶持,重振侯府荣…

余承东称尊界订单70%百万顶配 上市1小时大定破千台

鸿蒙智行宣布尊界 S800 上市仅1小时,大定订单就突破了1000台。华为常务董事、终端BG董事长余承东透露,其中70%的订单选择了百万顶配版本。鸿蒙智行参与了2025粤港澳大湾区车展,展示了全系八大车型。展会期间,余承东与江淮汽车董事长项兴初及主持人汪涵进行了交流。此前一天…

媒体:建百万豪华公厕是政绩观扭曲 民生工程变形象工程

吉林通榆县内部装修豪华的公共厕所,每个厕所面积在110至140平方米之间,配有中央空调和排风系统、烘手器、净化开水机、自动喷香机、洗手液等多种洗漱物品,并安装了WiFi和音箱等设备。这些配置看起来更像是星级酒店的卫生间,但它们却出现在一个刚脱贫不久的县城里。日前,吉…

不演戏就“消失”的辛柏青 低调哀悼爱妻

这段时间,全网都在为朱媛媛的离世感到惋惜。她是一位德艺双馨的好人,在物欲横流的社会中显得尤为珍贵。姚晨曾评价她:“这一生虽未尽兴,却也不枉此行。”朱媛媛去世后,网友们更加心疼她的丈夫辛柏青。这对影坛伉俪在大学时一见钟情,用一袋洗衣粉定情,尽管经历了许多波折…

曾舜晞演吴老狗超级加辈 挑战与突破并存

《老九门》曾风靡一时,成为茶余饭后的热门话题。张大佛爷的戎装和二月红的痴情让无数观众津津乐道。如今,《九门》作为续篇或前传再次回归,陈伟霆再次出演佛爷,让不少老粉丝感到安心。然而,也有人担心时光流逝,陈伟霆能否再现当年的风采。毕竟,从三十而立到近不惑之年,…