Transformers documentation

¿Cómo puedo crear un pipeline personalizado?

Hugging Face's logo
Join the Hugging Face community

and get access to the augmented documentation experience

to get started

¿Cómo puedo crear un pipeline personalizado?

En esta guía, veremos cómo crear un pipeline personalizado y cómo compartirlo en el Hub o añadirlo a la biblioteca 🤗 Transformers.

En primer lugar, debes decidir las entradas que tu pipeline podrá recibir. Pueden ser strings, bytes, diccionarios o lo que te parezca que vaya a ser la entrada más apropiada. Intenta mantener estas entradas en un formato que sea tan Python puro como sea posible, puesto que esto facilita la compatibilidad (incluso con otros lenguajes de programación por medio de JSON). Estos serán los inputs (entradas) del pipeline (preprocess).

Ahora debes definir los outputs (salidas). Al igual que con los inputs, entre más simple el formato, mejor. Estas serán las salidas del método postprocess (posprocesamiento).

Empieza heredando la clase base Pipeline con los 4 métodos que debemos implementar: preprocess (preprocesamiento), _forward (ejecución), postprocess (posprocesamiento) y _sanitize_parameters (verificar parámetros).

from transformers import Pipeline


class MyPipeline(Pipeline):
    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        if "maybe_arg" in kwargs:
            preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, inputs, maybe_arg=2):
        model_input = Tensor(inputs["input_ids"])
        return {"model_input": model_input}

    def _forward(self, model_inputs):
        # model_inputs == {"model_input": model_input}
        outputs = self.model(**model_inputs)
        # Quizá {"logits": Tensor(...)}
        return outputs

    def postprocess(self, model_outputs):
        best_class = model_outputs["logits"].softmax(-1)
        return best_class

La estructura de este desglose es así para garantizar una compatibilidad más o menos transparente con el uso de CPU/GPU y el pre/posprocesamiento en CPU en varios hilos.

preprocess tomará las entradas definidas originalmente y las convertirá en algo que se le pueda pasar al modelo. Podría contener más información y a menudo es un objeto Dict (diccionario).

_forward contiene los detalles de la implementación y no debería ser invocado de forma directa. forward es el método preferido a utilizar pues contiene verificaciones para asegurar que todo funcione en el dispositivo correcto. Cualquier cosa que esté relacionada con un modelo real debería ir en el método _forward, todo lo demás va en los métodos de preprocesamiento y posprocesamiento.

Los métodos postprocess reciben la salida _forward y la convierten en la salida final que decidimos anteriormente.

_sanitize_parameters existe para permitir a los usuarios pasar cualesquiera parámetros cuando lo deseen, ya sea al momento de inicializar el pipeline pipeline(...., maybe_arg=4) o al momento de invocarlo pipe = pipeline(...); output = pipe(...., maybe_arg=4).

El método _sanitize_parameters devuelve 3 diccionarios de kwargs que serán pasados directamente a preprocess, _forward y postprocess. No ingreses nada si el caller no se va a invocar con parámetros adicionales. Esto permite mantener los parámetros por defecto de la definición de la función, lo que es más “natural”.

Un ejemplo clásico sería un argumento top_k en el posprocesamiento de una tarea de clasificación.

>>> pipe = pipeline("my-new-task")
>>> pipe("This is a test")
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}, {"label": "3-star", "score": 0.05}
{"label": "4-star", "score": 0.025}, {"label": "5-star", "score": 0.025}]

>>> pipe("This is a test", top_k=2)
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}]

Para lograrlo, actualizaremos nuestro método postprocess con un valor por defecto de 5 y modificaremos _sanitize_parameters para permitir este nuevo parámetro.

def postprocess(self, model_outputs, top_k=5):
    best_class = model_outputs["logits"].softmax(-1)
    # Añade la lógica para manejar el top_k
    return best_class


def _sanitize_parameters(self, **kwargs):
    preprocess_kwargs = {}
    if "maybe_arg" in kwargs:
        preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]

    postprocess_kwargs = {}
    if "top_k" in kwargs:
        postprocess_kwargs["top_k"] = kwargs["top_k"]
    return preprocess_kwargs, {}, postprocess_kwargs

Intenta que las entradas y salidas sean muy simples e, idealmente, que puedan serializarse como JSON, pues esto hace el uso del pipeline muy sencillo sin que el usuario tenga que preocuparse por conocer nuevos tipos de objetos. También es relativamente común tener compatibilidad con muchos tipos diferentes de argumentos por facilidad de uso (por ejemplo, los archivos de audio pueden ser nombres de archivo, URLs o bytes).

Añadirlo a la lista de tareas

Para registrar tu new-task (nueva tarea) en la lista de tareas, debes añadirla al PIPELINE_REGISTRY (registro de pipelines):

from transformers.pipelines import PIPELINE_REGISTRY

PIPELINE_REGISTRY.register_pipeline(
    "new-task",
    pipeline_class=MyPipeline,
    pt_model=AutoModelForSequenceClassification,
)

Puedes especificar un modelo por defecto si lo deseas, en cuyo caso debe venir con una versión específica (que puede ser el nombre de un branch o hash de commit, en este caso usamos "abcdef"), así como el tipo:

