[Bug] Child workflow execution stuck when running in testing.WorkflowEnvironment and workflow update is involved. #661

Closed
Chengdyc opened this issue Oct 8, 2024 · 3 comments
Labels
bug Something isn't working

@Chengdyc

Chengdyc commented Oct 8, 2024

What are you really trying to do?

We're using Temporal to build a chat application, and we have a parent-child workflow setup that models the conversation (session) and each conversation turn (request + response). The conversation is a long-running entity workflow, and each input is submitted via a 'workflow update', which starts a child workflow to generate the response.

We want to write a unit test for the conversation workflow to verify its behavior. In the unit test, we mock the child workflow so we don't need to mock the logic and activities within the child workflow.

Describe the bug

When we run the unit test, the test gets stuck starting the child workflow. We added print statements before calling execute_child_workflow and at the start of the child workflow; it appears that the Temporal test environment is unable to start the child workflow.

Minimal Reproduction

Unit test to reproduce the bug. There are three test cases:

  1. Send a 'workflow update' to a parent workflow that starts a child workflow. This test gets stuck; remove the pytest.mark.skip decorator to run it.
  2. Send a 'signal' to a parent workflow that starts a child workflow. This test succeeds.
  3. Send a 'workflow update' to a workflow that starts an activity (no child workflow involved). This test succeeds.

I'm using Temporal Python SDK version 1.7.1

import asyncio
import logging
import uuid
from datetime import timedelta

import pytest
from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

logger = logging.getLogger(__name__)

@workflow.defn
class ParentWorkflow:
    def __init__(self) -> None:
        self.input = None
        self.output = None

    @workflow.run
    async def run(self):
        while True:
            try:
                await workflow.wait_condition(
                    lambda: self.input is not None,
                    timeout=timedelta(minutes=60)
                )
            except asyncio.TimeoutError:
                workflow.logger.info("we're done")
                return

            workflow.logger.info("exited wait, starting child workflow")
            self.output = await workflow.execute_child_workflow(
                ChildWorkflow.run,
                args=[self.input],
                task_queue=workflow.info().task_queue,
            )
            workflow.logger.info("child workflow complete, got output")
            self.input = None
    
    @workflow.update
    async def process_update(self, input: str):
        self.input = input
        self.output = None
        workflow.logger.info("waiting for output")
        await workflow.wait_condition(
            lambda: self.output is not None
        )
        return self.output
    
    @workflow.signal
    async def signal(self, input: str):
        self.input = input
        self.output = None
    

@workflow.defn
class ChildlessWorkflow:
    def __init__(self) -> None:
        self.input = None
        self.output = None

    @workflow.run
    async def run(self):
        while True:
            try:
                await workflow.wait_condition(
                    lambda: self.input is not None,
                    timeout=timedelta(minutes=60)
                )
            except asyncio.TimeoutError:
                workflow.logger.info("we're done")
                return

            workflow.logger.info("exited wait, starting activity")
            self.output = await workflow.execute_activity_method(
                Activities.child_activity,
                args=[self.input],
                task_queue=workflow.info().task_queue,
                schedule_to_close_timeout=timedelta(
                    seconds=60
                ),
                retry_policy=RetryPolicy(maximum_attempts=1)
            )
            workflow.logger.info("activity complete, got output")
            self.input = None
    
    @workflow.update
    async def process_update(self, input: str):
        self.input = input
        self.output = None
        workflow.logger.info("waiting for output")
        await workflow.wait_condition(
            lambda: self.output is not None
        )
        return self.output


class Activities:
    @activity.defn
    async def child_activity(
        self,
        name: str
    ) -> str:
        return f"child activity {name}"


@workflow.defn
class ChildWorkflow:
    @workflow.run
    async def run(self, name: str):
        return await workflow.execute_activity_method(
            Activities.child_activity,
            args=[name],
            schedule_to_close_timeout=timedelta(
                seconds=60
            ),
            task_queue=workflow.info().task_queue,
            retry_policy=RetryPolicy(maximum_attempts=1)
        )


@workflow.defn(name="ChildWorkflow")
class MockChildWorkflow:
    @workflow.run
    async def run(self, name: str):
        await asyncio.sleep(60)
        return f"mock {name} done"
    
