Skip to content

definitions

TaskGraph

Bases: Graph

Source code in src/lmflux/graphs/task/definitions.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class TaskGraph(Graph):
    # -------------
    #  Private methods 
    # -------------
    def __init__(self):
        super().__init__(draw_labels_around=True)

    # -------------
    #  Public API 
    # -------------

    def run(self, with_context:Context=None) -> Session:
        """
        Execute every node of the graph respecting the directed edges.
        Cycles raise a ``RuntimeError``.
        """
        if with_context:
            session=Session(with_context)
        else:
            session = Session()
        try:
            order = list(nx.topological_sort(self.G))
        except nx.NetworkXUnfeasible as exc:  # pragma: no cover
            raise RuntimeError(
                "The task graph contains a cycle and cannot be executed."
            ) from exc

        for nid in order:
            obj: RunnableNodeDefinition = self.G.nodes[nid]["obj"]
            if not isinstance(obj, RunnableNodeDefinition):
                raise RuntimeError(
                    f"Node {nid} is not of type RunnableNodeDefinition."
                )
            obj.__execute__(session)
        return session

    def connect_tasks(self, task_a:RunnableNodeDefinition, task_b:RunnableNodeDefinition):
        definition_a = self.__find_object_in_graph_by_name__(task_a.name)
        definition_b = self.__find_object_in_graph_by_name__(task_b.name)

        if not definition_a:
            self.__add_node__(task_a)
        if not definition_b:
            self.__add_node__(task_b)

        _metadata = {}
        self.__add_edge__(task_a, task_b, _metadata=_metadata)

run(with_context=None)

Execute every node of the graph respecting the directed edges. Cycles raise a RuntimeError.

Source code in src/lmflux/graphs/task/definitions.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def run(self, with_context:Context=None) -> Session:
    """
    Execute every node of the graph respecting the directed edges.
    Cycles raise a ``RuntimeError``.
    """
    if with_context:
        session=Session(with_context)
    else:
        session = Session()
    try:
        order = list(nx.topological_sort(self.G))
    except nx.NetworkXUnfeasible as exc:  # pragma: no cover
        raise RuntimeError(
            "The task graph contains a cycle and cannot be executed."
        ) from exc

    for nid in order:
        obj: RunnableNodeDefinition = self.G.nodes[nid]["obj"]
        if not isinstance(obj, RunnableNodeDefinition):
            raise RuntimeError(
                f"Node {nid} is not of type RunnableNodeDefinition."
            )
        obj.__execute__(session)
    return session

agentic_task(agent)

Decorator for creating an AgenticTask with a specific agent.

Usage

@agentic_task(my_agent) def my_task(agent: Agent, session: Session): ...

Source code in src/lmflux/graphs/task/definitions.py
124
125
126
127
128
129
130
131
132
133
134
135
136
def agentic_task(agent: Agent):
    """
    Decorator for creating an AgenticTask with a specific agent.

    Usage:
        @agentic_task(my_agent)
        def my_task(agent: Agent, session: Session):
            ...
    """
    def decorator(func: callable):
        check_compatible(func, "run", EXPECTED_AGENTIC_CALLBACK)
        return AgenticTask(func.__name__, agent, func)
    return decorator

transformer_task(func)

Decorator for creating an TransformerTask.

Usage

@transformer_task def my_task(session: Session): ...

Source code in src/lmflux/graphs/task/definitions.py
112
113
114
115
116
117
118
119
120
121
122
def transformer_task(func:callable):
    """
    Decorator for creating an TransformerTask.

    Usage:
        @transformer_task
        def my_task(session: Session):
            ...
    """
    check_compatible(func, "run", EXPECTED_TRANSFORMER_CALLBACK)
    return TransformerTask(func.__name__, func)