PIPELINE_REGISTRY.register_pipeline(
    "new-task",
    pipeline_class=MyPipeline,
    pt_model=AutoModelForSequenceClassification,
    default={"pt": ("user/awesome_model", "abcdef")},
    type="text",  # tipo de datos que maneja: texto, audio, imagen, multi-modalidad
)

Comparte tu pipeline en el Hub

Para compartir tu pipeline personalizado en el Hub, solo tienes que guardar el código personalizado de tu sub-clase Pipeline en un archivo de Python. Por ejemplo, digamos que queremos usar un pipeline personalizado para la clasificación de duplas de oraciones de esta forma:

import numpy as np

from transformers import Pipeline


def softmax(outputs):
    maxes = np.max(outputs, axis=-1, keepdims=True)
    shifted_exp = np.exp(outputs - maxes)
    return shifted_exp / shifted_exp.sum(axis=-1, keepdims=True)


class PairClassificationPipeline(Pipeline):
    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        if "second_text" in kwargs:
            preprocess_kwargs["second_text"] = kwargs["second_text"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, text, second_text=None):
        return self.tokenizer(text, text_pair=second_text, return_tensors=self.framework)

    def _forward(self, model_inputs):
        return self.model(**model_inputs)

    def postprocess(self, model_outputs):
        logits = model_outputs.logits[0].numpy()
        probabilities = softmax(logits)

        best_class = np.argmax(probabilities)
        label = self.model.config.id2label[best_class]
        score = probabilities[best_class].item()
        logits = logits.tolist()
        return {"label": label, "score": score, "logits": logits}

La implementación es independiente del framework y funcionará con modelos de PyTorch y TensorFlow. Si guardamos esto en un archivo llamado pair_classification.py, podemos importarlo y registrarlo de la siguiente manera:

from pair_classification import PairClassificationPipeline
from transformers.pipelines import PIPELINE_REGISTRY
from transformers import AutoModelForSequenceClassification, TFAutoModelForSequenceClassification

PIPELINE_REGISTRY.register_pipeline(
    "pair-classification",
    pipeline_class=PairClassificationPipeline,
    pt_model=AutoModelForSequenceClassification,
    tf_model=TFAutoModelForSequenceClassification,
)

Una vez hecho esto, podemos usarlo con un modelo pre-entrenado. Por ejemplo, al modelo sgugger/finetuned-bert-mrpc se le hizo fine-tuning con el dataset MRPC, en el cual se clasifican duplas de oraciones como paráfrasis o no.

from transformers import pipeline

classifier = pipeline("pair-classification", model="sgugger/finetuned-bert-mrpc")

Ahora podemos compartirlo en el Hub usando el método save_pretrained:

classifier.push_to_hub("test-dynamic-pipeline")

Esto copiará el archivo donde definiste PairClassificationPipeline dentro de la carpeta "test-dynamic-pipeline", y además guardará el modelo y el tokenizer del pipeline, antes de enviar todo al repositorio {your_username}/test-dynamic-pipeline. Después de esto, cualquier persona puede usarlo siempre que usen la opción trust_remote_code=True (confiar en código remoto):

from transformers import pipeline

classifier = pipeline(model="{your_username}/test-dynamic-pipeline", trust_remote_code=True)

Añadir el pipeline a 🤗 Transformers

Si quieres contribuir tu pipeline a la biblioteca 🤗 Transformers, tendrás que añadirlo a un nuevo módulo en el sub-módulo pipelines con el código de tu pipeline. Luego, debes añadirlo a la lista de tareas definidas en pipelines/__init__.py.

A continuación tienes que añadir las pruebas. Crea un nuevo archivo llamado tests/test_pipelines_MY_PIPELINE.py basándote en las pruebas existentes.

La función run_pipeline_test será muy genérica y se correrá sobre modelos pequeños escogidos al azar sobre todas las arquitecturas posibles definidas en model_mapping y tf_model_mapping.

Esto es muy importante para probar compatibilidades a futuro, lo que significa que si alguien añade un nuevo modelo para XXXForQuestionAnswering entonces el pipeline intentará ejecutarse con ese modelo. Ya que los modelos son aleatorios, es imposible verificar los valores como tales, y es por eso que hay un helper ANY que simplemente intentará que la salida tenga el mismo tipo que la salida esperada del pipeline.

También debes implementar 2 (preferiblemente 4) pruebas:

  • test_small_model_pt : Define un (1) modelo pequeño para este pipeline (no importa si los resultados no tienen sentido) y prueba las salidas del pipeline. Los resultados deberían ser los mismos que en test_small_model_tf.
  • test_small_model_tf : Define un (1) modelo pequeño para este pipeline (no importa si los resultados no tienen sentido) y prueba las salidas del pipeline. Los resultados deberían ser los mismos que en test_small_model_pt.
  • test_large_model_pt (optional): Prueba el pipeline en una tarea real en la que los resultados deben tener sentido. Estas pruebas son lentas y deben marcarse como tales. El objetivo de esto es ejemplificar el pipeline y asegurarse de que no haya divergencias en versiones futuras.
  • test_large_model_tf (optional): Prueba el pipeline en una tarea real en la que los resultados deben tener sentido. Estas pruebas son lentas y deben marcarse como tales. El objetivo de esto es ejemplificar el pipeline y asegurarse de que no haya divergencias en versiones futuras.
< > Update on GitHub