Kafka、Flink、LangChain 和 OpenAI 结合使用的 GenAI 示例展示

发表时间: 2024-05-20 17:44

探索在云中使用 Apache Kafka 和 Flink 以及 Python、LangChain 和 OpenAI LLM API 进行开源数据流的架构和演示。

生成式人工智能 (GenAI) 可实现跨行业的自动化和创新。这篇博文探讨了 Python 的架构和演示,LangChain 与 OpenAI LLM、用于事件流和数据集成的 Apache Kafka 以及用于流处理的 Apache Flink 相结合。该用例展示了数据流和GenAI如何帮助关联来自Salesforce CRM的数据,在Google和LinkedIn等公共数据集中搜索潜在客户信息,并为销售代表推荐破冰对话。

生成式人工智能的出现

生成式人工智能 (GenAI) 是指一类人工智能 (AI) 系统和模型,用于生成新内容,通常为图像、文本、音频或其他类型的数据。这些模型可以理解和学习训练数据中存在的基本模式、样式和结构,然后自行生成新的、类似的内容。

生成式人工智能在各个领域都有应用,包括:

  • 图像生成:生成逼真的图像、艺术或图形
  • 文本生成:创建类似人类的文本,包括自然语言生成
  • 作曲:生成新的音乐作品或风格
  • 视频合成:创建逼真的视频内容
  • 数据增强:为机器学习模型生成额外的训练数据
  • 药物发现:为新药生成分子结构

生成式 AI 面临的一个关键挑战是在生产基础设施中部署时要考虑到上下文、可扩展性和数据隐私。让我们来探讨一个使用 CRM 和客户数据将 GenAI 集成到企业架构中以支持销售和营销的示例。

演示:LangChain + Kafka + Flink = 使用 Salesforce CRM 和 LinkedIn 数据自动拨打销售线索的电话

本文展示了一个演示,该演示将 Apache Kafka 和 Flink 提供支持的实时数据流与 LangChain 中 OpenAI 的大型语言模型相结合。如果您想了解更多关于 Kafka 和 Flink 以及生成式 AI 的数据流,请查看以下两篇文章:

  • Apache Kafka 作为 GenAI 任务关键型 Data Fabric 的用例
  • Apache Kafka + 带有向量数据库的 RAG + LLM = 实时 GenAI

以下演示是关于使用生成式 AI 支持销售代表或自动化工具:

  • Salesforce CRM通过其他界面或人工手动创建新的潜在客户。
  • 销售代表/SDR 实时接收潜在客户信息以致电潜在客户。
  • 一项特殊的GenAI服务利用潜在客户信息(姓名和公司)搜索网络(主要是LinkedIn),为潜在客户的冷门电话生成有用的内容,包括摘要,两个有趣的事实,一个感兴趣的话题,以及两个用于发起对话的创意破冰船。

感谢我的同事 Carsten Muetzlitz,他构建了演示。该代码可在 GitHub 上找到。下面是演示的架构:

演示中的技术和基础设施

以下技术和基础设施用于实现和部署 GenAI 演示。

  • Python:几乎每个数据工程师和数据科学家都使用的编程语言
  • LangChain:Python 框架实现应用程序以支持销售对话
  • OpenAI:语言模型和 API 有助于构建简单但功能强大的 GenAI 应用程序
  • Salesforce:云CRM工具存储潜在客户信息以及其他销售和营销数据
  • Apache Kafka:可扩展的实时数据中心,将数据源 (CRM) 和数据接收器(GenAI 应用程序和其他服务)解耦
  • Kafka Connect:通过 Salesforce CRM 的变更数据捕获 (CDC) 进行数据集成
  • Apache Flink:用于丰富CRM数据和数据质量的流处理
  • Confluent Cloud:完全托管的 Kafka(流和存储)、Flink(流程)和 Salesforce 连接器(集成)
  • SerpAPI:用潜在客户信息抓取 Google 和其他搜索引擎
  • proxyCurl:从 LinkedIn 中提取有关潜在客户的丰富数据,而无需担心扩展网络抓取和数据科学团队

缺少:LangChain演示中没有带有模型嵌入的向量数据库和RAG(带有模型嵌入)

此演示不使用先进的 GenAI 技术进行 RAG(检索增强生成)、模型嵌入或通过 Pinecone、Weaviate、MongoDB 或 Oracle 等矢量数据库 (Vector DB) 进行矢量搜索。

演示的原则是 KISS(“尽可能保持简单”)。这些技术可以而且将会集成到许多现实世界的架构中。

该演示在延迟和规模方面存在限制。Kafka 和 Flink 作为完全托管和弹性的 SaaS 运行。然而,LangChain周围的AI/ML部分可以通过使用SaaS进行托管以及与其他专用AI平台的集成来改善延迟。特别是数据密集型应用程序将需要向量数据库以及高级检索和语义搜索技术,如 RAG。

