宣布 LlamaCloud 全面上线(以及我们的 1900 万美元 A 轮融资)!
LlamaIndex

Jerry Liu 2023-08-29

在 LlamaIndex 中引入 Airbyte 数据源

作者:Joe Reuter,Airbyte 软件工程师

(转载自 Airbyte 博客;请此处查看!)

内容

现在可以直接在您的 LlamaIndex 应用程序中利用 Airbyte 的 GongHubspotSalesforceShopifyStripeTypeformZendesk Support 数据源,这些数据源已实现为数据加载器

例如,要加载用户的 Stripe 发票,您可以使用 AirbyteStripeLoader。安装它非常简单,当您在本地安装了 LlamaIndex 后,只需安装您感兴趣的数据源,即可开始使用。

pip install airbyte-source-stripe
pip install llama-hub

之后,只需下载加载器并传入配置以及您想要加载的数据流即可。

from llama_hub.airbyte_stripe.base import AirbyteStripeReader

config = {
  "client_secret": "<secret key>",
  "account_id": "<account id>",
  "start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020–10–20T00:00:00Z>"
}
reader = AirbyteStripeReader(config=config)
documents = reader.load_data(stream_name="invoices")

这有什么重要性?

这只是 Airbyte 的 300 多个数据源在 LlamaHub 中作为数据加载器可用的开始。

Airbyte 几乎可以将任何数据源的数据迁移到您的数据仓库或向量数据库,以支持您的 LLM 用例(请查看此教程,了解如何设置此类数据管道!)。这通常是通过使用 Airbyte Cloud 或本地 Airbyte 实例、设置连接并按计划(或通过 API 触发器)运行来完成的,以确保您的数据保持最新。

但如果您刚开始使用,并且所有内容都在本地运行,那么使用一个完整的 Airbyte 实例(包括 UI、调度服务、扩展能力等)可能就显得大材小用了。

通过本次发布,在 LlamaIndex 中直接在您的 Python 运行时内运行任何基于 Python 的数据源变得前所未有的简单——无需启动 Airbyte 实例或调用 Airbyte Cloud API。

在托管式和嵌入式 Airbyte 之间迁移

由于底层运行的是相同的代码,每个 Airbyte 构建的加载器都与 Airbyte 服务中的对应数据源兼容。这意味着将您的嵌入式加载管道轻松迁移到自托管的 Airbyte 安装或 Airbyte Cloud 实例中是轻而易举的事情。加载器配置对象的模式与输出记录的模式是 100% 兼容的。

在托管式 Airbyte 上运行同步意味着

  • 用户界面,用于跟踪正在运行的管道
  • 事件通知,包括对失败同步的警报或运行同步后操作
  • 轻松按计划运行管道
  • 横向扩展能力
  • 用于驱动程序化用例的 API
  • 开箱即用的连接状态管理
  • 支持
  • 等等

使用 LlamaIndex 加载器运行同步意味着

  • 无需额外运行服务所带来的开销
  • 完全控制时间和管道执行

将 Airbyte 加载器与索引和查询引擎结合使用

由于 Airbyte 加载器的工作方式与普通加载器类似,因此可以轻松地与所有 LlamaIndex 工具结合使用,以构建强大的基于 LLM 的应用程序。

relevant_keys = ["customer_name", "total", "currency"]
reader = AirbyteStripeReader(
    config=strip_config,
    record_handler=lambda record, id: Document(
        doc_id=id,
        text=record.data["description"] or "",
        extra_info={
            key: record.data[key] for key in relevant_keys if key in record.data
        },
    ),
)

index = ListIndex.from_documents(reader.load_data(stream_name="invoices"))
query_engine = index.as_query_engine()
question = input("What do you want to know about your customers?")
print(query_engine.query(question))

增量加载

由于您的 Python 应用程序基本上扮演着 Airbyte 平台的角色,因此您可以完全控制“同步”的执行方式。例如,如果您的数据流支持增量同步,您仍然可以通过访问加载器的“last_state”属性来从中受益。这使您只能加载自上次加载以来发生更改的文档,从而有效地更新现有向量数据库。

import airbyte_cdk.models.airbyte_protocol import AirbyteMessage
with open('stripe_sync_checkpoint.json', 'w') as file:
  file.write(reader.last_state.json())

# later
with open('stripe_sync_checkpoint.json', 'r') as file:
  current_state = AirbyteStateMessage.parse_raw(file.read())
new_docs = reader.load_data(stream_name="invoices", state=current_state)

将 Airbyte 记录映射到 LlamaIndex 文档

默认情况下,每个记录都会作为加载器的一部分被映射到一个 Document,记录中的各种字段都会成为 Document 的 `extra_info` 属性的一部分(`extra_info` 代表每个文档的结构化元数据)。文档的文本部分被设置为记录的 JSON 表示形式。默认情况下,Document 上定义的任何元数据都会在下游模块中与文本连接起来,因此记录中的所有字段都将用于 LlamaIndex 应用程序中的嵌入和合成。您可以传入一个记录处理器来自定义此行为,根据数据构建记录的文本部分。

def handle_record(record, id):
  return Document(doc_id=id, text=record.data["title"], extra_info=record.data)
reader = AirbyteGongReader(config=gong_config, record_handler=handle_record)

自定义数据源

目前,以下 Airbyte 数据源以 pip 包的形式提供(未来将会有更多)

  • Gong pip install airbyte-source-gong
  • Hubspot pip install airbyte-source-hubspot
  • Salesforce pip install airbyte-source-salesforce
  • Shopify pip install airbyte-source-shopify
  • Stripe pip install airbyte-source-stripe
  • Typeform pip install airbyte-source-typeform
  • Zendesk Support pip install airbyte-source-zendesk-support

然而,如果您已经实现了自己的自定义 Airbyte 数据源,也可以通过使用 AirbyteCDKReader 基类来集成它们,该基类与 Airbyte CDK 的 Source 接口协同工作。

from llama_index import download_loader
from my_source.source import MyCustomSource # plug in your own source here
AirbyteCDKReader = download_loader(AirbyteCDKReader)
config = {
  # your custom configuration
}
reader = AirbyteCDKReader(source_class=MyCustomSource, config=config)
documents = reader.load_data(stream_name="my-stream")

您还可以通过 git 直接安装主 Airbyte 仓库中的数据源——例如,要获取 Github 数据源,只需运行

pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

之后,该数据源就可以被集成到 AirbyteCDKLoader 中使用了。

from source_github.source import SourceGithub
issues_loader = AirbyteCDKReader(source_class=SourceGithub, config=config)
documents = reader.load_data(stream_name="issues")

查看连接器开发文档,了解如何开始编写自己的数据源——它们易于上手,并且可以让您根据需要无缝地从本地嵌入式加载器迁移到使用托管式 Airbyte 实例。

有任何问题吗?我们很乐意听取您的意见

如果您有兴趣利用 Airbyte 将数据传输到您的基于 LLM 的应用程序中,请花点时间填写我们的调查问卷,以便我们确保优先处理最重要的功能。

如果您有任何问题,或者对以这种方式公开为加载器的其他现有数据源感兴趣,请随时通过我们的社区 Slack 频道或 LlamaIndex discord 服务器上的integrations 频道与我们联系。