from pathlib import Path from transformers import T5ForConditionalGeneration, T5Tokenizer import torch import json import re import time import tqdm from label_studio_sdk.client import Client from getpass import getpass from huggingface_hub import login login(token = os.getenv('HF_TOKEN')) model_id = "RTLucassen/flan-t5-large-finetuned-melanocytic-lesion-reports" local_dir = "models" hf_hub_download( repo_id=repo_id, filename=model_id, local_dir = local_dir ) class ModelPredictor: def __init__(self, model_name, file_to_predict_on, data_dir = "./data/preprocessed_data/", model_dir="./models/"): """ Initializes the predictor with the model and the file for prediction. Parameters: - model_name: Name of the model directory. - file_to_predict_on: either training, validation or unlabeled data. """ self.base_dir = Path().resolve() self.data_dir = Path(data_dir) self.model_name = model_name self.preprocess_configurations = str("_") + model_name[model_name.find("context"):] self.path_model_file = self.base_dir / model_dir / self.model_name self.path_prediction_file = self.base_dir / data_dir / str(file_to_predict_on + "_set_input" + self.preprocess_configurations + ".json") # Load the model and tokenizer from the specified directory self.model = T5ForConditionalGeneration.from_pretrained(str(self.path_model_file)) self.tokenizer = T5Tokenizer.from_pretrained(str(self.path_model_file)) # Set device to CUDA if available, else CPU self.device = "cuda" if torch.cuda.is_available() else "cpu" self.model.to(self.device) def load_data(self, input_file): """ Loads the input data from a specified file. Parameters: - input_file: Path to the JSON file containing the data. Returns: A dictionary of the input instances. """ with open(input_file, "r") as f: input_instances = json.load(f) return input_instances def predict_sentence_batch(self, input_text): """ Predicts the output for a batch of sentences using the model. Parameters: - input_text: A list of sentences for prediction. Returns: A list of predicted sentences. """ # Tokenize input text; pad & truncate to handle variable lengths inputs = self.tokenizer(input_text, return_tensors="pt", padding=True, truncation=True, max_length=512) input_ids = inputs.input_ids.to(self.device) attention_mask = inputs.attention_mask.to(self.device) # Generate predictions with the model output_sequences = self.model.generate( input_ids=input_ids, attention_mask=attention_mask, max_length=512, early_stopping=False, # num_beams=6 ) # Decode the predictions into readable text return self.tokenizer.batch_decode(output_sequences, skip_special_tokens=True) def snug_class_labels(self, text): # Pattern to match spaces right before '<' or right after '>' and remove them pattern = r'\s+(?=<)|(?<=>)\s+' return re.sub(pattern, '', text) def predict(self, batch_size=10): """ Predicts the text for the entire dataset specified at initialization, in batches. Parameters: - batch_size: The number of sentences to predict in one go. Returns: A dictionary of predicted texts for each report. """ input_data = self.load_data(self.path_prediction_file) predicted_text = {} start = time.time() for id, report in tqdm.tqdm(input_data.items()): predicted_report_texts = [] current_batch = [] for sentence in report: current_batch.append(sentence) if len(current_batch) == batch_size: # Predict when the batch reaches the specified batch size batch_predictions = self.predict_sentence_batch(current_batch) predicted_report_texts.extend(batch_predictions) current_batch = [] # Reset the batch if current_batch: # Ensure any remaining sentences are processed batch_predictions = self.predict_sentence_batch(current_batch) predicted_report_texts.extend(batch_predictions) # Clean up class labels in the predicted text predicted_report_texts = [self.snug_class_labels(sentence) for sentence in predicted_report_texts] predicted_text[id] = predicted_report_texts print(f"Time taken: {time.time() - start} seconds") # Ensure the directory structure exists predictions_dir = self.base_dir / self.data_dir.parent / "predictions" predictions_dir.mkdir(parents=True, exist_ok=True) # Save the predictions to a file if "training" in str(self.path_prediction_file): output_path = self.base_dir / self.data_dir.parent / "predictions" / f"training_predicted{self.preprocess_configurations}.json" if "validation" in str(self.path_prediction_file): output_path = self.base_dir / self.data_dir.parent / "predictions" / f"validation_predicted{self.preprocess_configurations}.json" if "unlabeled" in str(self.path_prediction_file): output_path = self.base_dir / self.data_dir.parent / "predictions" / f"unlabeled_predicted{self.preprocess_configurations}.json" with open(output_path, "w") as f: json.dump(predicted_text, f, indent=4) return predicted_text BASE_DIR = Path().resolve() model_path = BASE_DIR / "models" # Find all models that are in the model folder models = [model.name for model in model_path.iterdir() if model.is_dir()] for model_name in models: # Predict on the unlabeled set example predictor = ModelPredictor(model_name, "unlabeled") predictor.predict() class LabelStudioClient: def __init__(self, api_key, url) -> None: if url is None: print("Label Studio URL not set as an environment variable") url = input("Enter the URL of the Label Studio instance: ") # secret input if api_key is None: print("Label Studio API key not set as an environment variable") api_key = getpass("Enter the API key of the Label Studio instance: ") # Create a client self.client = Client(url=url, api_key=api_key) def check_connection(self): try: self.client.check_connection() print("Connected to the Label Studio instance.") except Exception as e: print(f"Could not connect to the Label Studio instance. Error: {e}") return None def get_project(self, project_id): try: project = self.client.get_project(project_id) print(f"Project with id {project_id} found.") return project except Exception as e: print(f"Could not find the project with id {project_id}. Error: {e}") return None class PredictionResult: def __init__(self, id: int, start: int, end: int, score: float, text: str, labels: List[str], from_name: str = "label", to_name: str = "text", type: str = "labels"): self.id = id self.start = start self.end = end self.score = score self.text = text self.labels = labels self.from_name = from_name self.to_name = to_name self.type = type def to_dict(self): return { "id": self.id, "from_name": self.from_name, "to_name": self.to_name, "type": self.type, "value" : { "start": self.start, "end": self.end, "score": self.score, "text": self.text, "labels": self.labels } } class Prediction: def __init__(self, model_version: str, score: float, results: List[PredictionResult]): self.model_version = model_version self.score = score self.results = results def to_dict(self): return { "model_version": self.model_version, "score": self.score, "result": [result.to_dict() for result in self.results] } class DataEntry: def __init__(self, text: str, predictions: List[Prediction], id: int): self.task = id self.data = {"text": text} self.predictions = predictions def to_dict(self): return { "task": self.task, "data": self.data, "predictions": [prediction.to_dict() for prediction in self.predictions] } def object_to_json(obj): return json.dumps(obj, indent=3) def parse_labels(labeled_sentence): """ Parse the labels from the labeled sentence and maintain order information. """ parts = labeled_sentence.split("") labeled_texts = [] # Use a list to maintain order for part in parts: match = re.match(r"(.+)<(.{3,4})>", part) if match: text = match.group(1).strip() label = match.group(2) labeled_texts.append({'text': text, 'label': label}) return labeled_texts def reformat_predictions_for_import_existing_task(predictions: List[Dict]): """ If a task already exists in label studio, we can import the annotations to the existing task. Current format of "predictions"": [ { "task": task_id, "data": {"text": text}, "predictions": [{ "model_version": model_version, "score": score, "result": [annotation_results] }] } ] The following format is required: [ { "task": task_id, "result": [annotation_results] "model_version": model_version } ] """ new_predictions = [] for prediction in predictions: task_id = prediction["task"] result = prediction["predictions"][0]["result"] model_version = prediction["predictions"][0]["model_version"] new_predictions.append({ "task": task_id, "result": result, "model_version": model_version }) return new_predictions def import_annotations_existing_tasks(predictions: List[Dict], label_studio_project_ids: List[int], label_studio_api: str, label_studio_url: str): """ Import the annotations to existing tasks in label studio. https://github.com/HumanSignal/label-studio-sdk/blob/master/examples/import_preannotations/import_preannotations.ipynb Args: - predictions (List[Dict]): List of predictions in the format: [ { "task": task_id, "data": {"text": text}, "predictions": [{ "model_version": model_version, "score": score, "result": [annotation_results] }] } ] - label_studio_project_id List(int): List of project IDs in label studio - label_studio_api (str): The API key for label studio - label_studio_url (str): The URL of the label studio instance Returns: The response from the label studio API """ ls_client = Client(label_studio_url, label_studio_api) # Reformat the predictions new_predictions = reformat_predictions_for_import_existing_task(predictions) for label_studio_project_id in label_studio_project_ids: project = ls_client.get_project(int(label_studio_project_id)) task_ids = project.get_tasks_ids() prediction_for_this_project = [prediction for prediction in new_predictions if prediction["task"] in task_ids] if len(prediction_for_this_project) == 0: continue else: print(f"Importing annotations to project {label_studio_project_id}") print(project.create_predictions(prediction_for_this_project)) return "Annotations imported successfully." def segment_report(file): return '' input_files = gr.File() iface = gr.Interface( fn=segment_report, inputs=input_files, outputs=['text'], title='Segment Reports', description="This application helps segmenting medical report into meaningful fragments", theme=gr.themes.Soft(), ) iface.launch()