Closing shared redis connection causes infinite loop and crash [BUG]
Created by: dobesv
Description
If you share an ioredis instance between queues and then disconnect the redis client, the processing loop spins forever, consuming a lot of memory and CPU.
I believe the issue occurs because of the way errors are "swallowed" in this function:
Queue.prototype.getNextJob = function() {
  if (this.closing) {
    return Promise.resolve();
  }

  if (this.drained) {
    //
    // Waiting for new jobs to arrive
    //
    return this.bclient
      .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
      .then(
        jobId => {
          if (jobId) {
            return this.moveToActive(jobId);
          }
        },
        err => {
          // Swallow error
          if (err.message !== 'Connection is closed.') {
            console.error('BRPOPLPUSH', err);
          }
        }
      );
  } else {
    return this.moveToActive();
  }
};
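For what it's worth, here is why the swallowed error turns into a busy loop: once the connection is closed, brpoplpush no longer blocks for drainDelay seconds, and because the rejection is swallowed the returned promise resolves immediately with undefined, so the processing loop starts its next iteration straight away. A minimal standalone sketch of that effect (this is only an illustration, not Bull's actual worker loop; the run function and the one-second cutoff are made up for the demo):

// Minimal sketch (not Bull's code): a getNextJob that always hits a closed
// connection and swallows the error, driven by a simple promise loop.
const getNextJob = () =>
  Promise.reject(new Error('Connection is closed.')).then(
    jobId => jobId, // would normally moveToActive(jobId)
    err => {
      // Swallow error -> the returned promise resolves immediately
    }
  );

let iterations = 0;
const deadline = Date.now() + 1000;
const run = () =>
  getNextJob().then(() => {
    iterations += 1;
    if (Date.now() < deadline) {
      return run();
    }
    console.log(`getNextJob() settled ${iterations} times in one second`);
  });
run();

Because nothing in that chain ever waits on a timer or on redis, the loop pegs the CPU and keeps allocating promises until the heap is exhausted, which matches the crash below.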
Probably what needs to happen is that the queue should go into some sort of backoff loop, waiting until either (1) this.closing === true, or (2) things start working again.
A possible fix might look like this:
// Assumes promisify from Node's util module is in scope:
// const { promisify } = require('util');

Queue.prototype.getNextJob = function () {
  if (this.closing) {
    return Promise.resolve();
  }

  if (this.drained) {
    if (this.getNextJobBackoff) {
      console.log('getNextJob() in backoff...');
      return this.getNextJobBackoff;
    }
    //
    // Waiting for new jobs to arrive
    //
    return this.bclient
      .brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
      .then(
        (jobId) => {
          if (jobId) {
            return this.moveToActive(jobId);
          }
        },
        (err) => {
          // Add a delay before the next attempt
          this.getNextJobBackoff = promisify(setTimeout)(1000).then(() => {
            this.getNextJobBackoff = null;
          });
          // Swallow 'Connection is closed.' error
          if (err.message !== 'Connection is closed.') {
            throw err;
          }
        }
      );
  } else {
    return this.moveToActive();
  }
};
Note that this method still handles the closing case immediately, because the immediate return / error is never delayed; we only delay if we come back into getNextJob with this.closing unset.
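To make the sharing aspect explicit: every caller that hits getNextJob() while the timer is pending gets the same this.getNextJobBackoff promise, so there is at most one outstanding timer per queue and the loop degrades to roughly one attempt per second. The same pattern in isolation (a hypothetical withBackoff helper written for this illustration, not part of Bull's API):

const { promisify } = require('util');
const delay = promisify(setTimeout); // resolves after the given milliseconds

// Hypothetical helper showing the shared-backoff-promise pattern.
let pendingBackoff = null;

function withBackoff(attempt) {
  // While a backoff is pending, every caller gets the same promise,
  // so only one timer is ever outstanding.
  if (pendingBackoff) {
    return pendingBackoff;
  }
  return attempt().catch(() => {
    // Schedule a shared 1s delay before the next real attempt,
    // swallowing the error just like the 'Connection is closed.' case.
    pendingBackoff = delay(1000).then(() => {
      pendingBackoff = null;
    });
  });
}

// Usage: poll a permanently broken operation for ~3 seconds.
const broken = () => Promise.reject(new Error('Connection is closed.'));
const stopAt = Date.now() + 3000;
const loop = () =>
  withBackoff(broken).then(() => {
    if (Date.now() < stopAt) {
      return loop();
    }
  });
loop(); // makes roughly one real attempt per second instead of spinning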
Minimal, Working Test code to reproduce the issue.
// reproduce.js
const Queue = require('bull');
const Redis = require('ioredis');
const Bluebird = require('bluebird');

const main = async () => {
  const clients = {};
  const createClient = (type, redisOpts) => {
    const redisOptions = {
      ...redisOpts,
      connectionName: ['bull', type].join('-'),
    };
    const k = JSON.stringify(redisOptions);
    return clients[k] || (clients[k] = new Redis(redisOptions));
  };

  const q1 = new Queue('1', { createClient });
  const q2 = new Queue('2', { createClient });
  const q3 = new Queue('3', { createClient });

  q1.process(1, job => console.log('q1', job.data)).catch(err =>
    console.error(err),
  );
  q2.process(2, job => console.log('q2', job.data)).catch(err =>
    console.error(err),
  );
  q3.process(3, job => console.log('q3', job.data)).catch(err =>
    console.error(err),
  );
  console.log('processors ready');

  await Bluebird.delay(1000);

  await Promise.all([
    q1.add({ q: 1 }),
    q1.add({ q: 1.2 }),
    q2.add({ q: 2 }),
    q2.add({ q: 2.1 }),
    q2.add({ q: 2.2 }),
    q3.add({ q: 3 }),
    q3.add({ q: 3.1 }),
    q3.add({ q: 3.2 }),
    q3.add({ q: 3.3 }),
    q3.add({ q: 3.4 }),
  ]);

  await Bluebird.delay(1000);

  Object.values(clients).forEach(client => client.disconnect());
  console.log('disconnected redis');

  await q1.close();
  console.log('closed q1');
  await q2.close();
  console.log('closed q2');
  await q3.close();
  console.log('closed q3');

  console.log('disconnecting redis again');
  Object.values(clients).forEach(client => client.disconnect());
  console.log('end of main');
};

main();
$ NODE_OPTIONS=--max-old-space-size=100 node ./reproduce.js
processors ready
q1 { q: 1 }
q2 { q: 2 }
q2 { q: 2.1 }
q3 { q: 3 }
q3 { q: 3.1 }
q3 { q: 3.2 }
q1 { q: 1.2 }
q2 { q: 2.2 }
q3 { q: 3.3 }
q3 { q: 3.4 }
<--- Last few GCs --->
[3316:0x3825890]     5861 ms: Mark-sweep 99.3 (100.6) -> 98.9 (100.9) MB, 54.1 / 0.0 ms (+ 5.7 ms in 8 steps since start of marking, biggest step 2.9 ms, walltime since start of marking 63 ms) (average mu = 0.122, current mu = 0.060) allocation failure
[3316:0x3825890]     5898 ms: Mark-sweep 99.5 (100.9) -> 99.1 (100.9) MB, 25.3 / 0.0 ms (+ 8.2 ms in 10 steps since start of marking, biggest step 3.2 ms, walltime since start of marking 38 ms) (average mu = 0.117, current mu = 0.110) allocation failure
<--- JS stacktrace --->
==== JS stack trace =========================================
0: ExitFrame [pc: 0x1374fd9]
1: StubFrame [pc: 0x1343cf8]
2: StubFrame [pc: 0x1311dc7]
3: EntryFrame [pc: 0x12f2b78]
4: ExitFrame [pc: 0x12f622d]
Security context: 0x3b56550008a1 <JSObject>
5: processTicksAndRejections [0xbab20a449d1] [internal/process/task_queues.js:~65] [pc=0x2ec6f29ec6b8](this=0x0bab20a42a09 <process map = 0x31243d9a8111>)
6: InternalFrame [pc: 0x12f2c1d]
7: EntryFrame [pc: 0x12f29f8]
=...
FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
Writing Node.js report to file: report.20200522.172937.3316.0.001.json
Node.js report completed
1: 0x9da7c0 node::Abort() [node]
2: 0x9db976 node::OnFatalError(char const*, char const*) [node]
3: 0xb39f1e v8::Utils::ReportOOMFailure(v8::internal::Isolate*, char const*, bool) [node]
4: 0xb3a299 v8::internal::V8::FatalProcessOutOfMemory(v8::internal::Isolate*, char const*, bool) [node]
5: 0xce5635 [node]
6: 0xce5cc6 v8::internal::Heap::RecomputeLimits(v8::internal::GarbageCollector) [node]
7: 0xcf1b5a v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [node]
8: 0xcf2a65 v8::internal::Heap::CollectGarbage(v8::internal::AllocationSpace, v8::internal::GarbageCollectionReason, v8::GCCallbackFlags) [node]
9: 0xcf5478 v8::internal::Heap::AllocateRawWithRetryOrFail(int, v8::internal::AllocationType, v8::internal::AllocationAlignment) [node]
10: 0xcbbda7 v8::internal::Factory::NewFillerObject(int, bool, v8::internal::AllocationType) [node]
11: 0xff1e0b v8::internal::Runtime_AllocateInYoungGeneration(int, unsigned long*, v8::internal::Isolate*) [node]
12: 0x1374fd9 [node]
Aborted
Bull version
3.14.0