pytestmark = pytest.mark.asyncio


@pytest.mark.skip(
    reason="test will get stuck"
)
async def test_workflow_update():
    """ Test to show that if we use workflow update together with child workflow execution we'll trigger the bug. This test will get stuck """
    tmp_task_queue = f"task_queue-{uuid.uuid4()}"
    activities = Activities()
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=tmp_task_queue,
            workflows=[ParentWorkflow, MockChildWorkflow],
            activities=[activities.child_activity],
        ):
            # env is WorkflowEnvironment with time-skipping
            handle = await env.client.start_workflow(
                ParentWorkflow.run,
                args=[],
                task_queue=tmp_task_queue,
                id=str(uuid.uuid4())
            )
            logger.info("executing update")
            update_result = await handle.execute_update(
                ParentWorkflow.process_update,
                args=["my name"]
            )
            logger.info(f"got result: {update_result}, waiting for termination")
            # do some other verification with workflow handle
            # wait for workflow to finish
            result = await handle.result()
            print(result)


async def test_workflow_signal():
    """ Test to show that if we are NOT sending workflow update, the bug is not triggered. This test will succeed """
    tmp_task_queue = f"task_queue-{uuid.uuid4()}"
    activities = Activities()
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=tmp_task_queue,
            workflows=[ParentWorkflow, MockChildWorkflow],
            activities=[activities.child_activity],
        ):
            # env is WorkflowEnvironment with time-skipping
            handle = await env.client.start_workflow(
                ParentWorkflow.run,
                args=[],
                task_queue=tmp_task_queue,
                id=str(uuid.uuid4())
            )
            logger.info("sending signal")
            await handle.signal(
                ParentWorkflow.signal,
                args=["my name"]
            )
            logger.info("signal sent, waiting for termination")
            # do some other verification with workflow handle
            # wait for workflow to finish
            result = await handle.result()
            print(result)


async def test_childless_workflow():
    """ Test to show that if we use workflow update and there is no child workflow, the bug is not triggered. This test will succeed """
    tmp_task_queue = f"task_queue-{uuid.uuid4()}"
    activities = Activities()
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=tmp_task_queue,
            workflows=[ChildlessWorkflow],
            activities=[activities.child_activity],
        ):
            # env is WorkflowEnvironment with time-skipping
            handle = await env.client.start_workflow(
                ChildlessWorkflow.run,
                args=[],
                task_queue=tmp_task_queue,
                id=str(uuid.uuid4())
            )
            logger.info("executing update")
            update_result = await handle.execute_update(
                ChildlessWorkflow.process_update,
                args=["my name"]
            )
            logger.info(f"got result: {update_result}, waiting for termination")
            # do some other verification with workflow handle
            # wait for workflow to finish
            result = await handle.result()
            print(result)

Command to run this:

poetry run pytest -rA -s test_mock_child.py

Environment/Versions

  • OS and processor: Linux (WSL2)
  • Temporal Version: 1.7.1
  • Are you using Docker or Kubernetes or building Temporal from source? No


Chengdyc added the bug (Something isn't working) label on Oct 8, 2024
@cretz
Member

cretz commented Oct 8, 2024

Thanks for the report! We will investigate.

@cretz
Member

cretz commented Oct 9, 2024

Automatic time skipping only works when you're waiting on the workflow result. If you start the workflow and do not wait on the result, the automatic time skipping will not occur. From https://github.com/temporalio/sdk-python?tab=readme-ov-file#automatic-time-skipping:

Anytime a workflow result is waited on, the time-skipping server automatically advances to the next event it can

If, before logger.info("executing update"), you had result_task = asyncio.create_task(handle.result()), that would wait for the result in the background and ensure automatic time skipping. Granted, that would immediately trigger the 60-minute wait_condition timeout. You can also enable auto time skipping by wrapping the code in "with env.time_skipping_unlocked():". I will confer with the team about whether to also enable automatic time skipping when waiting on an update result.

@cretz
Member

cretz commented Oct 21, 2024

We have opened #675 to enable auto time skipping when waiting on an update result. Closing this issue in favor of that one (you can still use the workaround of waiting on the workflow result in the background to enable auto time skipping).

cretz closed this as not planned on Oct 21, 2024