File size: 3,825 Bytes
46dd24e 80c0c76 a5ead65 46dd24e a8d670e 80c0c76 a8d670e 46dd24e a8d670e 46dd24e a8d670e 46dd24e a5ead65 a8d670e 36a99d9 a8d670e a5ead65 46dd24e a5ead65 11dd0a0 a5ead65 7dd6bbc 46dd24e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
from __future__ import annotations
from copy import deepcopy
from typing import Any, Dict
import hydra
from langchain.tools import BaseTool
from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage
class LCToolFlow(AtomicFlow):
    r""" A flow that runs a tool using langchain. For example, a tool could be to execute a query with the
    duckduckgo search engine.

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "search"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
      Default: "useful when you need to look for the answer online, especially for recent events."
    - `keep_raw_response` (bool): If True, the raw response of the tool is kept. Default: False
    - `clear_flow_namespace_on_run_end` (bool): If True, the flow namespace is cleared at the end of the run.
      Default: False
    - `backend` (Dict[str, Any]): The configuration of the backend. Default: langchain.tools.DuckDuckGoSearchRun
    - Other parameters are inherited from the default configuration of AtomicFlow (see AtomicFlow)

    *Input Interface*:

    - `query` (str): the query to run the tool on

    *Output Interface*:

    - `observation` (str): the observation returned by the tool

    :param backend: The backend of the flow. It is a tool that is run by the flow. (e.g. duckduckgo search engine)
    :type backend: BaseTool
    :param \**kwargs: Additional arguments to pass to the flow. See :class:`aiflows.base_flows.AtomicFlow` for more details.
    """

    # Configuration keys this flow declares as mandatory.
    REQUIRED_KEYS_CONFIG = ["backend"]

    # Caching is switched off for this flow.
    SUPPORTS_CACHING: bool = False

    # The langchain tool executed by :meth:`run`.
    backend: BaseTool

    def __init__(self, backend: BaseTool, **kwargs) -> None:
        """ Stores the backend tool and forwards every other argument to the parent constructor.

        :param backend: The tool that is run by the flow.
        :type backend: BaseTool
        :param \\**kwargs: Additional arguments forwarded unchanged to the parent class constructor.
        """
        super().__init__(**kwargs)
        self.backend = backend

    @classmethod
    def _set_up_backend(cls, config: Dict[str, Any]) -> BaseTool:
        """ This method sets up the backend of the flow.

        A ``_target_`` that starts with "." is treated as relative to the package that contains the module
        defining ``cls`` and is expanded to an absolute dotted path before instantiation. The expansion is
        done on a private copy, so the ``config`` passed in by the caller is never modified.

        :param config: The configuration of the backend. It must contain a ``_target_`` entry.
        :type config: Dict[str, Any]
        :return: The backend of the flow.
        :rtype: BaseTool
        :raises KeyError: If ``config`` has no ``_target_`` entry.
        """
        target = config["_target_"]
        if target.startswith("."):
            # Package of the module that defines cls, e.g. "pkg.sub.module" -> "pkg.sub"
            # (empty string for a top-level module).
            parent_module = cls.__module__.rpartition(".")[0]
            # Work on a copy: the caller's configuration must keep its original (relative) target.
            config = deepcopy(config)
            config["_target_"] = parent_module + target
        return hydra.utils.instantiate(config, _convert_="partial")

    @classmethod
    def instantiate_from_config(cls, config: Dict[str, Any]) -> LCToolFlow:
        """ This method instantiates the flow from a configuration dictionary.

        The flow receives a deep copy of ``config`` as its ``flow_config`` and the backend is built from
        ``config["backend"]``. ``config`` itself is left untouched.

        :param config: The configuration of the flow.
        :type config: Dict[str, Any]
        :return: The instantiated flow.
        :rtype: LCToolFlow
        :raises KeyError: If ``config`` has no ``backend`` entry.
        """
        # The flow owns its own copy so later changes to `config` do not leak into it.
        flow_config = deepcopy(config)

        # ~~~ Set up LangChain backend ~~~
        backend = cls._set_up_backend(config["backend"])

        # ~~~ Instantiate flow ~~~
        return cls(flow_config=flow_config, backend=backend)

    def run(self, input_message: FlowMessage) -> None:
        """ This method runs the flow. It runs the backend on the input data and sends the result back
        as a reply whose response holds the tool output under the key "observation".

        :param input_message: The input message of the flow.
        :type input_message: FlowMessage
        """
        # The whole data payload of the message is handed to the tool as its input.
        tool_input = input_message.data
        observation = self.backend.run(tool_input=tool_input)

        # package_output_message / send_message are not defined in this class
        # (presumably inherited from AtomicFlow).
        reply = self.package_output_message(
            input_message=input_message,
            response={"observation": observation},
        )
        self.send_message(reply)
|