有趣的事实:当我搜索我的名字而不是 Carsten 的名字时,演示中断了。因为网络爬虫在网络上发现了太多关于我的内容,因此,LangChain应用程序崩溃了。对于像 Pinecone 或 MongoDB 这样的互补技术来说,这是一个引人注目的事件,这些技术可以大规模地进行索引、RAG 和语义搜索。这些技术提供了与 Confluent Cloud 的完全托管集成,因此可以轻松扩展演示。

LangChain在GenAI中的作用

LangChain是一个开源框架,用于开发由语言模型驱动的应用程序。LangChain也是该框架背后的商业供应商的名称。该工具为数据工程师提供了所需的“胶水代码”,以构建具有直观 API 的 GenAI 应用程序,用于将大型语言模型 (LLM)、带有上下文的提示、通过有状态对话推动决策的代理以及与外部接口集成的工具。

LangChain支持:

  • 上下文感知:将语言模型与上下文源(提示说明、少量示例、响应内容等)联系起来。
  • 原因:依靠语言模型进行推理(关于如何根据提供的上下文回答、采取什么行动等)

LangChain软件包的主要价值支柱是:

  1. 组件:用于处理语言模型的可组合工具和集成;组件是模块化的,易于使用,无论您是否使用LangChain框架的其余部分。
  2. 现成的链条:用于完成更高级别任务的内置组件组合

这些产品共同简化了整个应用程序生命周期:

  • 开发:用 LangChain/LangChain.js 编写应用程序。使用模板作为参考。
  • 生产化:使用 LangSmith 检查、测试和监控您的链条,以便您可以自信地不断改进和部署。
  • 部署:使用 LangServe 将任何链变成 API。

LangChain在演示中

该演示使用了多个LangChain概念,例如提示、聊天模型、使用LangChain表达式语言(LCEL)的链,以及使用语言模型选择要执行的一系列操作的代理。

以下是LangChain业务流程的逻辑流程:

  1. 获取新的潜在客户:从 Kafka 主题中实时收集 Salesforce CRM 潜在客户的全名和公司。
  2. 查找 LinkedIn 个人资料:使用 Google 搜索 API“SerpAPI”搜索潜在客户的 LinkedIn 个人资料的 URL。
  3. 收集有关潜在客户的信息:使用 Proxycurl 从 LinkedIn 收集有关潜在客户的所需信息。
  4. 为销售代表或自动脚本创建冷呼叫建议:通过 OpenAI API 将所有信息摄取到 ChatGPT LLM 中,并将生成的文本发送到 Kafka 主题。

以下屏幕截图显示了生成的内容的代码片段。它包括基于 LinkedIn 个人资料的特定上下文的破冰对话。在加入 Confluent 之前,Carsten 在 Oracle 工作了 24 年。LLM 使用 LangChain 提示符的以下上下文来生成相关内容:

Apache Kafka 在 GenAI 中的作用

Apache Kafka 是一个分布式流式处理平台,用于构建实时数据管道和流式处理应用程序。它在高效可靠地处理和管理大量数据流方面发挥着至关重要的作用。

生成式 AI 通常涉及用于创建新数据(例如图像、文本或其他类型的内容)的模型和算法。Apache Kafka 通过提供可扩展且有弹性的基础设施来管理数据流,从而支持生成式 AI。在生成式 AI 上下文中,Kafka 可用于:

  • 数据引入:Kafka 可以处理大型数据集的摄取,包括训练生成式 AI 模型所需的多样化和潜在的大量数据。
  • 实时数据处理:Kafka 的实时数据处理功能有助于在数据不断变化的场景中,允许生成式 AI 模型的快速更新和训练。
  • 活动来源:使用 Kafka 进行事件溯源可捕获并存储随时间推移发生的事件,从而提供数据更改的历史记录。这些历史数据对于训练和改进生成式 AI 模型很有价值。
  • 与其他工具集成:Kafka 可以集成到更大的数据处理和机器学习管道中,从而促进生成式 AI 工作流中涉及的不同组件和工具之间的数据流动。

虽然 Apache Kafka 本身是专为生成式 AI 设计的工具,但其特性和功能有助于提高数据基础设施的整体效率和可扩展性。Kafka 的功能在处理大型数据集和复杂的机器学习模型(包括生成式 AI 应用程序中使用的模型)时至关重要。

演示中的 Apache Kafka

Kafka 是连接所有不同应用程序的数据结构。确保数据一致性是 Kafka 的一个最佳点,无论数据源或接收器是实时的、批处理的还是请求-响应的 API。

