Configuration reference for the Kafka EMS sink Connector

Current release: 1.1.0

  • connect.ems.error.policy

    Specifies the action to be taken if an error occurs while inserting the data. There are three available options: CONTINUE - the error is swallowed, THROW - the error is allowed to propagate, RETRY - The exception causes the Connect framework to retry the message. The number of retries is set by connect.ems.max.retries. All errors will be logged automatically, even if the code swallows them.

    Type:string
    Default:RETRY
    Valid Values:[continue, throw, retry] (case insensitive)
    Importance:high
  • connect.ems.max.retries

    The maximum number of times to re-attempt to write the records before the task is marked as failed.

    Type:int
    Default:10
    Valid Values:[1,...]
    Importance:medium
  • connect.ems.retry.interval

    The time in milliseconds between retries.

    Type:long
    Default:15000
    Valid Values:[1000,...]
    Importance:medium
  • connect.ems.error.policy.continue.on.invalid.record

    If set to 'true', connector will continue when invalid input errors occur. Invalid records will be sent to the DLQ, if configured. Please note that this does not cover errors that happen in Converters during de-serialization.

    Type:boolean
    Default:false
    Valid Values:
    Importance:medium
  • connect.ems.remote.log.enable

    activate/deactivate remote EMS logging

    Type:boolean
    Default:true
    Valid Values:
    Importance:low
  • connect.ems.remote.log.endpoint

    The URL of the EMS extractor events endpoint the appender will submit log entries to. Only needed when `connect.ems.uploader=CBP`.

    Type:string
    Default:null
    Valid Values:(Optional) Value must be a valid URL
    Importance:low
  • connect.ems.remote.log.flush.interval.ms

    The frequency at which the appender will try to flush the buffered log entries

    Type:long
    Default:60000 (1 minute)
    Valid Values:[1,...,9223372036854775807]
    Importance:low
  • connect.ems.remote.log.flush.min.records

    The minimum amount of events that need to be in the buffer before this can be flushed.

    Type:int
    Default:50
    Valid Values:[1,...,2147483647]
    Importance:low
  • connect.ems.remote.log.flush.max.records

    The maximum amount of buffer entries before the appender will start to drop log entries.

    Type:int
    Default:1000
    Valid Values:[1,...,2147483647]
    Importance:low
  • connect.ems.remote.log.flush.grace.ms

    The maximum time (in milliseconds) the appender should wait in between two consecutive flushes of log entries.

    Type:long
    Default:60000 (1 minute)
    Valid Values:[1,...,9223372036854775807]
    Importance:low
  • connect.ems.remote.log.heartbeat.interval.ms

    The approximate frequency at which the EMS heartbeat log message should be produced

    Type:long
    Default:180000 (3 minutes)
    Valid Values:[1,...,9223372036854775807]
    Importance:low
  • connect.ems.remote.log.consecutive.zero.put.before.disable

    The number of time the connector underlying consumer is called with a zero records before the telemetry gets disabled

    Type:int
    Default:10
    Valid Values:[1,...,2147483647]
    Importance:low
  • connect.ems.remote.log.access.key

    The access key to be used to authenticate to the telemetry endpoint. Only needed when `connect.ems.uploader=CBP`

    Type:password
    Default:null
    Valid Values:
    Importance:low
  • connect.ems.uploader.s3.access.key

    The AWS access key

    Type:password
    Default:null
    Valid Values:
    Importance:high
  • connect.ems.uploader.s3.access.secret

    The AWS access secret

    Type:password
    Default:null
    Valid Values:
    Importance:high
  • connect.ems.uploader.s3.endpoint

    The AWS S3 endpoint

    Type:string
    Default:null
    Valid Values:(Optional) Value must be a valid URL
    Importance:high
  • connect.ems.uploader.s3.region

    The AWS region. Ignored if you are using https://**.celonis.cloud/api/data-ingestion/continuous as endpoint

    Type:string
    Default:eu-central-1
    Valid Values:
    Importance:high
  • connect.ems.uploader.s3.bucket

    The name of the S3 Bucket

    Type:string
    Default:null
    Valid Values:
    Importance:high
  • connect.ems.uploader.s3.object.key.prefix

    Prefix of the S3 object keys

    Type:string
    Default:null
    Valid Values:
    Importance:high
  • connect.ems.target.template

    Target table. This can be a string containing source topic and partition, that will be replaced with actual values. Example: "table_{topic}_{partition}"

    Type:string
    Default:{topic}
    Valid Values:
    Importance:high
  • connect.ems.uploader.type

    How to upload the files. Available values: cbp, s3, noop.

    Type:string
    Default:s3
    Valid Values:[cbp, s3, noop] (case insensitive)
    Importance:high
  • connect.ems.parquet.row.group.size.bytes

    The number of bytes of the row groups in the Parquet file. Default is 16777216.

    Type:int
    Default:16777216 (16 mebibytes)
    Valid Values:
    Importance:medium
  • connect.ems.debug.keep.parquet.files

    For debug purpose, set the setting to true for the connector to keep the local files after an upload. Default is false.

    Type:boolean
    Default:false
    Valid Values:
    Importance:low
  • connect.ems.inmemfs.enable

    Rather than writing to the host file system, buffer parquet data files in memory

    Type:boolean
    Default:false
    Valid Values:
    Importance:medium
  • connect.ems.tmp.dir

    The folder to store the temporary files as it accumulates data. If not specified then [/var/folders/_9/59m1tvp57pl816byf01cc71w0000gp/T//ems] is being used.

    Type:string
    Default:/var/folders/_9/59m1tvp57pl816byf01cc71w0000gp/T//ems
    Valid Values:
    Importance:low
  • connect.ems.obfuscation.method

    The type of obfuscation to apply. Available methods: NONE,FIX,SHA1,SHA512,RANDOM

    Type:string
    Default:NONE
    Valid Values:[sha512, fix, random, none] (case insensitive)
    Importance:low
  • connect.ems.obfuscation.paths

    A comma-separated list of paths to apply obfuscation to. The format is: 'topicName:field1.field2...fieldN'. topicName may be omitted to match path of all topics. An additional array selector may be used to obfuscate fields nested in arrays, like in 'users[].password,users[].creditCard'

    Type:list
    Default:""
    Valid Values:
    Importance:low
  • connect.ems.obfuscation.salt

    An optional salt string to use during obfuscation. Only applied for SHA1 and SHA512 methods.

    Type:string
    Default:""
    Valid Values:
    Importance:low
  • connect.ems.obfuscation.length

    Optional length of obfuscated output. Only used for FIX and RANDOM obfuscation methods

    Type:int
    Default:5
    Valid Values:[0,...,2147483647]
    Importance:low
  • connect.ems.http.proxy.host

    Proxy host including scheme (required) and port (optional), eg https://my-proxy.com or http://my-proxy.com:8080

    Type:string
    Default:null
    Valid Values:
    Importance:medium
  • connect.ems.http.proxy.auth.username

    Proxy BASIC auth username

    Type:string
    Default:null
    Valid Values:
    Importance:medium
  • connect.ems.http.proxy.auth.password

    Proxy BASIC auth password

    Type:password
    Default:null
    Valid Values:
    Importance:medium
  • connect.ems.http.proxy.non.proxy.hosts

    A comma separated list of non-proxied hosts

    Type:list
    Default:""
    Valid Values:
    Importance:medium
  • connect.ems.uploader.decimal.conversion.type

    The conversion strategy to use for persisting decimal values into EMS. Available values (plainString, decimal)

    Type:string
    Default:plainString
    Valid Values:[plainstring, double] (case insensitive)
    Importance:low
  • connect.ems.commit.size.bytes

    The accumulated file maximum size before it is uploaded to EMS. It cannot be less than 1MB.

    Type:long
    Default:25000000
    Valid Values:[100000,...]
    Importance:high
  • connect.ems.commit.records

    The maximum number of records in the accumulated file before it is uploaded to EMS.

    Type:long
    Default:10000
    Valid Values:[1,...]
    Importance:high
  • connect.ems.commit.interval.ms

    The time interval in milliseconds to upload the data to EMS if the other two commit policies are not yet applicable.

    Type:long
    Default:180000 (3 minutes)
    Valid Values:[1000,...]
    Importance:high
  • connect.ems.embed.kafka.metadata

    Embed Kafka metadata such as partition, offset and timestamp as additional record fields.

    Type:boolean
    Default:true
    Valid Values:
    Importance:medium