adispatch_custom_event#

async langchain_core.callbacks.manager.adispatch_custom_event(name: str, data: Any, *, config: RunnableConfig | None = None) → None[source]#

Dispatch an adhoc event to the handlers.

Parameters:
  • name (str) – The name of the adhoc event.

  • data (Any) – The data for the adhoc event. Free form data. Ideally should be JSON serializable to avoid serialization issues downstream, but this is not enforced.

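  • config (Optional[RunnableConfig]) – Optional config object. When omitted, the config of the currently running Runnable is used. On Python <= 3.10, async code must pass it explicitly, because the config cannot be propagated automatically there.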

Return type:

None

Example

import asyncio
from typing import Any, Dict, List, Optional
from uuid import UUID

from langchain_core.callbacks import (
    AsyncCallbackHandler,
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda


class CustomCallbackManager(AsyncCallbackHandler):
    """Callback handler that prints every custom event it receives."""

    async def on_custom_event(
        self,
        name: str,
        data: Any,
        *,
        run_id: UUID,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        print(f"Received custom event: {name} with data: {data}")


async def foo(inputs):
    """Dispatch one custom event, then return the inputs unchanged."""
    # NOTE: on Python <= 3.10, declare ``foo(inputs, config)`` and pass
    # ``config=config`` below; the config is not propagated automatically.
    await adispatch_custom_event("my_event", {"bar": "buzz"})
    return inputs


async def main() -> None:
    """Run ``foo`` as a Runnable with the custom handler attached."""
    callback = CustomCallbackManager()
    foo_ = RunnableLambda(foo)
    await foo_.ainvoke({"a": "1"}, {"callbacks": [callback]})


# Inside a notebook (where an event loop is already running), use
# ``await main()`` instead of ``asyncio.run``.
if __name__ == "__main__":
    asyncio.run(main())

Example: Use with astream_events

import asyncio
from typing import Any, Dict, List, Optional
from uuid import UUID

from langchain_core.callbacks import (
    AsyncCallbackHandler,
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda


class CustomCallbackManager(AsyncCallbackHandler):
    """Callback handler that prints every custom event it receives."""

    async def on_custom_event(
        self,
        name: str,
        data: Any,
        *,
        run_id: UUID,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        print(f"Received custom event: {name} with data: {data}")


async def foo(inputs):
    """Dispatch two custom events, then return the inputs unchanged."""
    # NOTE: on Python <= 3.10, declare ``foo(inputs, config)`` and pass
    # ``config=config`` to each call; the config is not propagated
    # automatically.
    await adispatch_custom_event("event_type_1", {"bar": "buzz"})
    await adispatch_custom_event("event_type_2", 5)
    return inputs


async def main() -> None:
    """Stream the events produced while running ``foo`` and print each one."""
    callback = CustomCallbackManager()
    foo_ = RunnableLambda(foo)

    # ``astream_events`` is the Runnable streaming API that takes ``version``.
    async for event in foo_.astream_events(
        {"a": "1"},
        version="v2",
        config={"callbacks": [callback]},
    ):
        print(event)


# Inside a notebook (where an event loop is already running), use
# ``await main()`` instead of ``asyncio.run``.
if __name__ == "__main__":
    asyncio.run(main())

Added in version 0.2.15.