|
import isEmpty from 'lodash/isEmpty.js'; |
|
|
|
import Flow from '../models/flow.js'; |
|
import { processTrigger } from '../services/trigger.js'; |
|
import triggerQueue from '../queues/trigger.js'; |
|
import globalVariable from './global-variable.js'; |
|
import QuotaExceededError from '../errors/quote-exceeded.js'; |
|
import { |
|
REMOVE_AFTER_30_DAYS_OR_150_JOBS, |
|
REMOVE_AFTER_7_DAYS_OR_50_JOBS, |
|
} from './remove-job-configuration.js'; |
|
|
|
/**
 * Runs the trigger step of a flow in response to an incoming request and
 * hands every produced trigger item over for processing.
 *
 * A flow that is not active is treated as a test run: its items are
 * processed inline through `processTrigger`. For an active flow each item is
 * added to `triggerQueue` instead.
 *
 * @param {*} flowId - Identifier used to look the flow up.
 * @param {object} request - Incoming request, forwarded to `globalVariable`.
 * @param {object} response - Response object; only `status()` is called on it.
 * @returns {Promise<*>} Whatever `response.status(...)` returns: 404 for a
 *   test run whose trigger app is not `webhook`, 204 otherwise.
 * @throws {QuotaExceededError} When the flow is active and its user is not
 *   allowed to run flows.
 */
export default async (flowId, request, response) => {
  // Presumably rejects when no flow matches the id (throwIfNotFound).
  const flow = await Flow.query().findById(flowId).throwIfNotFound();
  const owner = await flow.$relatedQuery('user');

  const isTestRun = !flow.active;

  // The quota is only consulted for active flows; test runs skip the check.
  if (!isTestRun) {
    const mayRunFlows = await owner.isAllowedToRunFlows();

    if (!mayRunFlows) {
      throw new QuotaExceededError();
    }
  }

  const triggerStep = await flow.getTriggerStep();
  const app = await triggerStep.getApp();
  const triggeredByWebhook = app.key === 'webhook';

  // Test runs are only served when the trigger app is the webhook app.
  if (isTestRun && !triggeredByWebhook) {
    return response.status(404);
  }

  const connection = await triggerStep.$relatedQuery('connection');

  const $ = await globalVariable({
    flow,
    connection,
    app,
    step: triggerStep,
    testRun: isTestRun,
    request,
  });

  const command = await triggerStep.getTriggerCommand();
  await command.run($);

  // NOTE: assuming `data` is an array, reverse() reorders it in place.
  const pendingItems = $.triggerOutput.data.reverse();

  // Nothing was produced by the trigger command, so there is nothing to do.
  if (isEmpty(pendingItems)) {
    return response.status(204);
  }

  const stepId = triggerStep.id;

  if (isTestRun) {
    // Test runs are processed one item at a time, bypassing the queue.
    for (const triggerItem of pendingItems) {
      await processTrigger({
        flowId,
        stepId,
        triggerItem,
        testRun: isTestRun,
      });
    }

    return response.status(204);
  }

  // Active flows: enqueue one job per trigger item, in order.
  for (const triggerItem of pendingItems) {
    const jobName = `${stepId}-${triggerItem.meta.internalId}`;

    await triggerQueue.add(
      jobName,
      { flowId, stepId, triggerItem },
      {
        removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
        removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
      }
    );
  }

  return response.status(204);
};
|
|