Thanks for the article! It will save me a lot of time next week. 😅 I have a couple of concerns.
1/ For the `createProcessTicker` method:
```ts
let record; // hoisted so the catch block can log the failing record
try {
  for (record of records) {
    await processRecord(record);
    await cursorRepo.setPosition(record.id);
    processedRecords++;
  }
} catch (error) {
  logger.error('Could not process record', {
    processName,
    record,
    error,
  });
} finally {
  await cursorRepo.unlockCursor();
}
```
Should `error` be forwarded (re-thrown) in the `catch` block? Otherwise, the cursor process won't terminate if it fails to process a record.
2/ For this termination flow:
```ts
} catch (error) {
  logger.critical('Terminating server: cursor process error', {
    processName: props.processName,
    error,
  }); // we use structured logging
  await lifecycle.close();
  process.exit(1);
}
```
When a node is terminated and the cursor lock is released, will the failure propagate to the remaining nodes? I guess there will be a scenario where container managers (CR, K8s) keep initializing new nodes to replace the terminated ones until the error is fixed.
For 1: We don't want this exception to bubble. Say the service we're messaging is down: the throw prevents the cursor from moving past its current position, then we unlock the cursor, and on the next tick we try the same message again. This repeats until the service comes back up. That's why we also want alerts to trigger if there are too many errors or the cursor falls too far behind its target. The number of records retrieved each tick is our queue depth, so you could log that and set up an alert, as in the sketch below.
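Here's a minimal sketch of that queue-depth logging. The logger stub, the threshold, and the names `reportQueueDepth` and `QUEUE_DEPTH_ALERT_THRESHOLD` are my assumptions for illustration, not code from the article:

```ts
// Sketch: log queue depth each tick so a log-based alert (Datadog,
// CloudWatch, etc.) can fire when the cursor falls behind its target.
// The logger stub and threshold below are assumptions, not the article's code.
const logger = {
  info: (msg: string, meta: object) =>
    console.log(JSON.stringify({ level: 'info', msg, ...meta })),
  warn: (msg: string, meta: object) =>
    console.log(JSON.stringify({ level: 'warn', msg, ...meta })),
};

const QUEUE_DEPTH_ALERT_THRESHOLD = 500; // hypothetical; tune per process

function reportQueueDepth(processName: string, records: unknown[]): void {
  // Records fetched past the cursor this tick approximate how far we lag.
  const queueDepth = records.length;
  logger.info('cursor tick queue depth', { processName, queueDepth });

  // A depth that stays above the threshold across ticks suggests the cursor
  // is stuck retrying the same record (e.g. a downstream service is down).
  if (queueDepth > QUEUE_DEPTH_ALERT_THRESHOLD) {
    logger.warn('cursor falling behind target', { processName, queueDepth });
  }
}
```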
For 2: The error will terminate the current node's process. If you are running it in the same process as the web server, as in my article, then yes: CR, K8s, ECS, etc. will terminate the pod and start a new one, which is intended. The sketch below shows how the two error paths fit together.
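A hedged sketch of the overall loop under my assumptions (the stubs and the names `runTick`, `fetchRecordsPastCursor`, and `TICK_INTERVAL_MS` are mine; the article's `createProcessTicker` may be shaped differently): a per-record failure is swallowed inside the tick and retried next tick, while anything that escapes the tick, such as failing to acquire the lock, terminates the node so the orchestrator replaces it.

```ts
// Sketch only; these stubs stand in for the article's dependencies.
const logger = {
  error: (msg: string, meta: object) => console.error(msg, meta),
  critical: (msg: string, meta: object) => console.error(msg, meta),
};
const cursorRepo = {
  lockCursor: async (): Promise<void> => {},
  unlockCursor: async (): Promise<void> => {},
  setPosition: async (_id: string): Promise<void> => {},
};
const lifecycle = { close: async (): Promise<void> => {} };
const TICK_INTERVAL_MS = 1_000; // hypothetical interval
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
async function fetchRecordsPastCursor(): Promise<{ id: string }[]> {
  return []; // stand-in for the article's record query
}
async function processRecord(_record: { id: string }): Promise<void> {}

// One tick: per-record errors are logged and swallowed (point 1), so the
// cursor stays put and the same record is retried on the next tick.
async function runTick(processName: string): Promise<void> {
  await cursorRepo.lockCursor(); // a throw here escapes to tickLoop's catch
  const records = await fetchRecordsPastCursor();
  try {
    for (const record of records) {
      await processRecord(record);
      await cursorRepo.setPosition(record.id);
    }
  } catch (error) {
    logger.error('Could not process record', { processName, error });
  } finally {
    await cursorRepo.unlockCursor();
  }
}

// The loop: anything that escapes a tick is unrecoverable here, so we shut
// the node down (point 2) and let the orchestrator start a replacement.
async function tickLoop(processName: string): Promise<void> {
  try {
    for (;;) {
      await runTick(processName);
      await sleep(TICK_INTERVAL_MS);
    }
  } catch (error) {
    logger.critical('Terminating server: cursor process error', {
      processName,
      error,
    });
    await lifecycle.close();
    process.exit(1); // CR / K8s / ECS starts a fresh node
  }
}
```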