File size: 2,555 Bytes
3206347 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
import { Worker } from 'bullmq';
import process from 'node:process';
import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js';
import logger from '../helpers/logger.js';
import flowQueue from '../queues/flow.js';
import triggerQueue from '../queues/trigger.js';
import { processFlow } from '../services/flow.js';
import Flow from '../models/flow.js';
import {
REMOVE_AFTER_30_DAYS_OR_150_JOBS,
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
} from '../helpers/remove-job-configuration.js';
export const worker = new Worker(
'flow',
async (job) => {
const { flowId } = job.data;
const flow = await Flow.query().findById(flowId).throwIfNotFound();
const user = await flow.$relatedQuery('user');
const allowedToRunFlows = await user.isAllowedToRunFlows();
if (!allowedToRunFlows) {
return;
}
const triggerStep = await flow.getTriggerStep();
const { data, error } = await processFlow({ flowId });
const reversedData = data.reverse();
const jobOptions = {
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
};
for (const triggerItem of reversedData) {
const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
triggerItem,
};
await triggerQueue.add(jobName, jobPayload, jobOptions);
}
if (error) {
const jobName = `${triggerStep.id}-error`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
error,
};
await triggerQueue.add(jobName, jobPayload, jobOptions);
}
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', async (job, err) => {
const errorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
\n ${err.stack}
`;
logger.error(errorMessage);
const flow = await Flow.query().findById(job.data.flowId);
if (!flow) {
await flowQueue.removeRepeatableByKey(job.repeatJobKey);
const flowNotFoundErrorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
`;
logger.error(flowNotFoundErrorMessage);
}
Sentry.captureException(err, {
extra: {
jobId: job.id,
},
});
});
process.on('SIGTERM', async () => {
await worker.close();
});
|