
RemoteGraph's .stream() "messages" streamMode is not working #684

Open
Jronk opened this issue Nov 20, 2024 · 1 comment

Jronk commented Nov 20, 2024

For example, following the how-to guide How to stream LLM tokens from your graph, and then calling the deployment as described in How to interact with the deployment using RemoteGraph, does not work: the messages are not streamed properly.

TL;DR: the bug appears to happen here, where otherwise valid messages to be streamed get short-circuited and the generator yields nothing instead of the messages it should. The values of chunk.event.split(CHECKPOINT_NAMESPACE_SEPARATOR)[0] look like messages/partial or messages/complete, but since updatedStreamModes looks like ['messages', 'updates'], they will never match...

[Screenshot, 2024-11-19: the RemoteGraph stream implementation where the streamMode comparison short-circuits]
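
To make the mismatch concrete, here is a minimal sketch of the comparison described above (the separator value and loop shape are assumptions based on the screenshot and description, not the actual library source):

// Approximation of the RemoteGraph stream handling described above (not the library source).
const CHECKPOINT_NAMESPACE_SEPARATOR = "|"; // assumed separator value
const updatedStreamModes = ["messages", "updates"];

// chunk.event comes back from the deployment as e.g. "messages/partial" or "messages/complete".
const chunk = { event: "messages/partial", data: {} };

// Splitting on the checkpoint separator leaves the "/partial" suffix in place...
const mode = chunk.event.split(CHECKPOINT_NAMESPACE_SEPARATOR)[0]; // "messages/partial"

// ...so this check never matches plain "messages", and message chunks are silently dropped.
if (updatedStreamModes.includes(mode)) {
  // never reached for message chunks
}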

For a proof of concept, building on the examples from the two guides above, here is a simple example graph and a pair of tests showing that streaming works locally but not remotely:

import { StateGraph, END } from "@langchain/langgraph";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import { StateAnnotation } from "./simple-messages-state.js";
import { ChatOpenAI } from "@langchain/openai";

import { tool } from "@langchain/core/tools";
import { z } from "zod";

import { AIMessage } from "@langchain/core/messages";

const model = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
  streaming: true
});

const searchTool = tool((_) => {
  // This is a placeholder for the actual implementation
  return "Cold, with a low of 3℃";
}, {
  name: "search",
  description:
    "Use to surf the web, fetch current information, check the weather, and retrieve other information.",
  schema: z.object({
    query: z.string().describe("The query to use in your search."),
  }),
});

const tools = [searchTool];

const toolNode = new ToolNode(tools);

const boundModel = model.bindTools(tools);

const routeMessage = (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const lastMessage = messages[messages.length - 1] as AIMessage;
  // If no tools are called, we can finish (respond to the user)
  if (!lastMessage?.tool_calls?.length) {
    return END;
  }
  // Otherwise if there is, we continue and call the tools
  return "tools";
};

const callModel = async (
  state: typeof StateAnnotation.State,
) => {
  // For versions of @langchain/core < 0.2.3, you must call `.stream()`
  // and aggregate the message from chunks instead of calling `.invoke()`.
  const { messages } = state;
  const responseMessage = await boundModel.invoke(messages);
  return { messages: [responseMessage] };
};

// Define the graph
const workflow = new StateGraph(StateAnnotation)
  .addNode("agent", callModel)
  .addNode("tools", toolNode)
  .addEdge("__start__", "agent")
  .addConditionalEdges("agent", routeMessage)
  .addEdge("tools", "agent");

export const exampleSimpleGraph = workflow.compile();

exampleSimpleGraph.name = "Example Simple Graph";

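For completeness, the StateAnnotation imported above is assumed to be the standard messages state; here is a minimal sketch of what ./simple-messages-state.js might contain (an assumption, since the file is not included in this issue):

// Assumed contents of ./simple-messages-state.js (not shown in the original report).
import { Annotation, messagesStateReducer } from "@langchain/langgraph";
import { BaseMessage } from "@langchain/core/messages";

export const StateAnnotation = Annotation.Root({
  // Accumulate messages across nodes using the built-in messages reducer.
  messages: Annotation<BaseMessage[]>({
    reducer: messagesStateReducer,
    default: () => [],
  }),
});
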
and here are passing (local) and failing (remote) tests:

  Example Simple Streamed Graph
    ✓ should process input through the graph - local (2543 ms)
    ✕ should process input through the graph - remote (2431 ms)

import { isAIMessageChunk, isToolMessageChunk } from "@langchain/core/messages";
import { RemoteGraph } from "@langchain/langgraph/remote";
import { exampleSimpleGraph } from "../src/agent/example-simple-graph.js";

describe("Example Simple Streamed Graph", () => {
  async function testGraphStream(graph) {
    const options = { streamMode: "messages" };
    const stream = await graph.stream(
      { messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
      options
    );

    let streamedAIMessage = "";
    let streamedToolMessage = "";

    for await (const [message] of stream) {
      if (isToolMessageChunk(message)) {
        streamedToolMessage += message.content;
      } else if (isAIMessageChunk(message)) {
        streamedAIMessage += message.content;
      }
    }

    expect(streamedToolMessage).toBeDefined();
    expect(typeof streamedToolMessage).toBe("string");
    expect(streamedToolMessage).toBe("Cold, with a low of 3℃");

    expect(streamedAIMessage).toBeDefined();
    expect(typeof streamedAIMessage).toBe("string");
    expect(streamedAIMessage).toContain("cold");
  }

  it("should process input through the graph - local", async () => {
    await testGraphStream(exampleSimpleGraph);
  }, 30000); // Increased timeout to 30 seconds

  it("should process input through the graph - remote", async () => {
    const url = `http://0.0.0.0:8123`;
    const graphName = "example-simple-graph";
    const remoteGraph = new RemoteGraph({ graphId: graphName, url });

    await testGraphStream(remoteGraph);
  }, 30000); // Increased timeout to 30 seconds
});
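
For reference, the raw events can also be observed by streaming from the same deployment with the @langchain/langgraph-sdk client directly; a rough sketch (the URL, graph id, and input mirror the failing test above, and the event names are based on the messages/partial / messages/complete values noted earlier):

import { Client } from "@langchain/langgraph-sdk";

// Same local deployment and graph id as the failing remote test above.
const client = new Client({ apiUrl: "http://0.0.0.0:8123" });

// Stream a stateless run (threadId null) with the "messages" stream mode.
const sdkStream = client.runs.stream(null, "example-simple-graph", {
  input: { messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
  streamMode: "messages",
});

for await (const chunk of sdkStream) {
  // chunk.event arrives as e.g. "messages/partial" or "messages/complete",
  // which is the prefix that RemoteGraph currently fails to match against "messages".
  console.log(chunk.event, chunk.data);
}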
Jronk changed the title from RemoteGraph's .stream() "message" streamMode is not working to RemoteGraph's .stream() "messages" streamMode is not working on Nov 20, 2024
jacoblee93 (Collaborator) commented

Agh shoot I need to fix this... Will try to find some time soon. Thanks for flagging!

jacoblee93 self-assigned this on Nov 22, 2024