Spaces:
Runtime error
Runtime error
import json
import os

import gradio as gr
import requests as req
# Example natural-language intent; not used by the active summarization path.
code_nl = "function for db connection"

# Hosted inference endpoint of the CodeT5 summarization checkpoint.
# NOTE(review): the model name ends in "-ruby" while the default prompt asks
# to summarize Python -- confirm this is the intended checkpoint.
CT5_URL = "https://api-inference.huggingface.co/models/nielsr/codet5-small-code-summarization-ruby"
CT5_METHOD = 'POST'
API_URL = CT5_URL

# SECURITY: an API token is committed in this file. Prefer supplying it via
# the HF_API_TOKEN environment variable (e.g. a Space secret); the literal
# below is kept only as a backward-compatible fallback and should be rotated
# and removed. `or` (rather than a .get default) also covers an empty variable.
_API_TOKEN = os.environ.get("HF_API_TOKEN") or "api_UhCKXKyqxJOpOcbvrZurQFqmVNZRTtxVfl"
headers = {"Authorization": f"Bearer {_API_TOKEN}"}
def query(payload, timeout=60):
    """Post ``payload`` to the inference endpoint and return the decoded reply.

    :param payload: JSON-serialisable request body; sent via ``json=``.
    :param timeout: Seconds to wait for the server before ``requests`` raises
        a timeout error. Without it a stalled endpoint would block the caller
        indefinitely.
    :return: The value decoded by ``response.json()``.
    """
    response = req.post(API_URL, headers=headers, json=payload, timeout=timeout)
    return response.json()
# Sample Python method, kept as raw text, that is embedded in the default
# prompt built right after this literal.
function_code = r""" | |
def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, | |
batch_size: int = 10_000, duplicate_documents: Optional[str] = None): | |
if index and not self.client.indices.exists(index=index): | |
self._create_document_index(index) | |
if index is None: | |
index = self.index | |
duplicate_documents = duplicate_documents or self.duplicate_documents | |
assert duplicate_documents in self.duplicate_documents_options, \ | |
f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}" | |
field_map = self._create_document_field_map() | |
document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents] | |
document_objects = self._handle_duplicate_documents(documents=document_objects, | |
index=index, | |
duplicate_documents=duplicate_documents) | |
documents_to_index = [] | |
for doc in document_objects: | |
_doc = { | |
"_op_type": "index" if duplicate_documents == 'overwrite' else "create", | |
"_index": index, | |
**doc.to_dict(field_map=self._create_document_field_map()) | |
} # type: Dict[str, Any] | |
# cast embedding type as ES cannot deal with np.array | |
if _doc[self.embedding_field] is not None: | |
if type(_doc[self.embedding_field]) == np.ndarray: | |
_doc[self.embedding_field] = _doc[self.embedding_field].tolist() | |
# rename id for elastic | |
_doc["_id"] = str(_doc.pop("id")) | |
# don't index query score and empty fields | |
_ = _doc.pop("score", None) | |
_doc = {k:v for k,v in _doc.items() if v is not None} | |
# In order to have a flat structure in elastic + similar behaviour to the other DocumentStores, | |
# we "unnest" all value within "meta" | |
if "meta" in _doc.keys(): | |
for k, v in _doc["meta"].items(): | |
_doc[k] = v | |
_doc.pop("meta") | |
documents_to_index.append(_doc) | |
# Pass batch_size number of documents to bulk | |
if len(documents_to_index) % batch_size == 0: | |
bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type) | |
documents_to_index = [] | |
if documents_to_index: | |
bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type) | |
""" | |
# Default prompt: a task prefix followed by the sample source above; it is
# pre-filled into the input textbox of the UI.
task_code = f' Summarize Python: {function_code}' | |
# Hand-written docstring that appears to describe the sample write_documents
# method (its :param entries match that signature). Nothing in the visible
# code reads this variable -- presumably kept as a reference for comparing
# against the model output; verify before removing.
real_docstring = r""" | |
Indexes documents for later queries in Elasticsearch. | |
Behaviour if a document with the same ID already exists in ElasticSearch: | |
a) (Default) Throw Elastic's standard error message for duplicate IDs. | |
b) If `self.update_existing_documents=True` for DocumentStore: Overwrite existing documents. | |
(This is only relevant if you pass your own ID when initializing a `Document`. | |
If don't set custom IDs for your Documents or just pass a list of dictionaries here, | |
they will automatically get UUIDs assigned. See the `Document` class for details) | |
:param documents: a list of Python dictionaries or a list of Haystack Document objects. | |
For documents as dictionaries, the format is {"content": "<the-actual-text>"}. | |
Optionally: Include meta data via {"content": "<the-actual-text>", | |
"meta":{"name": "<some-document-name>, "author": "somebody", ...}} | |
It can be used for filtering and is accessible in the responses of the Finder. | |
Advanced: If you are using your own Elasticsearch mapping, the key names in the dictionary | |
should be changed to what you have set for self.content_field and self.name_field. | |
:param index: Elasticsearch index where the documents should be indexed. If not supplied, self.index will be used. | |
:param batch_size: Number of documents that are passed to Elasticsearch's bulk function at a time. | |
:param duplicate_documents: Handle duplicates document based on parameter options. | |
Parameter options : ( 'skip','overwrite','fail') | |
skip: Ignore the duplicates documents | |
overwrite: Update any existing documents with the same ID when adding documents. | |
fail: an error is raised if the document ID of the document being added already | |
exists. | |
:raises DuplicateDocumentError: Exception trigger on duplicate document | |
:return: None | |
""" | |
def docgen_func(function_code):
    """Request a natural-language summary for the text entered in the UI.

    :param function_code: Prompt text to summarise (task prefix plus source
        code), as supplied by the input textbox.
    :return: ``str()`` of the decoded reply returned by ``query``.
    """
    # Send the caller's text. The earlier version always sent the module-level
    # ``task_code``, so anything the user edited in the textbox was ignored.
    req_data = {"inputs": function_code}
    output = query(req_data)
    return str(output)
def pygen_func(nl_code_intent):
    """Placeholder for generating code from a natural-language intent.

    The function previously consisted only of commented-out lines, which left
    the ``def`` without a body and made the module fail to parse. It now fails
    loudly instead.

    :param nl_code_intent: Natural-language description of the desired code.
    :raises NotImplementedError: Always; code generation is not wired up.
    """
    # Earlier draft, for reference: wrap the intent as {'code_nl': ...},
    # send it to CT5_URL with CT5_METHOD, decode the JSON reply and return
    # it as a string.
    raise NotImplementedError("pygen_func is not implemented yet")
# Demo UI: one multi-line textbox pre-filled with the default prompt, whose
# contents are handed to docgen_func; the reply is shown in a single output
# textbox. (pygen_func and its "Code Intent (NL)" / "Code Generated PL"
# widgets are the alternative wiring and are not enabled.)
prompt_box = gr.inputs.Textbox(lines=7, label="Task + Code (PL)", default=task_code)
summary_box = gr.outputs.Textbox(label="Docstring Generated (NL)")

iface = gr.Interface(docgen_func, [prompt_box], summary_box)

# share=True asks Gradio for a shareable link in addition to the local server.
iface.launch(share=True)