
LlamaIndex • 2024-08-01
引入工作流测试版:一种使用 LlamaIndex 创建复杂 AI 应用的新方式
我们很高兴推出 LlamaIndex 的全新测试版功能:工作流,这是一种在我们用户构建的日益复杂的 AI 应用中编排操作的机制。
随着大型语言模型(LLMs)的出现,最初只是一个趋势,现在已成为事实标准:AI 应用由不同组件实现的多项任务组成。市场上的开源框架通过为数据加载器、LLMs、向量数据库和重排序器等基础组件提供易于使用的抽象,甚至包括外部服务,来努力让 AI 工程师的生活更轻松。与此同时,所有这些框架也在寻求最佳的抽象方式来编排这些组件,研究对于 AI 开发者而言,如何实现将复合型 AI 系统整合在一起的逻辑是最直观和高效的。
两种潜在的编排模式是链(chains)和管道(pipelines),它们都是相同有向无环图(DAG)抽象的实现。我们在年初发布的 查询管道 中尝试了这一点——它是一个声明式 API,允许您为不同的用例(如问答、结构化提取和代理自动化)在您的数据上编排简单到高级的查询工作流。但是,当我们尝试在其基础上构建并实验性地添加循环以更好地支持更复杂的工作流时,我们注意到了一些问题,这促使我们反思为什么 DAG 可能不适合代理领域,以及我们可以在框架中引入哪些替代方案。
基于图的用户体验的局限性
DAG 的一个基本方面是其名称中的“A”:它们是“无环的”,这意味着没有循环。但在一个越来越以代理为中心的世界中,AI 应用逻辑中无法执行循环是不可接受的。例如,如果某个组件提供了不良结果,AI 开发者应该有办法告诉系统进行自我纠正并重试。
即使不向 DAG 添加循环,查询管道也存在一些明显的问题
- 出错时难以调试
- 它们模糊了组件和模块的执行方式
- 我们的管道编排器变得异常复杂,必须处理大量不同的边缘情况
- 对于复杂的管道来说,它们难以阅读
一旦我们在查询管道中添加循环,这些围绕图的开发者用户体验问题就被放大了。我们在以下方面亲身经历了开发者的痛点:
- 很多核心编排逻辑,如
if-else
语句和while
循环,都被嵌入到图的边中。定义这些边变得繁琐冗长。 - 处理关于可选值和默认值的边缘情况变得困难。作为框架,我们很难确定参数是否会从上游节点传递过来。
- 对于构建代理的开发者来说,定义带有循环的图并不总是感觉自然。代理封装了一个通用的 LLM 驱动实体,可以接收观察结果并生成响应。在这里,图的用户体验强制要求“代理”节点必须明确定义传入边和传出边,迫使用户定义冗长复杂的与其他节点的通信模式。
我们问:图真的是我们可以在复合型 AI 系统中编排组件的唯一抽象方式吗?
从图到 EDA:转向事件驱动
复合型 AI 系统可以使用 LlamaIndex 工作流来实现。工作流通过一系列称为步骤的 Python 函数来回分派事件。每个步骤可以看作是系统的一个组件:一个用于处理查询,一个用于与 LLM 对话,一个用于从向量数据库加载数据等等。每个步骤接收一个或多个事件进行处理,并且可以选择发送回事件,这些事件将根据需要转发给其他组件。
转向事件驱动架构导致设计上的根本性转变。在许多图实现中,图遍历算法负责确定下一个应该运行哪个组件以及应该传递什么数据。在事件驱动架构中,组件订阅特定类型的事件,并最终负责根据接收到的数据决定做什么。
在事件驱动系统中,输入的可选性和默认值等概念在组件级别处理,极大地简化了编排代码。
工作流入门
为了帮助阐明这个想法,让我们看一个例子。一个最小的 LlamaIndex 工作流看起来是这样的
from llama_index.core.workflow import (
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.llms.openai import OpenAI
class OpenAIGenerator(Workflow):
@step()
async def generate(self, ev: StartEvent) -> StopEvent:
query = ev.get("query")
llm = OpenAI()
response = await llm.acomplete(query)
return StopEvent(result=str(response))
w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
generate
函数使用 @step
装饰器标记为一个工作流步骤,并使用带有适当类型注解的方法签名声明它希望接收哪些事件以及将发送回哪些事件。为了运行一个工作流,我们创建一个 OpenAIGenerator
类的实例,传递一些配置参数(如期望的超时时间),然后调用 run
方法。传递给 run
的任何关键字参数都将被打包到一个特殊类型的 StartEvent
事件中,该事件将被转发给请求它的步骤(在此例中,仅是 generate
步骤)。generate
步骤返回一个特殊类型的 StopEvent
事件,它将向工作流发出信号,优雅地停止执行。StopEvent
携带了我们希望作为工作流结果返回给调用者的任何数据,在此例中是 LLM 的响应。
工作流可以循环
在事件驱动架构中,循环与通信有关,而不是拓扑。任何步骤都可以通过构建和发送适当的事件来决定多次调用另一个步骤。例如,我们来看一个自我纠正的循环(查看 notebook 获取完整代码)
class ExtractionDone(Event):
output: str
passage: str
class ValidationErrorEvent(Event):
error: str
wrong_output: str
passage: str
class ReflectionWorkflow(Workflow):
@step()
async def extract(
self, ev: StartEvent | ValidationErrorEvent
) -> StopEvent | ExtractionDone:
if isinstance(ev, StartEvent):
passage = ev.get("passage")
if not passage:
return StopEvent(result="Please provide some text in input")
reflection_prompt = ""
elif isinstance(ev, ValidationErrorEvent):
passage = ev.passage
reflection_prompt = REFLECTION_PROMPT.format(
wrong_answer=ev.wrong_output, error=ev.error
)
llm = Ollama(model="llama3", request_timeout=30)
prompt = EXTRACTION_PROMPT.format(
passage=passage, schema=CarCollection.schema_json()
)
if reflection_prompt:
prompt += reflection_prompt
output = await llm.acomplete(prompt)
return ExtractionDone(output=str(output), passage=passage)
@step()
async def validate(
self, ev: ExtractionDone
) -> StopEvent | ValidationErrorEvent:
try:
json.loads(ev.output)
except Exception as e:
print("Validation failed, retrying...")
return ValidationErrorEvent(
error=str(e), wrong_output=ev.output, passage=ev.passage
)
return StopEvent(result=ev.output)
w = ReflectionWorkflow(timeout=60, verbose=True)
result = await w.run(
passage="There are two cars available: a Fiat Panda with 45Hp and a Honda Civic with 330Hp."
)
print(result)
在此示例中,validate
步骤接收初步模式提取的结果作为事件,并且可以通过返回一个 ValidationErrorEvent
来决定再次尝试,该事件最终将传递给 extract
步骤,后者将执行另一次尝试。请注意,在此示例中,如果此提取/验证循环长时间提供不良结果,工作流可能会超时;但另一种策略可能是在精确尝试次数后放弃,这只是一个例子。
工作流保持状态
工作流在执行期间维护一个全局状态,该状态可以共享并根据请求传播到其步骤。这个共享状态被实现为一个 Context
对象,步骤可以使用它在迭代之间存储数据,也可以作为不同步骤之间的一种替代通信形式。我们来看一个更复杂的 RAG 示例的摘录,作为一个如何使用全局上下文的例子(查看 notebook 获取完整代码)
class RAGWorkflow(Workflow):
@step(pass_context=True)
async def ingest(self, ctx: Context, ev: StartEvent) -> Optional[StopEvent]:
dataset_name = ev.get("dataset")
_, documents = download_llama_dataset(dsname, "./data")
ctx.data["INDEX"] = VectorStoreIndex.from_documents(documents=documents)
return StopEvent(result=f"Indexed {len(documents)} documents.")
...
在这种情况下,ingest
步骤创建一个索引,并希望将其提供给工作流执行期间可能稍后需要它的任何其他步骤。在 LlamaIndex 工作流中实现这一点的方式是声明该步骤需要一个全局上下文的实例(@step(pass_context=True)
可以做到),并将索引存储在上下文本身中,使用一个预定义的键,其他步骤稍后可以访问它。
工作流可以定制
除了工作流之外,我们还将发布一组预定义的工作流,以便最常见的用例可以用一行代码实现。使用这些预定义的流程,用户可能仍然希望稍微修改一个预定义的工作流来引入一些自定义行为,而无需从头重写整个工作流。假设您想定制一个 RAG 工作流并使用自定义的重排序步骤,您只需继承一个假设的内置 RAGWorkflow
类并像这样覆盖 rerank
步骤即可
class MyWorkflow(RAGWorkflow):
@step(pass_context=True)
def rerank(
self, ctx: Context, ev: Union[RetrieverEvent, StartEvent]
) -> Optional[QueryResult]:
# my custom reranking logic here
w = MyWorkflow(timeout=60, verbose=True)
result = await w.run(query="Who is Paul Graham?")
工作流可以调试
您的工作流的复杂性将随着您的应用逻辑的复杂性而增长,有时仅仅查看 Python 代码很难理解事件在执行期间如何流动。为了便于理解复杂的工作流并支持工作流执行的调试,LlamaIndex 提供了两个函数
draw_all_possible_flows
生成一张图片,显示工作流中的所有步骤以及事件可能如何流动draw_most_recent_execution
生成一张类似的图片,只显示上次工作流执行期间实际发送的事件

除此之外,工作流可以手动执行,通过多次调用 run_step()
直到所有步骤完成。每次调用 run_step
后,可以检查工作流,查看任何中间结果或调试日志。
为什么您应该今天就开始使用工作流
尽管仍处于开发的早期阶段,但 LlamaIndex 工作流相较于查询管道已向前迈进了一步,扩展了其功能并增加了更多灵活性。最重要的是,工作流自带了一系列您通常会期望从更成熟软件中获得的特性
- 完全异步并支持流式处理
- 默认集成埋点,通过支持的集成提供一键可观测性
- 逐步执行,便于调试
- 事件驱动依赖关系的验证和可视化
- 事件实现为 pydantic 模型,便于定制和进一步开发新功能