The batch method is not implemented on AsyncBatchedStore #686

Open
ashburnham opened this issue Nov 24, 2024 · 9 comments

Comments

@ashburnham

import {
  LangGraphRunnableConfig
 } from '@langchain/langgraph';
import { StateSchema } from '../state';
import {
  Configuration
} from '../../../core/configurations/booking';
import { graph as backgroundStateManagerGraph } from '../../../graphs/background_state_manager/graph';

const manageStateInBackground = async (
  state: StateSchema,
  config: LangGraphRunnableConfig<Configuration>,
): Promise<StateSchema> => {
  void await backgroundStateManagerGraph.invoke(
    state,
    config
  );
  return state;
};

export default manageStateInBackground;

When I run a graph with this node, I get the following error:

Error: The `batch` method is not implemented on `AsyncBatchedStore`.
 Instead, it calls the `batch` method on the wrapped store.
 If you are seeing this error, something is wrong

Looking at the error, it seems to be because the store gets double-wrapped:

This is what "store" looks like in the original graph

  AsyncBatchedStore {
    store: RemoteStore {},
  }

This is what "store" looks like in the parent graph

  AsyncBatchedStore {       // Outer AsyncBatchedStore
    store: AsyncBatchedStore {    // Inner AsyncBatchedStore
      store: RemoteStore {},
      ...
    },
    ...
  }

I think this is a bug; if not, the docs need to be updated. Please can you advise?
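Until this is resolved upstream, one possible workaround is to unwrap the extra layers before using the store. This is a minimal sketch: the classes below are hypothetical stand-ins mirroring the shapes in the logs above, not the real exports from `@langchain/langgraph-checkpoint`, and `unwrapStore` is not a langgraph API.

```typescript
// Hypothetical stand-ins mirroring the nested shapes logged above.
class RemoteStore {}

class AsyncBatchedStore {
  constructor(public store: RemoteStore | AsyncBatchedStore) {}
}

// Peel off extra AsyncBatchedStore layers until at most one remains,
// so `batch` is only ever dispatched to the real underlying store.
const unwrapStore = (
  store: RemoteStore | AsyncBatchedStore
): RemoteStore | AsyncBatchedStore => {
  while (
    store instanceof AsyncBatchedStore &&
    store.store instanceof AsyncBatchedStore
  ) {
    store = store.store;
  }
  return store;
};

// Double-wrapped, as seen in the parent graph:
const doubleWrapped = new AsyncBatchedStore(
  new AsyncBatchedStore(new RemoteStore())
);
const unwrapped = unwrapStore(doubleWrapped) as AsyncBatchedStore;
console.log(unwrapped.store instanceof RemoteStore); // true
```

An already single-wrapped store passes through unchanged, so the helper is safe to call unconditionally.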

@ashburnham
Author

This is with @langchain/[email protected] btw

@ashburnham
Author

This is what a log of the store looks like just before it gets used (and throws an error)

langgraph-api-1       |  AsyncBatchedStore {
langgraph-api-1       |   store: AsyncBatchedStore {
langgraph-api-1       |     store: RemoteStore {},
langgraph-api-1       |     queue: Map(0) {},
langgraph-api-1       |     nextKey: 0,
langgraph-api-1       |     running: true,
langgraph-api-1       |     processingTask: Promise { <pending> }
langgraph-api-1       |   },
langgraph-api-1       |   queue: Map(0) {},
langgraph-api-1       |   nextKey: 0,
langgraph-api-1       |   running: true,
langgraph-api-1       |   processingTask: Promise {
langgraph-api-1       |     <pending>,
langgraph-api-1       |     [Symbol(async_id_symbol)]: 7114,
langgraph-api-1       |     [Symbol(trigger_async_id_symbol)]: 6784,
langgraph-api-1       |     [Symbol(kResourceStore)]: RunTree {
langgraph-api-1       |       id: 'a....9',
langgraph-api-1       |       name: 'manage_state_in_background',
langgraph-api-1       |       run_type: 'chain',
langgraph-api-1       |       project_name: 'hirespace-langgraph-agent',
langgraph-api-1       |       parent_run: [RunTree],
langgraph-api-1       |       child_runs: [],
langgraph-api-1       |       start_time: 1732483314101,
langgraph-api-1       |       end_time: undefined,
langgraph-api-1       |       extra: [Object],
langgraph-api-1       |       tags: [Array],
langgraph-api-1       |       error: undefined,
langgraph-api-1       |       serialized: [Object],
langgraph-api-1       |       inputs: [Object],
langgraph-api-1       |       outputs: undefined,
langgraph-api-1       |       reference_example_id: undefined,
langgraph-api-1       |       client: [Client],
langgraph-api-1       |       events: [Array],
langgraph-api-1       |       trace_id: '1e...1a',
langgraph-api-1       |       dotted_order: '2...99',
langgraph-api-1       |       tracingEnabled: true,
langgraph-api-1       |       execution_order: 3,
langgraph-api-1       |       child_execution_order: 3,
langgraph-api-1       |       attachments: undefined,
langgraph-api-1       |       api_url: 'https://api.smith.langchain.com',
langgraph-api-1       |       api_key: 'lsv.....082',
langgraph-api-1       |       caller_options: {},
langgraph-api-1       |       parent_run_id: '1ef.1a'..
langgraph-api-1       |     }
langgraph-api-1       |   }
langgraph-api-1       | }

@jacoblee93
Collaborator

CC @bracesproul

@bracesproul
Member

@ashburnham are you running this in LangGraph studio? Also, could you give me more context as to what's happening in backgroundStateManagerGraph?

@ashburnham
Author

Yup - this is running in LangGraph Studio - the logs above are copy/pasted from it.

backgroundStateManagerGraph is a sub-graph whose purpose is to update the thread state with info from our database. The plan is that this runs on a short delay so it doesn't block the hot path, but I can't make that work at the moment (see #687).

I've copied the code for it below for completeness.

import {
  START,
  StateGraph,
  END
} from '@langchain/langgraph';
import {
  ConfigurationAnnotation
} from '../../core/configurations/booking';
import stateSchema from './state';
import {
  refreshState,
  truncateMessages,
  updateMemories
} from './nodes';

ConfigurationAnnotation.lc_graph_name = 'background_state_manager';

export const builder = new StateGraph(
  {
    stateSchema
  },
  ConfigurationAnnotation
)
  .addNode('refresh_state', refreshState)
  .addNode('truncate_messages', truncateMessages)
  .addNode('update_memories', updateMemories)
  .addEdge(START, 'refresh_state')
  .addEdge('refresh_state', 'truncate_messages')
  .addEdge('truncate_messages', 'update_memories')
  .addEdge('update_memories', END);

export const graph = builder.compile();
graph.name = 'background_state_manager';

This issue is happening with all sub-graphs though, it's not specific to this graph.

Just shout if you need anything else.

@bracesproul
Member

@ashburnham are you using the built in store from LangGraph? (The store which is accessed via config.store) I'm asking because the AsyncBatchedStore is automatically added to support LangGraph's store, and if you're not trying to override it or do anything out of the ordinary that error shouldn't occur.
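For what it's worth, the wrapping step could presumably be made idempotent on the library side so nesting can never occur. A minimal sketch of that idea, using hypothetical stub classes and a hypothetical `ensureBatched` helper rather than langgraph's actual internals:

```typescript
// Hypothetical stubs standing in for the real store classes.
class BaseStoreStub {}

class AsyncBatchedStoreStub {
  constructor(public store: BaseStoreStub | AsyncBatchedStoreStub) {}
}

// Wrap only if the store is not already batched, so compiling a
// subgraph cannot produce a nested AsyncBatchedStore.
const ensureBatched = (
  store: BaseStoreStub | AsyncBatchedStoreStub
): AsyncBatchedStoreStub =>
  store instanceof AsyncBatchedStoreStub
    ? store
    : new AsyncBatchedStoreStub(store);

const base = new BaseStoreStub();
const once = ensureBatched(base);
console.log(ensureBatched(once) === once); // true: no double wrapping
```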

@ashburnham
Author

I'm just using the store accessed via config.store, following patterns I've seen in LangChain examples.

Function to get the store:

import { AsyncBatchedStore, BaseStore, LangGraphRunnableConfig } from '@langchain/langgraph';
/**
 * Get the store from the configuration or throw an error.
 */
const getStoreFromConfigOrThrow = (
  config: LangGraphRunnableConfig
): BaseStore | AsyncBatchedStore => {
  if (!config.store) {
    throw new Error('Store not found in configuration');
  }

  return config.store;
};

export default getStoreFromConfigOrThrow;

Parent graph

import {
  START,
  StateGraph,
  END,
  MemorySaver
} from '@langchain/langgraph';
import {
  ConfigurationAnnotation
} from '../../core/configurations/booking';
import stateSchema from './state';
import {
  graph as eventManagerGraph
} from '../event_manager/graph';

ConfigurationAnnotation.lc_graph_name = 'relationship_manager';

export const builder = new StateGraph(
  {
    stateSchema
  },
  ConfigurationAnnotation
)
  .addNode('event_manager', eventManagerGraph)
  .addEdge(START, 'event_manager')
  .addEdge('event_manager', END);

export const graph = builder.compile({
  checkpointer: new MemorySaver() // Also tried this without defining a checkpointer
});
graph.name = 'relationship_manager';

Child graph

import {
  START,
  StateGraph,
  END
} from '@langchain/langgraph';
import {
  ConfigurationAnnotation
} from '../../core/configurations/booking';
import stateSchema from './state';
import {
  callModel,
} from './nodes';

ConfigurationAnnotation.lc_graph_name = 'event_manager';

export const builder = new StateGraph(
  {
    stateSchema
  },
  ConfigurationAnnotation
)
  .addNode('call_model', callModel)
  .addEdge(START, 'call_model')
  .addEdge('call_model', END);

export const graph = builder.compile();
graph.name = 'event_manager';

Node that throws the error when it gets to the store:

import { LangGraphRunnableConfig } from '@langchain/langgraph';
import { initChatModel } from 'langchain/chat_models/universal';
import { v4 as uuidv4 } from 'uuid'; // needed for the fallback key below
import { StateSchema } from '../state';
import {
  getStoreFromConfigOrThrow,
  formatMemories,
  splitModelAndProvider
} from '../../../utils';
import initializeTools from '../initialise-tools';
import { ensureConfiguration } from '../../../core/configurations/booking';
import { SYSTEM } from '../prompts';

const llm = await initChatModel();

const callModel = async (
  state: StateSchema,
  config: LangGraphRunnableConfig
): Promise<StateSchema> => {
  const store = getStoreFromConfigOrThrow(config);
  const configurable = ensureConfiguration(config);
  console.log('event_manager callModel store', store);
  const memories = await store.search([
    'memories',
    configurable.customerId ?? config.metadata?.thread_id ?? uuidv4()
  ], {
    limit: 10,
  });

  const formatted = formatMemories(memories);

  const sys = SYSTEM
    .replace('{user_info}', formatted)
    .replace('{time}', new Date().toISOString());
  const tools = initializeTools(config);

  const boundLLM = llm.bind({
    metadata: {
      node: 'call_model',
      graph: 'event_manager'
    },
    tools,
    tool_choice: 'auto'
  });
  const messagesWithContent = state.messages.filter(
    ({ content }) => content?.length
  );

  const result = await boundLLM.invoke(
    [{ role: 'system', content: sys }, ...messagesWithContent],
    {
      configurable: splitModelAndProvider(configurable.model)
    }
  );

  return {
    messages: [result],
    lastSummarisedMessageId: undefined,
    booking: {},
    customer: {},
    company: {},
    supplierBookingLines: [],
    venueBookingLines: [],
    clientEmails: []
  };
};

export default callModel;

This is the error

langgraph-api-1       | error | Error: The `batch` method is not implemented on `AsyncBatchedStore`.
 Instead, it calls the `batch` method on the wrapped store.
 If you are seeing this error, something is wrong.
    at AsyncBatchedStore.batch (file:///api/langgraph_api/js/node_modules/@langchain/langgraph-checkpoint/dist/store/batch.js:48:15)
    at AsyncBatchedStore.processBatchQueue (file:///api/langgraph_api/js/node_modules/@langchain/langgraph-checkpoint/dist/store/batch.js:104:50)
langgraph-api-1       | error | Background run failed

Excerpt from the console log above:

  event_manager callModel store AsyncBatchedStore {
    store: AsyncBatchedStore { ... }
    ...
  }

Just let me know if you need anything else

@SquirrelDeveloper

Yes, I have the same problem; it appeared when I split my graph into multiple subgraphs.
The parent's original memory store doesn't seem to be passed to the subgraphs.

@SquirrelDeveloper

@bracesproul Here is the code to reproduce

import {
  Annotation,
  END,
  InMemoryStore,
  LangGraphRunnableConfig,
  MessagesAnnotation,
  START,
  StateGraph,
} from '@langchain/langgraph';

export const GraphState = Annotation.Root({
  ...MessagesAnnotation.spec,
});

const rootNode = (state: typeof GraphState.State, config: LangGraphRunnableConfig) => {
  return state;
};

const subGraphNode = async (state: typeof GraphState.State, config: LangGraphRunnableConfig) => {
  const userMainGoals = await config.store?.get(['test', 'preferences'], 'userMainGoals');
  return state;
};

const notWorkingSubWorkflow = new StateGraph(GraphState)
  .addNode('rootNode', rootNode)
  .addNode('subGraphNode', subGraphNode)
  .addEdge(START, 'rootNode')
  .addEdge('rootNode', 'subGraphNode')
  .addEdge('subGraphNode', END)
  .compile();

const notWorkingWorkflow = new StateGraph(GraphState)
  .addNode('rootNode', rootNode)
  .addNode('notWorkingSubWorkflow', notWorkingSubWorkflow)
  .addEdge(START, 'rootNode')
  .addEdge('rootNode', 'notWorkingSubWorkflow')
  .addEdge('notWorkingSubWorkflow', END);

const workingWorkflow = new StateGraph(GraphState)
  .addNode('rootNode', rootNode)
  .addNode('subGraphNode', subGraphNode)
  .addEdge(START, 'rootNode')
  .addEdge('rootNode', 'subGraphNode')
  .addEdge('subGraphNode', END);

export const testNotWorking = async () => {
  const store = new InMemoryStore();
  await notWorkingWorkflow.compile({ store: store }).invoke({});
};

export const testWorking = async () => {
  const store = new InMemoryStore();
  await workingWorkflow.compile({ store: store }).invoke({});
};
