jobPromise.cancel() gets clobbered for jobs with timeouts and promises that aren't native Promises
Created by: cincodenada
Description
There are a couple situations where calling jobPromise.cancel()
in the active
event callback doesn't work:
- If the job returns a promise that is not a native Promise (e.g. if using Bluebird or another cancelable promise library)
- If the job is given a timeout
Minimal, Working Test code to reproduce the issue.
const Queue = require('bull')
const delay = require('delay')
const Bluebird = require('bluebird')
Bluebird.config({ cancellation: true })
// A simple demo processor that runs for `jobData.delay` milliseconds
// It provides a cancelable promise, and stops processing early if canceled
async function tickProgress (job, state) {
for (let i = 0; i < 10; i++) {
if (state.canceled) {
console.log('Job canceled!')
return
}
job.progress(i * 10)
await delay(job.data.delay / 10)
}
job.progress(100)
}
function bluebirdProcessor (job) {
return new Bluebird((resolve, reject, onCancel) => {
const state = { canceled: false }
onCancel(() => { state.canceled = true })
tickProgress(job, state).then(resolve).catch(reject)
})
}
function nativeProcessor (job) {
const state = { canceled: false }
const promise = tickProgress(job, state)
promise.cancel = function () { state.canceled = true }
return promise
}
// replace with nativeProcessor to un-break it
const processor = bluebirdProcessor
;(async () => {
// Demonstrating that cancel works fine without the queue...
console.log('Running job processor outside of queue...')
const jobPromise = processor({
data: { delay: 5000 },
progress: progress => console.log('Progress:', progress)
})
// Wait a bit...
await delay(2000)
// Cancel
jobPromise.cancel()
// Wait for it to see the cancellation
await delay(500)
// Cancel does not work when wrapped in a queue job
const queue = new Queue('test')
queue.process(processor)
queue.on('error', async (error) => {
console.log('Queue error:', error)
})
queue.on('progress', function (job, progress) {
console.log('Progress at', progress)
})
queue.on('completed', async (job, result) => {
queue.close()
})
// When a job starts, wait 3.5 seconds and then attempt to cancel it
queue.on('active', async (job, jobPromise) => {
console.log('Job started:', jobPromise)
await delay(2000)
console.log('Attempting to cancel...')
jobPromise.cancel()
})
// Start the job
console.log('Adding job...')
queue.add({ delay: 5000 }, {
// And uncomment this to break it even with native promises
// timeout: 10000
})
})()
Bull version
4.1.0