from typing import Any, Dict, List, Optional
import requests
from langchain_core.callbacks.manager import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
[docs]
class ZillizCloudPipelineRetriever(BaseRetriever):
    """`Zilliz Cloud Pipeline` retriever.
    Parameters:
        pipeline_ids: A dictionary of pipeline ids.
            Valid keys: "ingestion", "search", "deletion".
        token: Zilliz Cloud's token. Defaults to "".
        cloud_region: The region of Zilliz Cloud's cluster.
            Defaults to 'gcp-us-west1'.
    """
    pipeline_ids: Dict
    token: str = ""
    cloud_region: str = "gcp-us-west1"
    def _get_relevant_documents(
        self,
        query: str,
        top_k: int = 10,
        offset: int = 0,
        output_fields: List = [],
        filter: str = "",
        *,
        run_manager: CallbackManagerForRetrieverRun,
    ) -> List[Document]:
        """
        Get documents relevant to a query.
        Args:
            query: String to find relevant documents for
            top_k: The number of results. Defaults to 10.
            offset: The number of records to skip in the search result.
                Defaults to 0.
            output_fields: The extra fields to present in output.
            filter: The Milvus expression to filter search results.
                Defaults to "".
            run_manager: The callbacks handler to use.
        Returns:
            List of relevant documents
        """
        if "search" in self.pipeline_ids:
            search_pipe_id = self.pipeline_ids.get("search")
        else:
            raise Exception(
                "A search pipeline id must be provided in pipeline_ids to "
                "get relevant documents."
            )
        domain = (
            f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines"
        )
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{domain}/{search_pipe_id}/run"
        params = {
            "data": {"query_text": query},
            "params": {
                "limit": top_k,
                "offset": offset,
                "outputFields": output_fields,
                "filter": filter,
            },
        }
        response = requests.post(url, headers=headers, json=params)
        if response.status_code != 200:
            raise RuntimeError(response.text)
        response_dict = response.json()
        if response_dict["code"] != 200:
            raise RuntimeError(response_dict)
        response_data = response_dict["data"]
        search_results = response_data["result"]
        return [
            Document(
                page_content=result.pop("text")
                if "text" in result
                else result.pop("chunk_text"),
                metadata=result,
            )
            for result in search_results
        ]
[docs]
    def add_texts(
        self, texts: List[str], metadata: Optional[Dict[str, Any]] = None
    ) -> Dict:
        """
        Add documents to store.
        Only supported by a text ingestion pipeline in Zilliz Cloud.
        Args:
            texts: A list of text strings.
            metadata: A key-value dictionary of metadata will
                be inserted as preserved fields required by ingestion pipeline.
                Defaults to None.
        """
        if "ingestion" in self.pipeline_ids:
            ingeset_pipe_id = self.pipeline_ids.get("ingestion")
        else:
            raise Exception(
                "An ingestion pipeline id must be provided in pipeline_ids to"
                " add documents."
            )
        domain = (
            f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines"
        )
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{domain}/{ingeset_pipe_id}/run"
        metadata = {} if metadata is None else metadata
        params = {"data": {"text_list": texts}}
        params["data"].update(metadata)
        response = requests.post(url, headers=headers, json=params)
        if response.status_code != 200:
            raise Exception(response.text)
        response_dict = response.json()
        if response_dict["code"] != 200:
            raise Exception(response_dict)
        response_data = response_dict["data"]
        return response_data 
[docs]
    def add_doc_url(
        self, doc_url: str, metadata: Optional[Dict[str, Any]] = None
    ) -> Dict:
        """
        Add a document from url.
        Only supported by a document ingestion pipeline in Zilliz Cloud.
        Args:
            doc_url: A document url.
            metadata: A key-value dictionary of metadata will
                be inserted as preserved fields required by ingestion pipeline.
                Defaults to None.
        """
        if "ingestion" in self.pipeline_ids:
            ingest_pipe_id = self.pipeline_ids.get("ingestion")
        else:
            raise Exception(
                "An ingestion pipeline id must be provided in pipeline_ids to "
                "add documents."
            )
        domain = (
            f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines"
        )
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{domain}/{ingest_pipe_id}/run"
        params = {"data": {"doc_url": doc_url}}
        metadata = {} if metadata is None else metadata
        params["data"].update(metadata)
        response = requests.post(url, headers=headers, json=params)
        if response.status_code != 200:
            raise Exception(response.text)
        response_dict = response.json()
        if response_dict["code"] != 200:
            raise Exception(response_dict)
        response_data = response_dict["data"]
        return response_data 
[docs]
    def delete(self, key: str, value: Any) -> Dict:
        """
        Delete documents. Only supported by a deletion pipeline in Zilliz Cloud.
        Args:
            key: input name to run the deletion pipeline
            value: input value to run deletion pipeline
        """
        if "deletion" in self.pipeline_ids:
            deletion_pipe_id = self.pipeline_ids.get("deletion")
        else:
            raise Exception(
                "A deletion pipeline id must be provided in pipeline_ids to "
                "add documents."
            )
        domain = (
            f"https://controller.api.{self.cloud_region}.zillizcloud.com/v1/pipelines"
        )
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        url = f"{domain}/{deletion_pipe_id}/run"
        params = {"data": {key: value}}
        response = requests.post(url, headers=headers, json=params)
        if response.status_code != 200:
            raise Exception(response.text)
        response_dict = response.json()
        if response_dict["code"] != 200:
            raise Exception(response_dict)
        response_data = response_dict["data"]
        return response_data