import xml.etree.ElementTree as ET from modules.utils import class_dict, error, warning import streamlit as st from modules.utils import class_dict, rescale_boxes import copy from xml.dom import minidom import numpy as np def find_position(pool_index, BPMN_id): """ Find the position of the pool index in the BPMN_id list. Args: pool_index (str): The pool index to search for. BPMN_id (list): List of BPMN IDs. Returns: int: The index of the pool_index in BPMN_id, or None if not found. """ if pool_index in BPMN_id: position = BPMN_id.index(pool_index) else: position = None error(f"Problem with the pool index {pool_index} in the BPMN_id") return position # Calculate the center of each bounding box and group them by pool def calculate_centers_and_group_by_pool(pred, class_dict): """ Calculate the center coordinates of bounding boxes and group them by pool. Args: pred (dict): Dictionary containing prediction results, including 'pool_dict', 'boxes', and 'labels'. class_dict (dict): Dictionary mapping class indices to class names. Returns: dict: Dictionary grouping centers and their indices by pool index. """ pool_groups = {} for pool_index, element_indices in pred['pool_dict'].items(): pool_groups[pool_index] = [] for i in element_indices: if i >= len(pred['labels']): continue if class_dict[pred['labels'][i]] not in ['dataObject', 'dataStore']: x1, y1, x2, y2 = pred['boxes'][i] center = [(x1 + x2) / 2, (y1 + y2) / 2] # Compute the center of the bounding box pool_groups[pool_index].append((center, i)) return pool_groups # Group centers within a specified range def group_centers(centers, axis, range_=50): """ Group centers based on a specified range along an axis. Args: centers (list): List of center coordinates and their indices. axis (int): The axis (0 for x, 1 for y) to group centers along. range_ (int): Maximum distance to consider centers as part of the same group. Returns: list: List of groups, where each group is a list of centers and indices. """ groups = [] while centers: center, idx = centers.pop(0) group = [(center, idx)] for other_center, other_idx in centers[:]: if abs(center[axis] - other_center[axis]) <= range_: group.append((other_center, other_idx)) centers.remove((other_center, other_idx)) groups.append(group) return groups # Align the elements within each pool def align_elements_within_pool(modified_pred, pool_groups, class_dict, size): """ Align elements within each pool based on their centers. Args: modified_pred (dict): Dictionary containing the modified predictions. pool_groups (dict): Dictionary grouping centers and their indices by pool index. class_dict (dict): Dictionary mapping class indices to class names. size (dict): Dictionary containing element sizes. """ for pool_index, centers in pool_groups.items(): # Align elements based on y-coordinates y_groups = group_centers(centers.copy(), axis=1) align_y_coordinates(modified_pred, y_groups, class_dict, size) # Recalculate centers after y-alignment and then align based on x-coordinates centers = recalculate_centers(modified_pred, y_groups) x_groups = group_centers(centers.copy(), axis=0) align_x_coordinates(modified_pred, x_groups, class_dict, size) # Align the y-coordinates of the centers of grouped bounding boxes def align_y_coordinates(modified_pred, y_groups, class_dict, size): """ Align the y-coordinates of elements in each group. Args: modified_pred (dict): Dictionary containing the modified predictions. y_groups (list): List of groups of centers and their indices, grouped by y-coordinate. class_dict (dict): Dictionary mapping class indices to class names. size (dict): Dictionary containing element sizes. """ for group in y_groups: avg_y = sum([c[0][1] for c in group]) / len(group) # Compute the average y-coordinate for (center, idx) in group: label = class_dict[modified_pred['labels'][idx]] if label in size: new_center = (center[0], avg_y) modified_pred['boxes'][idx] = [ new_center[0] - size[label][0] / 2, new_center[1] - size[label][1] / 2, new_center[0] + size[label][0] / 2, new_center[1] + size[label][1] / 2 ] # Recalculate centers after alignment def recalculate_centers(modified_pred, groups): """ Recalculate the centers of bounding boxes after alignment. Args: modified_pred (dict): Dictionary containing the modified predictions. groups (list): List of groups of centers and their indices. Returns: list: List of recalculated centers and their indices. """ centers = [] for group in groups: for center, idx in group: x1, y1, x2, y2 = modified_pred['boxes'][idx] center = [(x1 + x2) / 2, (y1 + y2) / 2] # Recompute the center after alignment centers.append((center, idx)) return centers # Align the x-coordinates of the centers of grouped bounding boxes def align_x_coordinates(modified_pred, x_groups, class_dict, size): """ Align the x-coordinates of elements in each group. Args: modified_pred (dict): Dictionary containing the modified predictions. x_groups (list): List of groups of centers and their indices, grouped by x-coordinate. class_dict (dict): Dictionary mapping class indices to class names. size (dict): Dictionary containing element sizes. """ for group in x_groups: avg_x = sum([c[0][0] for c in group]) / len(group) # Compute the average x-coordinate for (center, idx) in group: label = class_dict[modified_pred['labels'][idx]] if label in size: new_center = (avg_x, center[1]) modified_pred['boxes'][idx] = [ new_center[0] - size[label][0] / 2, modified_pred['boxes'][idx][1], new_center[0] + size[label][0] / 2, modified_pred['boxes'][idx][3] ] # Expand the pool bounding boxes to fit the aligned elements def expand_pool_bounding_boxes(modified_pred, size_elements): """ Expand the bounding boxes of pools to fit aligned elements. Args: modified_pred (dict): Dictionary containing the modified predictions. size_elements (dict): Dictionary containing element sizes. """ for idx, (pool_index, keep_elements) in enumerate(modified_pred['pool_dict'].items()): if len(keep_elements) != 0: marge = size_elements['task'][1] // 2 else: marge = 0 position = find_position(pool_index, modified_pred['BPMN_id']) if keep_elements == [] and position is not None: min_x, min_y, max_x, max_y = modified_pred['boxes'][position] else: min_x, min_y, max_x, max_y = calculate_pool_bounds(modified_pred['boxes'], modified_pred['labels'], keep_elements, size_elements) pool_width = max_x - min_x pool_height = max_y - min_y if pool_width < 300 or pool_height < 30: error("The pool is maybe too small, please add more elements or increase the scale by zooming on the image.") continue # Update the pool bounding box with margin modified_pred['boxes'][position] = [min_x - marge, min_y - marge//2, min_x + pool_width + marge, min_y + pool_height + marge//2] # Adjust left and right boundaries of all pools def adjust_pool_boundaries(modified_pred, pred): """ Adjust the left and right boundaries of all pools to ensure they cover all elements. Args: modified_pred (dict): Dictionary containing the modified predictions. pred (dict): Dictionary containing original prediction results. """ min_left, max_right = 0, 0 for pool_index, element_indices in pred['pool_dict'].items(): position = find_position(pool_index, modified_pred['BPMN_id']) if position is None or position >= len(modified_pred['boxes']): continue x1, y1, x2, y2 = modified_pred['boxes'][position] left = x1 right = x2 if left < min_left: min_left = left if right > max_right: max_right = right for pool_index, element_indices in pred['pool_dict'].items(): position = find_position(pool_index, modified_pred['BPMN_id']) if position is None or position >= len(modified_pred['boxes']): continue x1, y1, x2, y2 = modified_pred['boxes'][position] if x1 > min_left: x1 = min_left if x2 < max_right: x2 = max_right # Update the pool bounding box with adjusted boundaries modified_pred['boxes'][position] = [x1, y1, x2, y2] # Main function to align boxes def align_boxes(pred, size, class_dict): """ Main function to align bounding boxes for the given prediction data. Args: pred (dict): Dictionary containing prediction results. size (dict): Dictionary containing element sizes. class_dict (dict): Dictionary mapping class indices to class names. Returns: list: List of aligned bounding boxes. """ modified_pred = copy.deepcopy(pred) pool_groups = calculate_centers_and_group_by_pool(pred, class_dict) align_elements_within_pool(modified_pred, pool_groups, class_dict, size) if len(pred['pool_dict']) > 1: expand_pool_bounding_boxes(modified_pred, size) adjust_pool_boundaries(modified_pred, pred) return modified_pred['boxes'] # Function to create a BPMN XML file from prediction results def create_XML(full_pred, text_mapping, size_scale, scale): """ Create a BPMN XML file from the prediction results. Args: full_pred (dict): Dictionary containing full prediction results. text_mapping (dict): Dictionary mapping BPMN IDs to text labels. size_scale (float): Scaling factor for element sizes. scale (float): Scaling factor for bounding boxes. Returns: str: Pretty-printed BPMN XML string. """ namespaces = { 'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL', 'bpmndi': 'http://www.omg.org/spec/BPMN/20100524/DI', 'di': 'http://www.omg.org/spec/DD/20100524/DI', 'dc': 'http://www.omg.org/spec/DD/20100524/DC', 'xsi': 'http://www.w3.org/2001/XMLSchema-instance' } definitions = ET.Element('bpmn:definitions', { 'xmlns:xsi': namespaces['xsi'], 'xmlns:bpmn': namespaces['bpmn'], 'xmlns:bpmndi': namespaces['bpmndi'], 'xmlns:di': namespaces['di'], 'xmlns:dc': namespaces['dc'], 'targetNamespace': "http://example.bpmn.com", 'id': "simpleExample" }) size_elements = get_size_elements(size_scale) # If there is no pool or lane, create a pool with all elements if len(full_pred['pool_dict']) == 0 or (len(full_pred['pool_dict']) == 1 and len(next(iter(full_pred['pool_dict'].values()))) == len(full_pred['labels'])): full_pred, text_mapping = create_big_pool(full_pred, text_mapping, size_elements) # Backup the original box positions old_boxes = copy.deepcopy(full_pred) # Create BPMN collaboration element collaboration = ET.SubElement(definitions, 'bpmn:collaboration', id='collaboration_1') # Create BPMN process elements process = [] for idx in range(len(full_pred['pool_dict'].items())): process_id = f'process_{idx+1}' process.append(ET.SubElement(definitions, 'bpmn:process', id=process_id, isExecutable='false')) bpmndi = ET.SubElement(definitions, 'bpmndi:BPMNDiagram', id='BPMNDiagram_1') bpmnplane = ET.SubElement(bpmndi, 'bpmndi:BPMNPlane', id='BPMNPlane_1', bpmnElement='collaboration_1') # Rescale and align bounding boxes full_pred['boxes'] = rescale_boxes(scale, old_boxes['boxes']) full_pred['boxes'] = align_boxes(full_pred, size_elements, class_dict) # Add diagram elements for each pool for idx, (pool_index, keep_elements) in enumerate(full_pred['pool_dict'].items()): pool_id = f'participant_{idx+1}' pool = ET.SubElement(collaboration, 'bpmn:participant', id=pool_id, processRef=f'process_{idx+1}', name=text_mapping[pool_index]) position = find_position(pool_index, full_pred['BPMN_id']) if position >= len(full_pred['boxes']): print("Problem with the index") continue min_x, min_y, max_x, max_y = full_pred['boxes'][position] pool_width = max_x - min_x pool_height = max_y - min_y add_diagram_elements(bpmnplane, pool_id, min_x, min_y, pool_width, pool_height) # Create BPMN elements for each pool for idx, (pool_index, keep_elements) in enumerate(full_pred['pool_dict'].items()): create_bpmn_object(process[idx], bpmnplane, text_mapping, definitions, size_elements, full_pred, keep_elements) # Create message flow elements message_flows = [i for i, label in enumerate(full_pred['labels']) if class_dict[label] == 'messageFlow'] for idx in message_flows: create_flow_element(bpmnplane, text_mapping, idx, size_elements, full_pred, collaboration, message=True) # Create sequence flow elements for idx, (pool_index, keep_elements) in enumerate(full_pred['pool_dict'].items()): for i in keep_elements: if i >= len(full_pred['labels']): print("Problem with the index") continue if full_pred['labels'][i] == list(class_dict.values()).index('sequenceFlow'): create_flow_element(bpmnplane, text_mapping, i, size_elements, full_pred, process[idx], message=False) # Generate pretty XML string tree = ET.ElementTree(definitions) rough_string = ET.tostring(definitions, 'utf-8') reparsed = minidom.parseString(rough_string) pretty_xml_as_string = reparsed.toprettyxml(indent=" ") # Restore the original box positions full_pred['boxes'] = rescale_boxes(1/scale, full_pred['boxes']) full_pred['boxes'] = old_boxes return pretty_xml_as_string # Function that creates a single pool with all elements def create_big_pool(full_pred, text_mapping, size_elements, marge=50): """ Create a single pool containing all elements if no pools or lanes are detected. Args: full_pred (dict): Dictionary containing full prediction results. text_mapping (dict): Dictionary mapping BPMN IDs to text labels. size_elements (dict): Dictionary containing element sizes. marge (int, optional): Margin to add around the pool. Defaults to 50. Returns: tuple: Updated full_pred and text_mapping. """ new_pool_index = 'pool_1' size_elements = get_size_elements(st.session_state.size_scale) elements_pool = list(range(len(full_pred['boxes']))) min_x, min_y, max_x, max_y = calculate_pool_bounds(full_pred['boxes'], full_pred['labels'], elements_pool, size_elements) box = [min_x - marge, min_y - marge//2, max_x + marge, max_y + marge//2] full_pred['boxes'] = np.append(full_pred['boxes'], [box], axis=0) full_pred['pool_dict'][new_pool_index] = elements_pool full_pred['BPMN_id'].append('pool_1') text_mapping['pool_1'] = 'Process' print(f"Created a big pool index {new_pool_index} with elements: {elements_pool}") return full_pred, text_mapping # Function that gives the size of the elements def get_size_elements(size_scale=1): """ Get the sizes of BPMN elements based on the scaling factor. Args: size_scale (float, optional): Scaling factor for element sizes. Defaults to 1. Returns: dict: Dictionary containing element sizes. """ size_elements = { 'event': (size_scale * 43.2, size_scale * 43.2), 'task': (size_scale * 120, size_scale * 96), 'message': (size_scale * 43.2, size_scale * 43.2), 'messageEvent': (size_scale * 43.2, size_scale * 43.2), 'exclusiveGateway': (size_scale * 60, size_scale * 60), 'parallelGateway': (size_scale * 60, size_scale * 60), 'dataObject': (size_scale * 48, size_scale * 72), 'dataStore': (size_scale * 72, size_scale * 72), 'subProcess': (size_scale * 144, size_scale * 108), 'eventBasedGateway': (size_scale * 60, size_scale * 60), 'timerEvent': (size_scale * 48, size_scale * 48), } return size_elements def rescale(scale, boxes): """ Rescale the bounding boxes by a given scaling factor. Args: scale (float): Scaling factor. boxes (list): List of bounding boxes. Returns: list: Rescaled bounding boxes. """ for i in range(len(boxes)): boxes[i] = [boxes[i][0] * scale, boxes[i][1] * scale, boxes[i][2] * scale, boxes[i][3] * scale] return boxes # Function to create the unique BPMN_id def create_BPMN_id(labels, pool_dict): """ Create unique BPMN IDs for each element based on their labels. Args: labels (list): List of labels for each element. pool_dict (dict): Dictionary containing pool indices and their elements. Returns: tuple: List of BPMN IDs and updated pool dictionary. """ #change the label to task if it's subProcess for i in range(len(labels)): if labels[i] == list(class_dict.values()).index('subProcess'): labels[i] = list(class_dict.values()).index('task') BPMN_id = [class_dict[labels[i]] for i in range(len(labels))] data_counter = 1 enums = { 'event': 1, 'task': 1, 'sequenceFlow': 1, 'messageFlow': 1, 'message_event': 1, 'exclusiveGateway': 1, 'parallelGateway': 1, 'dataAssociation': 1, 'pool': 1, 'timerEvent': 1, 'eventBasedGateway': 1 } BPMN_name = [class_dict[label] for label in labels] for idx, Bpmn_id in enumerate(BPMN_name): key = { 'event': 'event', 'task': 'task', 'dataObject': 'dataObject', 'sequenceFlow': 'sequenceFlow', 'messageFlow': 'messageFlow', 'messageEvent': 'message_event', 'exclusiveGateway': 'exclusiveGateway', 'parallelGateway': 'parallelGateway', 'dataAssociation': 'dataAssociation', 'pool': 'pool', 'dataStore': 'dataStore', 'timerEvent': 'timerEvent', 'eventBasedGateway': 'eventBasedGateway' }.get(Bpmn_id, None) if key: if key in ['dataObject', 'dataStore']: BPMN_id[idx] = f'{key}_{data_counter}' data_counter += 1 else: BPMN_id[idx] = f'{key}_{enums[key]}' enums[key] += 1 # Update the pool_dict keys with their corresponding BPMN_id values updated_pool_dict = {} for key, value in pool_dict.items(): if key < len(BPMN_id): new_key = BPMN_id[key] updated_pool_dict[new_key] = value return BPMN_id, updated_pool_dict def add_diagram_elements(parent, element_id, x, y, width, height): """ Utility to add BPMN diagram notation for elements. Args: parent (Element): The parent XML element. element_id (str): The ID of the BPMN element. x (float): The x-coordinate of the element. y (float): The y-coordinate of the element. width (float): The width of the element. height (float): The height of the element. """ shape = ET.SubElement(parent, 'bpmndi:BPMNShape', attrib={ 'bpmnElement': element_id, 'id': element_id + '_di' }) bounds = ET.SubElement(shape, 'dc:Bounds', attrib={ 'x': str(x), 'y': str(y), 'width': str(width), 'height': str(height) }) def add_diagram_edge(parent, element_id, waypoints): """ Utility to add BPMN diagram notation for sequence flows. Args: parent (Element): The parent XML element. element_id (str): The ID of the BPMN element. waypoints (list): List of waypoints for the sequence flow. """ edge = ET.SubElement(parent, 'bpmndi:BPMNEdge', attrib={ 'bpmnElement': element_id, 'id': element_id + '_di' }) for x, y in waypoints: if x is None or y is None: return ET.SubElement(edge, 'di:waypoint', attrib={ 'x': str(x), 'y': str(y) }) def check_status(link, keep_elements): """ Check the status of a link in terms of its position within the elements. Args: link (tuple): A tuple representing the start and end of the link. keep_elements (list): List of elements to keep. Returns: str: Status of the link ('middle', 'start', or 'end'). """ if link[0] in keep_elements and link[1] in keep_elements: return 'middle' elif link[0] is None and link[1] in keep_elements: return 'start' elif link[0] in keep_elements and link[1] is None: return 'end' else: return 'middle' def check_data_association(i, links, labels, keep_elements): """ Check data associations for an element. Args: i (int): Index of the current element. links (list): List of links between elements. labels (list): List of labels for each element. keep_elements (list): List of elements to keep. Returns: tuple: Status and indices of data associations. """ status, links_idx = [], [] for j, (k, l) in enumerate(links): if labels[j] == list(class_dict.values()).index('dataAssociation'): if k == i: status.append('output') links_idx.append(j) elif l == i: status.append('input') links_idx.append(j) return status, links_idx def create_data_Association(bpmn, data, size, element_id, current_idx, source_id, target_id): """ Create a data association in the BPMN diagram. Args: bpmn (Element): The parent XML element. data (dict): Dictionary containing prediction results. size (dict): Dictionary containing element sizes. element_id (str): The ID of the BPMN element. current_idx (int): Index of the current element. source_id (str): The source element ID. target_id (str): The target element ID. """ waypoints = calculate_waypoints(data, size, current_idx, source_id, target_id) if waypoints is not None: add_diagram_edge(bpmn, element_id, waypoints) def check_eventBasedGateway(i, links, labels): """ Check event-based gateway for an element. Args: i (int): Index of the current element. links (list): List of links between elements. labels (list): List of labels for each element. Returns: tuple: Status and indices of event-based gateway. """ status, links_idx = [], [] for j, (k, l) in enumerate(links): if labels[j] == list(class_dict.values()).index('sequenceFlow'): if k == i: status.append('output') links_idx.append(j) elif l == i: status.append('input') links_idx.append(j) return status, links_idx # Function to dynamically create and layout BPMN elements def create_bpmn_object(process, bpmnplane, text_mapping, definitions, size, data, keep_elements): """ Dynamically create and layout BPMN elements. Args: process (Element): The BPMN process element. bpmnplane (Element): The BPMN plane element. text_mapping (dict): Dictionary mapping BPMN IDs to text labels. definitions (Element): The BPMN definitions element. size (dict): Dictionary containing element sizes. data (dict): Dictionary containing prediction results. keep_elements (list): List of elements to keep. """ elements = data['BPMN_id'] positions = data['boxes'] links = data['links'] for i in keep_elements: if i >= len(elements): print("Problem with the index") continue element_id = elements[i] if element_id is None: continue element_type = element_id.split('_')[0] x, y = positions[i][:2] # Start Event if element_type == 'event': status = check_status(links[i], keep_elements) if status == 'start': element = ET.SubElement(process, 'bpmn:startEvent', id=element_id, name=text_mapping[element_id]) elif status == 'middle': element = ET.SubElement(process, 'bpmn:intermediateCatchEvent', id=element_id, name=text_mapping[element_id]) elif status == 'end': element = ET.SubElement(process, 'bpmn:endEvent', id=element_id, name=text_mapping[element_id]) add_diagram_elements(bpmnplane, element_id, x, y, size['event'][0], size['event'][1]) # Task elif element_type == 'task': element = ET.SubElement(process, 'bpmn:task', id=element_id, name=text_mapping[element_id]) status, datasAssociation_idx = check_data_association(i, data['links'], data['labels'], keep_elements) if len(status) != 0: for state, dataAssociation_idx in zip(status, datasAssociation_idx): # Handle Data Input Association if state == 'input': dataObject_idx = links[dataAssociation_idx][0] dataObject_name = elements[dataObject_idx] dataObject_ref = f'DataObjectReference_{dataObject_name.split("_")[1]}' ET.SubElement(element, 'bpmn:property', id=f'Property_{dataAssociation_idx}_{dataObject_ref.split("_")[1]}', name='__targetRef_placeholder') sub_element = ET.SubElement(element, 'bpmn:dataInputAssociation', id=f'dataInAsso_{dataAssociation_idx}_{dataObject_ref.split("_")[1]}') ET.SubElement(sub_element, 'bpmn:sourceRef').text = dataObject_ref ET.SubElement(sub_element, 'bpmn:targetRef').text = f"Property_{dataAssociation_idx}_{dataObject_ref.split('_')[1]}" create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], dataAssociation_idx, dataObject_name, element_id) # Handle Data Output Association elif state == 'output': dataObject_idx = links[dataAssociation_idx][1] dataObject_name = elements[dataObject_idx] dataObject_ref = f'DataObjectReference_{dataObject_name.split("_")[1]}' sub_element = ET.SubElement(element, 'bpmn:dataOutputAssociation', id=f'dataOutAsso_{dataAssociation_idx}_{dataObject_ref.split("_")[1]}') ET.SubElement(sub_element, 'bpmn:targetRef').text = dataObject_ref create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], dataAssociation_idx, element_id, dataObject_name) add_diagram_elements(bpmnplane, element_id, x, y, size['task'][0], size['task'][1]) # Message Events (Start, Intermediate, End) elif element_type == 'message': status = check_status(links[i], keep_elements) if status == 'start': element = ET.SubElement(process, 'bpmn:startEvent', id=element_id, name=text_mapping[element_id]) elif status == 'middle': element = ET.SubElement(process, 'bpmn:intermediateCatchEvent', id=element_id, name=text_mapping[element_id]) elif status == 'end': element = ET.SubElement(process, 'bpmn:endEvent', id=element_id, name=text_mapping[element_id]) status, datasAssociation_idx = check_data_association(i, data['links'], data['labels'], keep_elements) if len(status) != 0: for state, dataAssociation_idx in zip(status, datasAssociation_idx): # Handle Data Input Association if state == 'input': dataObject_idx = links[dataAssociation_idx][0] dataObject_name = elements[dataObject_idx] dataObject_ref = f'DataObjectReference_{dataObject_name.split("_")[1]}' sub_element = ET.SubElement(element, 'bpmn:dataInputAssociation', id=f'dataInAsso_{dataAssociation_idx}_{dataObject_ref.split("_")[1]}') ET.SubElement(sub_element, 'bpmn:sourceRef').text = dataObject_ref create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], dataAssociation_idx, dataObject_name, element_id) # Handle Data Output Association elif state == 'output': dataObject_idx = links[dataAssociation_idx][1] dataObject_name = elements[dataObject_idx] dataObject_ref = f'DataObjectReference_{dataObject_name.split("_")[1]}' sub_element = ET.SubElement(element, 'bpmn:dataOutputAssociation', id=f'dataOutAsso_{dataAssociation_idx}_{dataObject_ref.split("_")[1]}') ET.SubElement(sub_element, 'bpmn:targetRef').text = dataObject_ref create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], dataAssociation_idx, element_id, dataObject_name) ET.SubElement(element, 'bpmn:messageEventDefinition', id=f'MessageEventDefinition_{i+1}') add_diagram_elements(bpmnplane, element_id, x, y, size['message'][0], size['message'][1]) # Gateways (Exclusive, Parallel) elif element_type in ['exclusiveGateway', 'parallelGateway']: gateway_type = 'exclusiveGateway' if element_type == 'exclusiveGateway' else 'parallelGateway' element = ET.SubElement(process, f'bpmn:{gateway_type}', id=element_id) add_diagram_elements(bpmnplane, element_id, x, y, size[element_type][0], size[element_type][1]) elif element_type == 'eventBasedGateway': element = ET.SubElement(process, 'bpmn:eventBasedGateway', id=element_id) status, links_idx = check_eventBasedGateway(i, data['links'], data['labels']) if len(status) != 0: for state, link_idx in zip(status, links_idx): # Handle Data Input Association if state == 'input' : gateway_idx = links[link_idx][0] gateway_name = elements[gateway_idx] sub_element = ET.SubElement(element, 'bpmn:eventBasedGateway', id=f'eventBasedGateway_{link_idx}_{gateway_name.split("_")[1]}') create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], i, gateway_name, element_id) # Handle Data Output Association elif state == 'output': gateway_idx = links[link_idx][1] gateway_name = elements[gateway_idx] sub_element = ET.SubElement(element, 'bpmn:eventBasedGateway', id=f'eventBasedGateway_{link_idx}_{gateway_name.split("_")[1]}') create_data_Association(bpmnplane, data, size, sub_element.attrib['id'], i, element_id, gateway_name) add_diagram_elements(bpmnplane, element_id, x, y, size['eventBasedGateway'][0], size['eventBasedGateway'][1]) # Data Object elif element_type == 'dataObject' or element_type == 'dataStore': #print('ici dataObject', element_id) dataObject_idx = element_id.split('_')[1] dataObject_ref = f'DataObjectReference_{dataObject_idx}' if element_type == 'dataObject': ET.SubElement(process, 'bpmn:dataObjectReference', id=dataObject_ref, dataObjectRef=element_id, name=text_mapping[element_id]) ET.SubElement(process, f'bpmn:{element_type}', id=element_id) elif element_type == 'dataStore': ET.SubElement(process, 'bpmn:dataStoreReference', id=dataObject_ref, name=text_mapping[element_id]) add_diagram_elements(bpmnplane, dataObject_ref, x, y, size[element_type][0], size[element_type][1]) # Timer Event elif element_type == 'timerEvent': element = ET.SubElement(process, 'bpmn:intermediateCatchEvent', id=element_id, name=text_mapping[element_id]) ET.SubElement(element, 'bpmn:timerEventDefinition', id=f'TimerEventDefinition_{i+1}') add_diagram_elements(bpmnplane, element_id, x, y, size['timerEvent'][0], size['timerEvent'][1]) def calculate_pool_bounds(boxes, labels, keep_elements, size=None, class_dict=None): """ Calculate the bounding box for a pool. Args: boxes (list): List of bounding boxes. labels (list): List of labels for each element. keep_elements (list): List of elements to keep. size (dict, optional): Dictionary containing element sizes. Defaults to None. class_dict (dict, optional): Dictionary mapping class indices to class names. Defaults to None. Returns: tuple: Minimum and maximum x and y coordinates of the pool. """ min_x, min_y = float('inf'), float('inf') max_x, max_y = float('-inf'), float('-inf') for i in keep_elements: if i >= len(labels): print(f"Problem with the index: {i}") continue element = labels[i] if element in {None, 7, 13, 14, 15}: continue if size is None or class_dict is None: element_width = boxes[i][2] - boxes[i][0] element_height = boxes[i][3] - boxes[i][1] else: if labels[i] in class_dict: element_width, element_height = size[class_dict[labels[i]]] else: print(f"Class label {labels[i]} not found in class_dict.") continue x, y = boxes[i][:2] min_x = min(min_x, x) min_y = min(min_y, y) max_x = max(max_x, x + element_width) max_y = max(max_y, y + element_height) return min_x, min_y, max_x, max_y def calculate_pool_waypoints(idx, data, size, source_idx, target_idx, source_element, target_element): """ Calculate waypoints for connecting elements within a pool. Args: idx (int): Index of the current element. data (dict): Dictionary containing prediction results. size (dict): Dictionary containing element sizes. source_idx (int): Index of the source element. target_idx (int): Index of the target element. source_element (str): Source element type. target_element (str): Target element type. Returns: list: List of waypoints for the connection. """ # Get the bounding boxes of the source and target elements source_box = data['boxes'][source_idx] target_box = data['boxes'][target_idx] # Get the midpoints of the source element source_mid_x = (source_box[0] + source_box[2]) / 2 source_mid_y = (source_box[1] + source_box[3]) / 2 # Check if the connection involves a pool if source_element == 'pool': if target_element == 'pool': return [(source_mid_x, source_mid_y), (source_mid_x, source_mid_y)] pool_box = source_box element_box = (target_box[0], target_box[1], target_box[0]+size[target_element][0], target_box[1]+size[target_element][1]) element_mid_x = (element_box[0] + element_box[2]) / 2 element_mid_y = (element_box[1] + element_box[3]) / 2 # Connect the pool's bottom or top side to the target element's top or bottom center if pool_box[3] < element_box[1]: # Pool is above the target element waypoints = [(element_mid_x, pool_box[3]), (element_mid_x, element_box[1])] else: # Pool is below the target element waypoints = [(element_mid_x, element_box[3]), (element_mid_x, pool_box[1])] else: pool_box = target_box element_box = (source_box[0], source_box[1], source_box[0]+size[source_element][0], source_box[1]+size[source_element][1]) element_mid_x = (element_box[0] + element_box[2]) / 2 element_mid_y = (element_box[1] + element_box[3]) / 2 # Connect the element's bottom or top center to the pool's top or bottom side if pool_box[3] < element_box[1]: # Pool is above the target element waypoints = [(element_mid_x, element_box[1]), (element_mid_x, pool_box[3])] else: # Pool is below the target element waypoints = [(element_mid_x, element_box[3]), (element_mid_x, pool_box[1])] return waypoints def add_curve(waypoints, pos_source, pos_target, threshold=30): """ Add a single curve to the sequence flow by introducing a control point. The control point is added at an offset from the midpoint of the original waypoints. Args: waypoints (list): List of waypoints representing the path. pos_source (str): Position of the source element ('left', 'right', 'top', 'bottom'). pos_target (str): Position of the target element ('left', 'right', 'top', 'bottom'). threshold (int, optional): Minimum distance to consider for adding a curve. Defaults to 30. Returns: list: List of waypoints with the added control point if applicable. """ if len(waypoints) < 2: return waypoints # Extract start and end points start_point = waypoints[0] end_point = waypoints[1] start_x, start_y = start_point end_x, end_y = end_point pos_horizontal = ['left', 'right'] pos_vertical = ['top', 'bottom'] if abs(start_x - end_x) < threshold or abs(start_y - end_y) < threshold: return waypoints # Calculate the control point based on source and target positions if pos_source in pos_horizontal and pos_target in pos_horizontal: control_point = None elif pos_source in pos_vertical and pos_target in pos_vertical: control_point = None elif pos_source in pos_horizontal and pos_target in pos_vertical: control_point = (end_x, start_y) elif pos_source in pos_vertical and pos_target in pos_horizontal: control_point = (start_x, end_y) else: control_point = None # Create the curved path if control_point is not None: curved_waypoints = [start_point, control_point, end_point] else: curved_waypoints = [start_point, end_point] return curved_waypoints def calculate_waypoints(data, size, current_idx, source_id, target_id): """ Calculate waypoints for connecting two elements in the diagram. Args: data (dict): Data containing diagram information. size (dict): Dictionary of element sizes. current_idx (int): Index of the current element. source_id (str): ID of the source element. target_id (str): ID of the target element. Returns: list: List of waypoints for the connection. """ best_points = data['best_points'][current_idx] pos_source = best_points[0] pos_target = best_points[1] source_idx = data['BPMN_id'].index(source_id) target_idx = data['BPMN_id'].index(target_id) if source_idx == target_idx: warning() return None if source_idx is None or target_idx is None: warning() return None name_source = source_id.split('_')[0] name_target = target_id.split('_')[0] avoid_element = ['pool', 'sequenceFlow', 'messageFlow', 'dataAssociation'] if name_target in avoid_element or name_source in avoid_element: warning() return None # Get the position of the source and target source_x, source_y = data['boxes'][source_idx][:2] target_x, target_y = data['boxes'][target_idx][:2] if name_source == 'pool' or name_target == 'pool': warning() return [(source_x, source_y), (target_x, target_y)] # Adjust the source coordinates based on its position if pos_source == 'left': source_x = source_x source_y += size[name_source][1] / 2 elif pos_source == 'right': source_x += size[name_source][0] source_y += size[name_source][1] / 2 elif pos_source == 'top': source_x += size[name_source][0] / 2 source_y = source_y elif pos_source == 'bottom': source_x += size[name_source][0] / 2 source_y += size[name_source][1] # Adjust the target coordinates based on its position if pos_target == 'left': target_x = target_x target_y += size[name_target][1] / 2 elif pos_target == 'right': target_x += size[name_target][0] target_y += size[name_target][1] / 2 elif pos_target == 'top': target_x += size[name_target][0] / 2 target_y = target_y elif pos_target == 'bottom': target_x += size[name_target][0] / 2 target_y += size[name_target][1] waypoints = [(source_x, source_y), (target_x, target_y)] # Add curve if no obstacles are in the path if data['labels'][current_idx] == list(class_dict.values()).index('sequenceFlow'): curved_waypoints = add_curve(waypoints, pos_source, pos_target) else: curved_waypoints = waypoints return curved_waypoints def create_flow_element(bpmn, text_mapping, idx, size, data, parent, message=False): """ Create a BPMN flow element (sequence flow or message flow) and add it to the BPMN diagram. Args: bpmn (ET.Element): The BPMN diagram element. text_mapping (dict): Dictionary mapping element IDs to their text labels. idx (int): Index of the current element. size (dict): Dictionary of element sizes. data (dict): Data containing diagram information. parent (ET.Element): The parent element to which the flow element is added. message (bool, optional): Whether the flow is a message flow. Defaults to False. """ source_idx, target_idx = data['links'][idx] if source_idx is None or target_idx is None: warning() return source_id, target_id = data['BPMN_id'][source_idx], data['BPMN_id'][target_idx] if message: element_id = f'messageflow_{source_id}_{target_id}' else: element_id = f'sequenceflow_{source_id}_{target_id}' if message: if source_id.split('_')[0] == 'pool' or target_id.split('_')[0] == 'pool': waypoints = calculate_pool_waypoints(idx, data, size, source_idx, target_idx, source_id.split('_')[0], target_id.split('_')[0]) if source_id.split('_')[0] == 'pool': XML_source_id = f"participant_{source_id.split('_')[1]}" XML_target_id = target_id if target_id.split('_')[0] == 'pool': XML_target_id = f"participant_{target_id.split('_')[1]}" XML_source_id = source_id element = ET.SubElement(parent, 'bpmn:messageFlow', id=element_id, sourceRef=XML_source_id, targetRef=XML_target_id, name=text_mapping[data['BPMN_id'][idx]]) else: waypoints = calculate_waypoints(data, size, idx, source_id, target_id) if waypoints is None: return element = ET.SubElement(parent, 'bpmn:messageFlow', id=element_id, sourceRef=source_id, targetRef=target_id, name=text_mapping[data['BPMN_id'][idx]]) else: waypoints = calculate_waypoints(data, size, idx, source_id, target_id) if waypoints is None: return element = ET.SubElement(parent, 'bpmn:sequenceFlow', id=element_id, sourceRef=source_id, targetRef=target_id, name=text_mapping[data['BPMN_id'][idx]]) add_diagram_edge(bpmn, element_id, waypoints)