Calling .astream inside of another graph's .astream_events will set stream_mode="values" regardless of config #2351

Open
5 tasks done
lucaslulucaslu opened this issue Nov 6, 2024 · 2 comments

Comments

@lucaslulucaslu

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangGraph/LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangGraph/LangChain rather than my code.
  • I am sure this is better as an issue rather than a GitHub discussion, since this is a LangGraph bug and not a design question.

Example Code

I cannot share the actual code here for security reasons.

Error Message and Stack Trace (if applicable)

No response

Description

I have a graph A that streams LLM tokens. Tested on its own, A.astream yields updates as expected. I also have a graph B, run via B.astream_events, in which a node calls A.astream. In that setup, A.astream only yields the full GraphState values at each step, regardless of whether I set stream_mode to "values" or "updates".

System Info

latest langgraph
mac
python 3.12

@vbarda
Collaborator

vbarda commented Nov 6, 2024

@lucaslulucaslu could you write some pseudocode to illustrate this issue better?

@lucaslulucaslu
Author

lucaslulucaslu commented Nov 9, 2024

Sure @vbarda

from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel


class GraphState1(BaseModel):
    a: int = 0
    b: int = 0


def graph1_node1(state: GraphState1):
    # Increment "a" and return only the changed key.
    a = state.a
    a += 1
    return {"a": a}


def graph1_node2(state: GraphState1):
    # Increment "b" and return only the changed key.
    b = state.b
    b += 1
    return {"b": b}


# Linear graph: START -> graph1_node1 -> graph1_node2 -> END
graph1 = StateGraph(GraphState1)
graph1.add_node(graph1_node1.__name__, graph1_node1)
graph1.add_node(graph1_node2.__name__, graph1_node2)
graph1.add_edge(START, graph1_node1.__name__)
graph1.add_edge(graph1_node1.__name__, graph1_node2.__name__)
graph1.add_edge(graph1_node2.__name__, END)
graph1_app = graph1.compile()

# Top-level "async for" needs an async context (e.g. a Jupyter notebook or an async function).
async for chunk in graph1_app.astream({"a": 0, "b": 0}):
    print(chunk)

This will output updates as expected:

{'graph1_node1': {'a': 1}}
{'graph1_node2': {'b': 1}}

However, if we invoke this exact same graph from inside another graph:

class GraphState2(BaseModel):
    c: int = 0
    d: int = 0


async def graph2_node1(state: GraphState2):
    # Stream graph1 from inside a graph2 node, explicitly requesting "updates".
    async for chunk in graph1_app.astream({"a": 0, "b": 0}, stream_mode="updates"):
        print(chunk)
    c = state.c
    c += 1
    return {"c": c}


def graph2_node2(state: GraphState2):
    d = state.d
    d += 1
    return {"d": d}


# Linear graph: START -> graph2_node1 -> graph2_node2 -> END
graph2 = StateGraph(GraphState2)
graph2.add_node(graph2_node1.__name__, graph2_node1)
graph2.add_node(graph2_node2.__name__, graph2_node2)
graph2.add_edge(START, graph2_node1.__name__)
graph2.add_edge(graph2_node1.__name__, graph2_node2.__name__)
graph2.add_edge(graph2_node2.__name__, END)
graph2_app = graph2.compile()

# Top-level "async for" needs an async context (e.g. a Jupyter notebook or an async function).
async for chunk in graph2_app.astream_events({"c": 0, "d": 0}, version="v2"):
    pass

graph2 does nothing but invoke graph1 in graph2_node1, and the printed output is:

{'a': 0, 'b': 0}
{'a': 1, 'b': 0}
{'a': 1, 'b': 1}

This happens even though I explicitly pass stream_mode="updates" in the call: async for chunk in graph1_app.astream({"a":0,"b":0},stream_mode="updates"):
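As a possible stopgap while this is open, the per-step changes can be recovered by diffing consecutive full-state snapshots. The following is a minimal sketch in plain Python, not a fix for the underlying behavior. diff_snapshots is a hypothetical helper, not a LangGraph API. It assumes each chunk is a flat dict of state keys, as in the output above.

```python
from typing import Any, Dict, Iterable, Iterator


def diff_snapshots(snapshots: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield the keys whose values changed between consecutive snapshots.

    Hypothetical helper (not part of LangGraph). The first snapshot is
    treated as the baseline and produces no output of its own.
    """
    previous = None
    for snapshot in snapshots:
        if previous is not None:
            # Keep only keys that are new or whose value differs from the last snapshot.
            changed = {
                key: value
                for key, value in snapshot.items()
                if key not in previous or previous[key] != value
            }
            if changed:
                yield changed
        previous = dict(snapshot)


# Snapshots copied from the output reported above.
values_chunks = [
    {"a": 0, "b": 0},
    {"a": 1, "b": 0},
    {"a": 1, "b": 1},
]
recovered = list(diff_snapshots(values_chunks))
print(recovered)  # [{'a': 1}, {'b': 1}]
```

Unlike real "updates" chunks, the recovered dicts are not keyed by node name, so this cannot tell which node produced a change. It also misses a step in which a node writes a value equal to the existing one.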
