musarehmani100's picture
Upload folder using huggingface_hub
3206347 verified
import { Worker } from 'bullmq';
import process from 'node:process';
import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js';
import logger from '../helpers/logger.js';
import flowQueue from '../queues/flow.js';
import triggerQueue from '../queues/trigger.js';
import { processFlow } from '../services/flow.js';
import Flow from '../models/flow.js';
import {
REMOVE_AFTER_30_DAYS_OR_150_JOBS,
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
} from '../helpers/remove-job-configuration.js';
export const worker = new Worker(
'flow',
async (job) => {
const { flowId } = job.data;
const flow = await Flow.query().findById(flowId).throwIfNotFound();
const user = await flow.$relatedQuery('user');
const allowedToRunFlows = await user.isAllowedToRunFlows();
if (!allowedToRunFlows) {
return;
}
const triggerStep = await flow.getTriggerStep();
const { data, error } = await processFlow({ flowId });
const reversedData = data.reverse();
const jobOptions = {
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
};
for (const triggerItem of reversedData) {
const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
triggerItem,
};
await triggerQueue.add(jobName, jobPayload, jobOptions);
}
if (error) {
const jobName = `${triggerStep.id}-error`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
error,
};
await triggerQueue.add(jobName, jobPayload, jobOptions);
}
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', async (job, err) => {
const errorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
\n ${err.stack}
`;
logger.error(errorMessage);
const flow = await Flow.query().findById(job.data.flowId);
if (!flow) {
await flowQueue.removeRepeatableByKey(job.repeatJobKey);
const flowNotFoundErrorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
`;
logger.error(flowNotFoundErrorMessage);
}
Sentry.captureException(err, {
extra: {
jobId: job.id,
},
});
});
process.on('SIGTERM', async () => {
await worker.close();
});