生成式人工智能 (GenAI) 可实现跨行业的自动化和创新。这篇博文探讨了 Python 的架构和演示,LangChain 与 OpenAI LLM、用于事件流和数据集成的 Apache Kafka 以及用于流处理的 Apache Flink 相结合。该用例展示了数据流和GenAI如何帮助关联来自Salesforce CRM的数据,在Google和LinkedIn等公共数据集中搜索潜在客户信息,并为销售代表推荐破冰对话。
生成式人工智能 (GenAI) 是指一类人工智能 (AI) 系统和模型,用于生成新内容,通常为图像、文本、音频或其他类型的数据。这些模型可以理解和学习训练数据中存在的基本模式、样式和结构,然后自行生成新的、类似的内容。
生成式人工智能在各个领域都有应用,包括:
生成式 AI 面临的一个关键挑战是在生产基础设施中部署时要考虑到上下文、可扩展性和数据隐私。让我们来探讨一个使用 CRM 和客户数据将 GenAI 集成到企业架构中以支持销售和营销的示例。
本文展示了一个演示,该演示将 Apache Kafka 和 Flink 提供支持的实时数据流与 LangChain 中 OpenAI 的大型语言模型相结合。如果您想了解更多关于 Kafka 和 Flink 以及生成式 AI 的数据流,请查看以下两篇文章:
以下演示是关于使用生成式 AI 支持销售代表或自动化工具:
感谢我的同事 Carsten Muetzlitz,他构建了演示。该代码可在 GitHub 上找到。下面是演示的架构:
以下技术和基础设施用于实现和部署 GenAI 演示。
此演示不使用先进的 GenAI 技术进行 RAG(检索增强生成)、模型嵌入或通过 Pinecone、Weaviate、MongoDB 或 Oracle 等矢量数据库 (Vector DB) 进行矢量搜索。
演示的原则是 KISS(“尽可能保持简单”)。这些技术可以而且将会集成到许多现实世界的架构中。
该演示在延迟和规模方面存在限制。Kafka 和 Flink 作为完全托管和弹性的 SaaS 运行。然而,LangChain周围的AI/ML部分可以通过使用SaaS进行托管以及与其他专用AI平台的集成来改善延迟。特别是数据密集型应用程序将需要向量数据库以及高级检索和语义搜索技术,如 RAG。
有趣的事实:当我搜索我的名字而不是 Carsten 的名字时,演示中断了。因为网络爬虫在网络上发现了太多关于我的内容,因此,LangChain应用程序崩溃了。对于像 Pinecone 或 MongoDB 这样的互补技术来说,这是一个引人注目的事件,这些技术可以大规模地进行索引、RAG 和语义搜索。这些技术提供了与 Confluent Cloud 的完全托管集成,因此可以轻松扩展演示。
LangChain是一个开源框架,用于开发由语言模型驱动的应用程序。LangChain也是该框架背后的商业供应商的名称。该工具为数据工程师提供了所需的“胶水代码”,以构建具有直观 API 的 GenAI 应用程序,用于将大型语言模型 (LLM)、带有上下文的提示、通过有状态对话推动决策的代理以及与外部接口集成的工具。
LangChain支持:
LangChain软件包的主要价值支柱是:
这些产品共同简化了整个应用程序生命周期:
该演示使用了多个LangChain概念,例如提示、聊天模型、使用LangChain表达式语言(LCEL)的链,以及使用语言模型选择要执行的一系列操作的代理。
以下是LangChain业务流程的逻辑流程:
以下屏幕截图显示了生成的内容的代码片段。它包括基于 LinkedIn 个人资料的特定上下文的破冰对话。在加入 Confluent 之前,Carsten 在 Oracle 工作了 24 年。LLM 使用 LangChain 提示符的以下上下文来生成相关内容:
Apache Kafka 是一个分布式流式处理平台,用于构建实时数据管道和流式处理应用程序。它在高效可靠地处理和管理大量数据流方面发挥着至关重要的作用。
生成式 AI 通常涉及用于创建新数据(例如图像、文本或其他类型的内容)的模型和算法。Apache Kafka 通过提供可扩展且有弹性的基础设施来管理数据流,从而支持生成式 AI。在生成式 AI 上下文中,Kafka 可用于:
虽然 Apache Kafka 本身是专为生成式 AI 设计的工具,但其特性和功能有助于提高数据基础设施的整体效率和可扩展性。Kafka 的功能在处理大型数据集和复杂的机器学习模型(包括生成式 AI 应用程序中使用的模型)时至关重要。
Kafka 是连接所有不同应用程序的数据结构。确保数据一致性是 Kafka 的一个最佳点,无论数据源或接收器是实时的、批处理的还是请求-响应的 API。
在此演示中,Kafka 将 Salesforce CRM 中的事件用作客户数据的主要数据源。不同的应用程序(Flink、LangChain、Salesforce)在业务流程的不同步骤中使用数据。Kafka Connect 提供数据集成功能,无需其他 ETL、ESB 或 iPaaS 工具。此演示使用 Confluent 的更改数据捕获 (CDC) 连接器实时使用 Salesforce 数据库中的更改以进行进一步处理。
在本演示中,完全托管的 Confluent Cloud 是整个 Kafka 和 Flink 生态系统的基础架构。开发人员的重点应该始终放在构建业务逻辑上,而不是担心操作基础设施。
虽然 Kafka 的核心是基于事件的、实时的和可扩展的,但它也支持开箱即用的领域驱动设计和数据网格企业架构。
Apache Flink 是一个开源的分布式流处理框架,用于实时分析和事件驱动的应用程序。它的主要重点是高效、大规模地处理连续的数据流。虽然 Apache Flink 本身不是生成式 AI 的特定工具,但它在支持生成式 AI 工作流的某些方面发挥着作用。以下是 Apache Flink 相关的几种方式:
Apache Flink 在生成式 AI 中的具体角色取决于特定的用例和整个系统的架构。
此演示利用 Apache Flink 对传入的 Salesforce CRM 事件进行流式处理 ETL(扩充、数据质量改进)。
FlinkSQL 提供了一种简单直观的方法,可以使用任何 Java 或 Python 代码实现 ETL。完全托管的 Confluent Cloud 是本演示中 Kafka 和 Flink 的基础架构。无服务器 FlinkSQL 允许您根据需要进行扩展,但如果没有使用和处理任何事件,也可以缩减到零。
演示只是一个起点。许多强大的应用程序都可以用 Apache Flink 构建。这包括流媒体 ETL,但也包括您在 Netflix、Uber 和许多其他科技巨头上找到的商业应用程序。
LangChain是一个易于使用的AI/ML框架,用于将大型语言模型连接到其他数据源并创建有价值的应用程序。灵活性和开放性使开发人员和数据工程师能够构建各种应用程序,从聊天机器人到回答您问题的智能系统。
使用 Apache Kafka 和 Flink 进行数据流处理,为数据管道和流处理提供了可靠且可扩展的数据结构。Kafka 的事件存储确保了实时、批处理和请求-响应 API 之间的数据一致性。由于这些原因,领域驱动的设计、微服务架构和数据产品在数据网格中构建,越来越多地利用 Kafka。
LangChain、OpenAI 等 GenAI 技术以及与 Kafka 和 Flink 的数据流相结合,为 AI 提供支持的实时特定情境决策提供了强大的组合。
大多数企业对 AI 用例都有云优先策略。数据流基础设施在 Confluent Cloud 等 SaaS 中可用,因此开发人员可以专注于业务逻辑,并加快上市时间。使用 Python(AI 的事实标准)构建 AI 应用程序有很多替代方案。例如,您可以在 FlinkSQL 应用程序中构建一个用户定义函数 (UDF),执行 Python 代码并从 Kafka 使用它。或者使用单独的应用程序开发框架和云平台,如 Quix Streams 或 Bytewax for Python 应用程序,而不是像 LangChain 这样的框架。
如何将 Python、LangChain 和 LLM 与 Kafka 和 Flink 等数据流技术相结合?让我们在LinkedIn上联系并讨论它!
原文标题:GenAI Demo With Kafka, Flink, LangChain and OpenAI
原文链接:https://dzone.com/articles/genai-demo-with-kafka-flink-langchain-and-openai
作者:Kai Wähner
编译:LCR