import os
import openai
import time
import wikipedia
import random
import re
import requests
from bs4 import BeautifulSoup
import glob
from natsort import natsorted
import xml.etree.ElementTree as ET
from pytrials.client import ClinicalTrials
from Bio import Entrez
import pandas as pd
import numpy as np
#from langchain.agents import create_pandas_dataframe_agent
from langchain_experimental.agents import create_pandas_dataframe_agent
#from langchain.llms import OpenAI
from langchain_community.llms import OpenAI

# Set the OpenAI API key from the environment
openai.api_key = os.environ['OPENAI_API_KEY']
gptengine = "gpt-3.5-turbo"

"""def get_selected_fileds(texts): | |
ct = ClinicalTrials() | |
input_name = texts.replace(' ' , "+") | |
corona_fields = ct.get_study_fields( | |
search_expr="%s SEARCH[Location](AREA[LocationCountry]Japan AND AREA[LocationStatus]Recruiting)"%(input_name), | |
fields=["NCTId", "Condition", "BriefTitle",'BriefSummary','EligibilityCriteria'], | |
max_studies=500, | |
fmt="csv") | |
return corona_fields""" | |
def get_retriever_str(fields):
    retriever_str = ''
    for i in range(1, len(fields)):
        colnames = fields[0]
        targetCol = fields[i]
        for f in range(len(fields[0])):
            retriever_str += colnames[f] + ":" + targetCol[f] + "\n"
        retriever_str += '\n'
    return retriever_str

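# Illustrative usage (assumption: `fields` is the header-plus-rows list returned by
# get_selected_fields defined later in this module); the result is one
# "column:value" block per study, separated by blank lines:
#   fields = get_selected_fields("breast cancer")
#   context = get_retriever_str(fields)
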
def get_chanked_retriever(fields):
    retriever_list = []
    for i in range(1, len(fields)):
        retriever_str = ''
        colnames = fields[0]
        targetCol = fields[i]
        for f in range(len(fields[0])):
            retriever_str += colnames[f] + ":" + targetCol[f] + "\n"
        retriever_list.append(retriever_str)
    return retriever_list

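# Illustrative usage: unlike get_retriever_str, this keeps each study as its own
# string, which is convenient for chunked retrieval (e.g. indexing in a vector store):
#   docs = get_chanked_retriever(fields)   # `fields` as in the example above
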
def get_selected_fields(texts, split_criteria=False,
                        split_word_number=False, split_number=700):
    ct = ClinicalTrials()
    input_name = texts.replace(' ', "+")
    corona_fields = ct.get_study_fields(
        search_expr="%s SEARCH[Location](AREA[LocationCountry]Japan AND AREA[LocationStatus]Recruiting)" % (input_name),
        fields=["NCTId", "Condition", "BriefTitle", 'BriefSummary', 'EligibilityCriteria'],
        max_studies=500,
        fmt="csv")

    if split_criteria:
        new_fields = []
        # Strings that mark the start of the exclusion-criteria block
        target_string1 = 'Exclusion Criteria'
        target_string2 = 'Exclusion criteria'
        # Look for the target string in each element, split just before it,
        # and store the pieces in a new list
        for corona_field in corona_fields:
            new_list = []
            for item in corona_field:
                if target_string1 in item:
                    split_position = item.index(target_string1)
                    new_list.append(item[:split_position])
                    new_list.append(item[split_position:])
                elif target_string2 in item:
                    split_position = item.index(target_string2)
                    new_list.append(item[:split_position])
                    new_list.append(item[split_position:])
                else:
                    new_list.append(item)
            new_fields.append(new_list)
    else:
        new_fields = corona_fields

    if split_word_number:
        split_fields = []
        for new_field in new_fields:
            new_list = []
            # Check each element; if it is longer than split_number characters,
            # split it into chunks and store them in a new list
            for item in new_field:
                item_length = len(item)
                if item_length > split_number:
                    num_parts = -(-item_length // split_number)  # number of chunks via ceiling division
                    for i in range(num_parts):
                        start_index = i * split_number
                        end_index = min((i + 1) * split_number, item_length)  # do not run past the end of the string
                        new_list.append(item[start_index:end_index])
                else:
                    new_list.append(item)
            split_fields.append(new_list)
        new_fields = split_fields

    return new_fields

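# Illustrative usage (the query string is only an example): fetch recruiting trials
# in Japan and split long eligibility texts into chunks of at most 700 characters:
#   fields = get_selected_fields("pancreatic cancer", split_criteria=True,
#                                split_word_number=True, split_number=700)
#   # fields[0] is the header row, fields[1:] are the per-study rows
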
def print_agent_results(df, Ids,
                        interesteds=['Condition', 'BriefTitle', 'BriefSummary', 'EligibilityCriteria'],
                        translater=None):
    results = ""
    for Id in Ids:
        print("%s\n" % Id)
        sdf = df[df['NCTId'] == Id]
        for interested in interesteds:
            # Take the first matching row for this NCTId
            results += '%s: \n %s \n' % (interested, sdf[interested].iloc[0])
            #print('%s: \n %s \n' % (interested, sdf[interested].iloc[0]))
    if translater:
        to_be_printed = translater.translate(results)
    else:
        to_be_printed = results
    print(to_be_printed)

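# Illustrative usage (assumptions: `df` is the trial DataFrame built from the fields,
# as in ClinicalAgent below, and `translater` is any object exposing a
# .translate(str) method; pass None to print the untranslated text):
#   print_agent_results(df, ["NCT01234567"], translater=None)
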
def search(query):
    Entrez.email = os.getenv('MAIL_ADRESS')
    #Entrez.email = '[email protected]'
    handle = Entrez.esearch(db='pubmed',
                            sort='relevance',
                            retmax='20',
                            retmode='xml',
                            term=query)
    results = Entrez.read(handle)
    return results

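# Illustrative usage: requires the MAIL_ADRESS environment variable to be set,
# since NCBI expects a contact e-mail on every Entrez request:
#   results = search("glioblastoma immunotherapy")
#   pmids = results['IdList']   # up to 20 PubMed IDs, sorted by relevance
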
def fetch_details(id_list):
    ids = ','.join(id_list)
    Entrez.email = os.getenv('MAIL_ADRESS')
    #Entrez.email = '[email protected]'
    handle = Entrez.efetch(db='pubmed',
                           retmode='xml',
                           id=ids)
    results = Entrez.read(handle)
    return results

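# Illustrative usage, continuing the example above:
#   papers = fetch_details(pmids)
#   first_title = papers['PubmedArticle'][0]['MedlineCitation']['Article']['ArticleTitle']
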
'''def generate(prompt, engine=None):
    if engine is None:
        engine = gptengine
    while True:  # Note: this retries forever if the OpenAI API is down
        try:
            response = openai.ChatCompletion.create(
                model=engine,
                messages=[
                    {"role": "system", "content": "You are a useful assistant"},
                    {"role": "user", "content": prompt},
                ]
            )
            result = response["choices"][0]["message"]["content"]
            return result
        except Exception as e:
            print(e)
            print("Retrying")
            time.sleep(30)
'''

def generate(prompt, engine=None):
    if engine is None:
        engine = gptengine
    while True:  # Note: this retries forever if the OpenAI API is down
        try:
            response = openai.chat.completions.create(
                model=engine,
                messages=[
                    {"role": "system", "content": "You are a useful assistant"},
                    {"role": "user", "content": prompt},
                ]
            )
            #result = response["choices"][0]["message"]["content"]
            result = response.choices[0].message.content
            return result
        except Exception as e:
            print(e)
            print("Retrying")
            time.sleep(30)

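# Illustrative usage (mind the endless retry loop above; interrupt manually if the
# API stays unreachable):
#   answer = generate("Summarize the eligibility criteria below:\n" + context)
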
def GetPubmedSummaryDf(studies):
    title_list = []
    abstract_list = []
    journal_list = []
    language_list = []
    pubdate_year_list = []
    pubdate_month_list = []
    studiesIdList = studies['IdList']
    chunk_size = 10000
    for chunk_i in range(0, len(studiesIdList), chunk_size):
        chunk = studiesIdList[chunk_i:chunk_i + chunk_size]
        try:
            papers = fetch_details(chunk)
            for i, paper in enumerate(papers['PubmedArticle']):
                title_list.append(paper['MedlineCitation']['Article']['ArticleTitle'])
                try:
                    abstract_list.append(paper['MedlineCitation']['Article']['Abstract']['AbstractText'][0])
                except:
                    abstract_list.append('No Abstract')
                journal_list.append(paper['MedlineCitation']['Article']['Journal']['Title'])
                language_list.append(paper['MedlineCitation']['Article']['Language'][0])
                try:
                    pubdate_year_list.append(paper['MedlineCitation']['Article']['Journal']['JournalIssue']['PubDate']['Year'])
                except:
                    pubdate_year_list.append('No Data')
                try:
                    pubdate_month_list.append(paper['MedlineCitation']['Article']['Journal']['JournalIssue']['PubDate']['Month'])
                except:
                    pubdate_month_list.append('No Data')
        except:  # occasionally a chunk might annoy your parser
            pass
    df = pd.DataFrame(list(zip(
        title_list, abstract_list, journal_list, language_list, pubdate_year_list,
        pubdate_month_list)),
        columns=['Title', 'Abstract', 'Journal', 'Language', 'Year', 'Month'])
    return df, abstract_list

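# Illustrative usage, combining the Entrez helpers above into a summary table:
#   studies = search("multiple myeloma CAR-T")
#   df, abstracts = GetPubmedSummaryDf(studies)
#   print(df[['Title', 'Journal', 'Year']].head())
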
def ClinicalAgent(fileds, verbose=False):
    df = pd.DataFrame.from_records(fileds[1:], columns=fileds[0])
    return create_pandas_dataframe_agent(OpenAI(temperature=0, model='gpt-3.5-turbo-16k'), df, verbose=verbose)

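# Illustrative usage (assumptions: OPENAI_API_KEY is set, and the returned LangChain
# agent executor exposes .run(); the question is only an example):
#   agent = ClinicalAgent(fields)
#   answer = agent.run("List the NCTIds of trials that allow prior chemotherapy.")
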
def GetNCTID(results):
    # Regular expression matching tokens that start with NCT
    pattern = r'\bNCT\d+\b'
    # Extract all matching NCT IDs from the text
    nct_words = re.findall(pattern, results)
    return nct_words
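
# Illustrative usage: pull the NCT IDs back out of an agent's free-text answer
# (assumption: `df` is the DataFrame built from the same fields, e.g.
# pd.DataFrame.from_records(fields[1:], columns=fields[0])):
#   ids = GetNCTID(answer)   # e.g. ['NCT01234567', 'NCT07654321']
#   print_agent_results(df, ids)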