Skip to content

Getting started

Install Dependency#

First install the extension using the corresponding ingress adapter (in this example we use Spring Cloud Stream for connecting with Kafka):

<properties>
  <camunda-bpm-correlate.version>1.0.0</camunda-bpm-correlate.version>
</properties>

<dependencies>
  <dependency>
    <groupId>io.holunda</groupId>
    <artifactId>camunda-bpm-correlate-spring-boot-starter</artifactId>
    <version>${camunda-bpm-correlate.version}</version>
  </dependency>

  <dependency>
    <groupId>io.holunda</groupId>
    <artifactId>camunda-bpm-correlate-spring-cloud-stream</artifactId>
    <version>${camunda-bpm-correlate.version}</version>
  </dependency>

  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
  </dependency>

</dependencies>

Configuration#

Please add the configuration of the extension:

correlate:
  enabled: true
  channels:
    my-kafka-channel:
      enabled: true
      type: stream
      beanName: special-name
  batch:
    mode: all # default fail_first -> 'all' will correlate one message after another, resulting in ignoring the order of receiving
    query:    # query scheduler
      pollInitialDelay: PT10S
      pollInterval: PT6S
    cleanup:  # cleanup of expired messages
      pollInitialDelay: PT1M
      pollInterval: PT1M
  message:
    timeToLiveAsString: PT10S # errors during TTL seconds after receiving are ignored
    payloadEncoding: jackson  # our bytes are actually JSON written by Jackson.
  persistence:
    messageMaxRetries: 5 # default 100 -> will try to deliver 5 times at most
    messageFetchPageSize: 100 # default 100
  retry:
    retryMaxBackoffMinutes: 5 # default 180 -> maximum 5 minutes between retries
    retryBackoffBase: 2.0 # value in minutes default 2.0 -> base in the power of retry to calculate the next retry

Now configure your basic Spring Cloud Streams Kafka configuration to looks like this (or similar). Pay attention to the name of the function definition and the bindings' in channels. It results from the value of correlate.channels.<channel-nam>.beanName and accordingly is part of the expression to bind the parameter of the binding (special-name-in-0).

spring:
  cloud:
    stream:
      function:
        definition: special-name
        bindings:
          special-name-in-0: correlate-ingress-binding      
      bindings:
        correlate-ingress-binding:
          content-type: application/json
          destination: ${KAFKA_TOPIC_CORRELATE_INGRES:correlate-ingress}
          binder: correlate-ingress-binder
          group: ${KAFKA_GROUP_ID}
      binders:
        correlate-ingress-binder:
          type: kafka
          defaultCandidate: false
          inheritEnvironment: false
          environment:
            spring:
              kafka:
                consumer:
                  key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
                  value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      brokers: ${KAFKA_BOOTSTRAP_SERVER_HOST:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:9092}
                      configuration:
                        security.protocol: ${KAFKA_SECURITY_PROTOCOL_OVERRIDE:PLAINTEXT}

Last update: November 10, 2023