在此演示中,Kafka 将 Salesforce CRM 中的事件用作客户数据的主要数据源。不同的应用程序(Flink、LangChain、Salesforce)在业务流程的不同步骤中使用数据。Kafka Connect 提供数据集成功能,无需其他 ETL、ESB 或 iPaaS 工具。此演示使用 Confluent 的更改数据捕获 (CDC) 连接器实时使用 Salesforce 数据库中的更改以进行进一步处理。

在本演示中,完全托管的 Confluent Cloud 是整个 Kafka 和 Flink 生态系统的基础架构。开发人员的重点应该始终放在构建业务逻辑上,而不是担心操作基础设施。

虽然 Kafka 的核心是基于事件的、实时的和可扩展的,但它也支持开箱即用的领域驱动设计和数据网格企业架构。

Apache Flink 在 GenAI 中的作用

Apache Flink 是一个开源的分布式流处理框架,用于实时分析和事件驱动的应用程序。它的主要重点是高效、大规模地处理连续的数据流。虽然 Apache Flink 本身不是生成式 AI 的特定工具,但它在支持生成式 AI 工作流的某些方面发挥着作用。以下是 Apache Flink 相关的几种方式:

  1. 实时数据处理:Apache Flink 可以实时处理和分析数据,这对于生成式 AI 模型需要对流数据进行操作、适应变化并实时生成响应的场景非常有用。
  2. 事件时间处理:Flink 内置了对事件时间处理的支持,允许按照事件发生的顺序处理事件,即使它们到达的顺序是无序的。这在时间顺序至关重要的场景中可能很有用,例如在用于训练或应用生成式 AI 模型的数据序列中。
  3. 有状态处理:Flink 支持有状态处理,支持跨事件维护状态。这在生成式 AI 业务流程需要维护过去事件的上下文或记忆以生成连贯和上下文感知输出的情况下非常有用。
  4. 与机器学习库集成:虽然 Flink 本身不是一个机器学习框架,但它可以与机器学习中使用的其他工具和库集成,包括与生成式 AI 相关的工具和库。这种集成可以促进机器学习模型在基于 Flink 的流式处理应用程序中的部署和执行。

Apache Flink 在生成式 AI 中的具体角色取决于特定的用例和整个系统的架构。

Apache Flink 在 Demo 中

此演示利用 Apache Flink 对传入的 Salesforce CRM 事件进行流式处理 ETL(扩充、数据质量改进)。

FlinkSQL 提供了一种简单直观的方法,可以使用任何 Java 或 Python 代码实现 ETL。完全托管的 Confluent Cloud 是本演示中 Kafka 和 Flink 的基础架构。无服务器 FlinkSQL 允许您根据需要进行扩展,但如果没有使用和处理任何事件,也可以缩减到零。

演示只是一个起点。许多强大的应用程序都可以用 Apache Flink 构建。这包括流媒体 ETL,但也包括您在 Netflix、Uber 和许多其他科技巨头上找到的商业应用程序。

LangChain + 完全托管的 Kafka 和 Flink = 简单、强大的实时 GenAI

LangChain是一个易于使用的AI/ML框架,用于将大型语言模型连接到其他数据源并创建有价值的应用程序。灵活性和开放性使开发人员和数据工程师能够构建各种应用程序,从聊天机器人到回答您问题的智能系统。

使用 Apache Kafka 和 Flink 进行数据流处理,为数据管道和流处理提供了可靠且可扩展的数据结构。Kafka 的事件存储确保了实时、批处理和请求-响应 API 之间的数据一致性。由于这些原因,领域驱动的设计、微服务架构和数据产品在数据网格中构建,越来越多地利用 Kafka。

LangChain、OpenAI 等 GenAI 技术以及与 Kafka 和 Flink 的数据流相结合,为 AI 提供支持的实时特定情境决策提供了强大的组合。

大多数企业对 AI 用例都有云优先策略。数据流基础设施在 Confluent Cloud 等 SaaS 中可用,因此开发人员可以专注于业务逻辑,并加快上市时间。使用 Python(AI 的事实标准)构建 AI 应用程序有很多替代方案。例如,您可以在 FlinkSQL 应用程序中构建一个用户定义函数 (UDF),执行 Python 代码并从 Kafka 使用它。或者使用单独的应用程序开发框架和云平台,如 Quix Streams 或 Bytewax for Python 应用程序,而不是像 LangChain 这样的框架。

如何将 Python、LangChain 和 LLM 与 Kafka 和 Flink 等数据流技术相结合?让我们在LinkedIn上联系并讨论它!


原文标题:GenAI Demo With Kafka, Flink, LangChain and OpenAI

原文链接:https://dzone.com/articles/genai-demo-with-kafka-flink-langchain-and-openai

作者:Kai Wähner

编译:LCR