Skip to content

Scheduled Processing

After the messages are stored in the database, they get processed triggered by a scheduler. The processing is performed not for every message, but on message batches. A message batch is a collection of messages with the same CorrelationHint. This means that all messages from one batch will be correlated with the same process instance.

The configuration of the scheduler is an important setting of the library and influences the time for correlation, error detection and error recovery. The following sections describe the configuration properties controlling the timing of the correlation.

Configuration summary#

Here is a configuration example:

correlate:
  batch:
    mode: all 
    query:    # query scheduler
      pollInitialDelay: PT10S
      pollInterval: PT6S
    cleanup:  # cleanup of expired messages
      pollInitialDelay: PT1M
      pollInterval: PT1M
  persistence: # persistence setting
    messageMaxRetries: 100 
    messageFetchPageSize: 100
    messageBatchSize: 1
  retry:
    retryMaxBackoffMinutes: 5 
    retryBackoffBase: 2.0 
Property Values Meaning Default
batch.mode all, fail_first Batch processing mode all
batch.query.pollInitialDelay Duration in ISO8601 Start delay before correlation scheduler starts PT10S
batch.query.pollInterval Duration in ISO8601 Delay between correlation attempts PT6S
batch.cleanup.pollInitialDelay Duration in ISO8601 Start delay before clean-up scheduler starts
batch.cleanup.pollInterval Duration in ISO8601 Delay between clean-ups
persistence.messageMaxRetries Integer Maximum retries before giving up correlation 100
persistence.messageFetchPageSize Integer Paging size by message fetch 100
persistence.messageBatchSize Integer Limit the number of messages processed from a batch -1
retry.retryMaxBackoffMinutes Integer Maximum backoff-time in minutes 180
retry.retryBackoffBase Float Base for exponential backoff-time 180

Reading message#

Messages are read in batches which are paged. You can set-up the page size, the interval between reads and the initial delay from the application start.

Batch correlation#

Batches of messages are checked to fulfill the following criteria:

  • Batch contains no messages with errors
  • Batch contains messages with errors and all those are due to retry (now < due, retry < max-retries)

Messages of one batch are correlated in order of their sorting. If a correlation error occurs, the batch correlation is either interrupted (fail_first mode) or the batch is correlated to the end (all mode).

An important parameter for batch processing is the message-batch-size. This parameter specifies the number of messages taken from a batch for synchronous correlation. Effectively, this parameter has two interesting values. Set this parameter to -1 (default) and all messages from one batch will be correlated directly one after another. Set this parameter to 1 and the batch will be constructed, but only the first message will be correlated in current run. If successful, the next message will be fetched during the next message query (after the batch.query.pollInterval, which should be a small interval). By doing so, you can deal with asynchronous continuations in your process.

Error detection#

If the error is detected during the correlation, it is handled by the library. If the message time-to-live is set and the error happens during the TTL (the message is alive), the error is not noted (and not stored), but the message will be skipped and picked up by the next batch correlation. If the error happens after the message TTL or TTL is not set, the error is noted causing the following information to be stored along the message in the database:

  • head of the exception stack trace occurred during the correlation
  • value incremented by 1 in attempt
  • new due date for retry (now plus value in minutes of retryBackoffBase at the power of attempt but at most the retryMaxBackOffMinutes)

Message processing example#

Imagine the message inserted at a point in time with TTL of 10 seconds producing a correlation error which can't be resolved by retries. Imagine that the value of retryMaxBackOffMinutes is set to 10 and the messageMaxRetries is 5.

Offset from ingested (sec) Why Attempt Next Retry from ingested (secs)
6 Picked up by batch correlation scheduler, error, no error recording because of TTL 0 null
12 Picked up by batch correlation scheduler, error, error noted 1 12sec offset + 2^0M = 12 + 60 = 72
18 Not picked up, because of error and next retry not due 1 72
72 Picked up by batch correlation scheduler, error, error noted 2 72sec offset + 2^1M = 72 + 120 = 192
192 Picked up by batch correlation scheduler, error, error noted 3 192sec offset + 2^2M = 192 + 240 = 432
432 Picked up by batch correlation scheduler, error, error noted 4 432sec offset + 2^3M = 432 + 480 = 922, but 600 sec is max = 600
600 Picked up by batch correlation scheduler, error, error noted 5 600
606 Not picked up, because of error and max retries are reached 5 600

Running in a cluster#

For activation of the cluster support, please add the following configuration snippet to your application.yml:

correlate:
  batch:
    cluster:
      enabled: true
      queuePollLockMostInterval: PT5M

For a cluster operations it is important to synchronize the batch schedulers between the cluster nodes. For this purpose, the library Shedlock is used. Shedlock synchronizes the scheduled tasks using a RDBMS table (we are using a JDBC Lock Provider). Here are the required DDL snippets for some common databases, please see shedlock documentation for more information.

CREATE TABLE shedlock
(
    name       NVARCHAR(64)  NOT NULL,
    lock_until DATETIME2     NOT NULL,
    locked_at  DATETIME2     NOT NULL,
    locked_by  NVARCHAR(255) NOT NULL,
    PRIMARY KEY (name)
);
CREATE TABLE shedlock
(
    name       VARCHAR(64)  NOT NULL,
    lock_until DATETIME2     NOT NULL,
    locked_at  DATETIME2     NOT NULL,
    locked_by  VARCHAR(255) NOT NULL,
    PRIMARY KEY (name)
);

Last update: November 10, 2023