# Configuration reference for the Kafka EMS sink Connector

Current release: *1.1.0*

#### connect.ems.error.policy

Specifies the action to take if an error occurs while inserting data. Three options are available: CONTINUE swallows the error, THROW lets the error propagate, and RETRY causes the Connect framework to retry the message. The number of retries is set by `connect.ems.max.retries`. All errors are logged automatically, even when they are swallowed.

| Property | Value |
| --- | --- |
| Type | string |
| Default | RETRY |
| Valid Values | [continue, throw, retry] (case insensitive) |
| Importance | high |

#### connect.ems.max.retries

The maximum number of times to re-attempt to write the records before the task is marked as failed.

| Property | Value |
| --- | --- |
| Type | int |
| Default | 10 |
| Valid Values | [1,...] |
| Importance | medium |

#### connect.ems.retry.interval

The time in milliseconds between retries.

| Property | Value |
| --- | --- |
| Type | long |
| Default | 15000 |
| Valid Values | [1000,...] |
| Importance | medium |
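
To make the interplay of the error policy settings concrete, the sketch below writes the three properties as a Python dictionary and estimates the longest time a task could spend waiting between retries. It assumes the connector waits exactly `connect.ems.retry.interval` before each retry, which this reference does not spell out; the helper name `max_retry_wait_ms` is hypothetical.

```python
# Error-handling properties with the defaults listed in this reference.
error_config = {
    "connect.ems.error.policy": "RETRY",
    "connect.ems.max.retries": 10,
    "connect.ems.retry.interval": 15000,  # milliseconds
}


def max_retry_wait_ms(config: dict) -> int:
    """Upper bound on the time spent waiting between retries (hypothetical helper).

    Assumes a fixed wait of connect.ems.retry.interval before each retry.
    Returns 0 for policies other than RETRY, since they do not retry.
    """
    if config["connect.ems.error.policy"].upper() != "RETRY":
        return 0
    return config["connect.ems.max.retries"] * config["connect.ems.retry.interval"]


print(max_retry_wait_ms(error_config))  # 150000
```

Under that assumption, the defaults allow up to 150000 ms (2.5 minutes) of waiting before the task is marked as failed.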

#### connect.ems.error.policy.continue.on.invalid.record

If set to `true`, the connector continues when invalid input errors occur. Invalid records are sent to the DLQ, if one is configured. Note that this does not cover errors that happen in Converters during deserialization.

| Property | Value |
| --- | --- |
| Type | boolean |
| Default | false |
| Valid Values | |
| Importance | medium |

#### connect.ems.remote.log.enable

Enables or disables remote EMS logging.

| Property | Value |
| --- | --- |
| Type | boolean |
| Default | true |
| Valid Values | |
| Importance | low |

#### connect.ems.remote.log.endpoint

The URL of the EMS extractor events endpoint to which the appender submits log entries. Only needed when `connect.ems.uploader.type=cbp`.

| Property | Value |
| --- | --- |
| Type | string |
| Default | null |
| Valid Values | (Optional) Value must be a valid URL |
| Importance | low |

#### connect.ems.remote.log.flush.interval.ms

The frequency (in milliseconds) at which the appender tries to flush the buffered log entries.

| Property | Value |
| --- | --- |
| Type | long |
| Default | 60000 (1 minute) |
| Valid Values | [1,...,9223372036854775807] |
| Importance | low |

#### connect.ems.remote.log.flush.min.records

The minimum number of events that must be in the buffer before it can be flushed.

| Property | Value |
| --- | --- |
| Type | int |
| Default | 50 |
| Valid Values | [1,...,2147483647] |
| Importance | low |

#### connect.ems.remote.log.flush.max.records

The maximum number of buffered entries before the appender starts to drop log entries.

| Property | Value |
| --- | --- |
| Type | int |
| Default | 1000 |
| Valid Values | [1,...,2147483647] |
| Importance | low |
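
The following toy model illustrates how the minimum and maximum record settings could interact. It is a sketch, not the connector's appender: the class `LogBuffer` is hypothetical, and dropping the newest entry when the buffer is full is an assumption, since this reference only says that entries start to be dropped.

```python
from collections import deque


class LogBuffer:
    """Toy model of the remote log buffer (not the connector's implementation)."""

    def __init__(self, min_records: int = 50, max_records: int = 1000):
        self.min_records = min_records  # connect.ems.remote.log.flush.min.records
        self.max_records = max_records  # connect.ems.remote.log.flush.max.records
        self.entries = deque()
        self.dropped = 0

    def append(self, entry: str) -> bool:
        """Buffer an entry; when the buffer is full, drop it (assumed behaviour)."""
        if len(self.entries) >= self.max_records:
            self.dropped += 1
            return False
        self.entries.append(entry)
        return True

    def can_flush(self) -> bool:
        """A flush is only allowed once enough entries have accumulated."""
        return len(self.entries) >= self.min_records

    def flush(self) -> list:
        """Return all buffered entries and empty the buffer."""
        batch = list(self.entries)
        self.entries.clear()
        return batch
```

With `min_records=2` and `max_records=3`, the first entry alone cannot be flushed, and a fourth entry appended before any flush is counted as dropped.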

#### connect.ems.remote.log.flush.grace.ms

The maximum time (in milliseconds) the appender should wait between two consecutive flushes of log entries.

| Property | Value |
| --- | --- |
| Type | long |
| Default | 60000 (1 minute) |
| Valid Values | [1,...,9223372036854775807] |
| Importance | low |

#### connect.ems.remote.log.heartbeat.interval.ms

The approximate frequency at which the EMS heartbeat log message should be produced.

| Property | Value |
| --- | --- |
| Type | long |
| Default | 180000 (3 minutes) |
| Valid Values | [1,...,9223372036854775807] |
| Importance | low |

#### connect.ems.remote.log.consecutive.zero.put.before.disable

The number of consecutive times the connector's underlying consumer is called with zero records before telemetry is disabled.

| Property | Value |
| --- | --- |
| Type | int |
| Default | 10 |
| Valid Values | [1,...,2147483647] |
| Importance | low |

#### connect.ems.remote.log.access.key

The access key used to authenticate to the telemetry endpoint. Only needed when `connect.ems.uploader.type=cbp`.

| Property | Value |
| --- | --- |
| Type | password |
| Default | null |
| Valid Values | |
| Importance | low |

#### connect.ems.uploader.s3.access.key

The AWS access key.

| Property | Value |
| --- | --- |
| Type | password |
| Default | null |
| Valid Values | |
| Importance | high |

#### connect.ems.uploader.s3.access.secret

The AWS access secret.

| Property | Value |
| --- | --- |
| Type | password |
| Default | null |
| Valid Values | |
| Importance | high |

#### connect.ems.uploader.s3.endpoint

The AWS S3 endpoint.

| Property | Value |
| --- | --- |
| Type | string |
| Default | null |
| Valid Values | (Optional) Value must be a valid URL |
| Importance | high |

#### connect.ems.uploader.s3.region

The AWS region. Ignored if you are using `https://**.celonis.cloud/api/data-ingestion/continuous` as the endpoint.

| Property | Value |
| --- | --- |
| Type | string |
| Default | eu-central-1 |
| Valid Values | |
| Importance | high |

#### connect.ems.uploader.s3.bucket

The name of the S3 bucket.

| Property | Value |
| --- | --- |
| Type | string |
| Default | null |
| Valid Values | |
| Importance | high |

#### connect.ems.uploader.s3.object.key.prefix

The prefix of the S3 object keys.

| Property | Value |
| --- | --- |
| Type | string |
| Default | null |
| Valid Values | |
| Importance | high |

#### connect.ems.target.template

The target table. The value can be a template containing the source topic and partition placeholders, which are replaced with the actual values. Example: `table_{topic}_{partition}`.

| Property | Value |
| --- | --- |
| Type | string |
| Default | {topic} |
| Valid Values | |
| Importance | high |
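
The placeholder substitution described for `connect.ems.target.template` can be sketched as a plain string replacement. This is an illustration only: the function `expand_target` and the topic name `orders` are made up for the example, and the connector's own substitution logic may differ in details.

```python
def expand_target(template: str, topic: str, partition: int) -> str:
    """Replace the {topic} and {partition} placeholders with actual values."""
    return template.replace("{topic}", topic).replace("{partition}", str(partition))


# Template from the example above, applied to partition 3 of a topic named "orders".
print(expand_target("table_{topic}_{partition}", "orders", 3))  # table_orders_3

# The default template uses the topic name as the table name.
print(expand_target("{topic}", "orders", 3))  # orders
```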

#### connect.ems.uploader.type

How to upload the files. Available values: cbp, s3, noop.

| Property | Value |
| --- | --- |
| Type | string |
| Default | s3 |
| Valid Values | [cbp, s3, noop] (case insensitive) |
| Importance | high |
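
As an illustration, the S3 uploader settings can be collected into a single map and serialized as JSON, the format in which Kafka Connect accepts connector configurations over its REST API. All values in angle brackets are placeholders, not real credentials or names, and the fragment omits every other property a complete connector configuration needs.

```python
import json

# Placeholder values: replace them with your own credentials and bucket details.
s3_uploader_config = {
    "connect.ems.uploader.type": "s3",
    "connect.ems.uploader.s3.access.key": "<aws-access-key>",
    "connect.ems.uploader.s3.access.secret": "<aws-access-secret>",
    "connect.ems.uploader.s3.region": "eu-central-1",  # default from this reference
    "connect.ems.uploader.s3.bucket": "<bucket-name>",
    "connect.ems.uploader.s3.object.key.prefix": "<object-key-prefix>",
}

# Serialize the fragment; merge it into the full connector configuration before use.
s3_uploader_json = json.dumps(s3_uploader_config, indent=2)
print(s3_uploader_json)
```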

#### connect.ems.parquet.row.group.size.bytes

The size, in bytes, of the row groups in the Parquet file.

| Property | Value |
| --- | --- |
| Type | int |
| Default | 16777216 (16 mebibytes) |
| Valid Values | |
| Importance | medium |

#### connect.ems.debug.keep.parquet.files

For debugging purposes, set this to `true` to make the connector keep the local files after an upload.

| Property | Value |
| --- | --- |
| Type | boolean |
| Default | false |
| Valid Values | |
| Importance | low |

#### connect.ems.inmemfs.enable

Buffer Parquet data files in memory rather than writing them to the host file system.

| Property | Value |
| --- | --- |
| Type | boolean |
| Default | false |
| Valid Values | |
| Importance | medium |

#### connect.ems.tmp.dir

The folder in which temporary files are stored while data accumulates. If not specified, `/var/folders/_9/59m1tvp57pl816byf01cc71w0000gp/T//ems` is used.

| Property | Value |
| --- | --- |
| Type | string |
| Default | `/var/folders/_9/59m1tvp57pl816byf01cc71w0000gp/T//ems` |
| Valid Values | |
| Importance | low |

#### connect.ems.obfuscation.method

The type of obfuscation to apply. Available methods: NONE, FIX, SHA1, SHA512, RANDOM.

| Property | Value |
| --- | --- |
| Type | string |
| Default | NONE |
| Valid Values | [sha512, fix, random, none] (case insensitive) |
| Importance | low |

#### connect.ems.obfuscation.paths

A comma-separated list of paths to apply obfuscation to. The format is `topicName:field1.field2...fieldN`. `topicName` may be omitted to match the path in all topics. An array selector may additionally be used to obfuscate fields nested in arrays, as in `users[].password,users[].creditCard`.

| Property | Value |
| --- | --- |
| Type | list |
| Default | "" |
| Valid Values | |
| Importance | low |

#### connect.ems.obfuscation.salt

An optional salt string to use during obfuscation. Only applied for SHA1 and SHA512 methods.

| Property | Value |
| --- | --- |
| Type | string |
| Default | "" |
| Valid Values | |
| Importance | low |

#### connect.ems.obfuscation.length

Optional length of the obfuscated output. Only used for the FIX and RANDOM obfuscation methods.

| Property | Value |
| --- | --- |
| Type | int |
| Default | 5 |
| Valid Values | [0,...,2147483647] |
| Importance | low |
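
The path format used by `connect.ems.obfuscation.paths` and the effect of a salt can be sketched as follows. Both helpers are hypothetical and only illustrate the idea: the parser follows the `topicName:field1.field2` format described above, while prepending the salt to the value before hashing is an assumption, as this reference does not say how the connector combines the two.

```python
import hashlib
from typing import List, Optional, Tuple


def parse_obfuscation_path(path: str) -> Tuple[Optional[str], List[str]]:
    """Split 'topicName:field1.field2' into (topic, [field, ...]).

    The topic is None when the 'topicName:' prefix is omitted.
    """
    topic, sep, fields = path.partition(":")
    if not sep:  # no topic prefix: the whole string is the field path
        topic, fields = None, path
    return topic, fields.split(".")


def sha512_obfuscate(value: str, salt: str = "") -> str:
    """Hash a value with SHA-512; prepending the salt is an assumption."""
    return hashlib.sha512((salt + value).encode("utf-8")).hexdigest()


# Parse the comma-separated example from the description above.
for path in "users[].password,users[].creditCard".split(","):
    print(parse_obfuscation_path(path))
```

A path such as `orders:users[].password` (with a made-up topic name `orders`) would restrict the rule to that topic, while the unprefixed paths in the loop apply to all topics.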

#### connect.ems.http.proxy.host

The proxy host, including the scheme (required) and the port (optional), e.g. `https://my-proxy.com` or `http://my-proxy.com:8080`.

| Property | Value |
| --- | --- |
| Type | string |
| Default | null |
| Valid Values | |
| Importance | medium |

#### connect.ems.http.proxy.auth.username

The proxy BASIC auth username.

| Property | Value |
| --- | --- |
| Type | string |
| Default | null |
| Valid Values | |
| Importance | medium |

#### connect.ems.http.proxy.auth.password

The proxy BASIC auth password.

| Property | Value |
| --- | --- |
| Type | password |
| Default | null |
| Valid Values | |
| Importance | medium |

#### connect.ems.http.proxy.non.proxy.hosts

A comma-separated list of hosts that should not be proxied.

| Property | Value |
| --- | --- |
| Type | list |
| Default | "" |
| Valid Values | |
| Importance | medium |
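
Because `connect.ems.http.proxy.host` requires a scheme, a quick pre-flight check can catch a value such as `my-proxy.com:8080` before the connector is deployed. The sketch below uses Python's standard `urllib.parse`; the helper `check_proxy_host` is hypothetical, and accepting only `http` and `https` is an assumption based on the two examples given above.

```python
from urllib.parse import urlparse


def check_proxy_host(value: str) -> bool:
    """Return True if the value has an http/https scheme and a host name."""
    parsed = urlparse(value)
    return parsed.scheme in ("http", "https") and bool(parsed.hostname)


proxy_config = {
    "connect.ems.http.proxy.host": "http://my-proxy.com:8080",
    "connect.ems.http.proxy.non.proxy.hosts": "localhost",  # example value
}

print(check_proxy_host(proxy_config["connect.ems.http.proxy.host"]))  # True
print(check_proxy_host("my-proxy.com"))  # False: the scheme is missing
```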

#### connect.ems.uploader.decimal.conversion.type

The conversion strategy to use for persisting decimal values into EMS. Available values: plainString, decimal.

| Property | Value |
| --- | --- |
| Type | string |
| Default | plainString |
| Valid Values | [plainstring, double] (case insensitive) |
| Importance | low |

#### connect.ems.commit.size.bytes

The maximum size of the accumulated file before it is uploaded to EMS. It cannot be less than 1 MB.

| Property | Value |
| --- | --- |
| Type | long |
| Default | 25000000 |
| Valid Values | [100000,...] |
| Importance | high |

#### connect.ems.commit.records

The maximum number of records in the accumulated file before it is uploaded to EMS.

| Property | Value |
| --- | --- |
| Type | long |
| Default | 10000 |
| Valid Values | [1,...] |
| Importance | high |

#### connect.ems.commit.interval.ms

The time interval (in milliseconds) after which the data is uploaded to EMS if neither of the other two commit policies (file size and record count) has been triggered yet.

| Property | Value |
| --- | --- |
| Type | long |
| Default | 180000 (3 minutes) |
| Valid Values | [1000,...] |
| Importance | high |
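
Taken together, the three commit settings suggest that an upload happens as soon as any one threshold is reached. The sketch below expresses that reading with the defaults from this reference; the function `should_commit` is hypothetical, and whether the connector compares with "greater than or equal" exactly as shown is an assumption.

```python
def should_commit(
    size_bytes: int,
    records: int,
    elapsed_ms: int,
    max_size_bytes: int = 25_000_000,  # connect.ems.commit.size.bytes
    max_records: int = 10_000,         # connect.ems.commit.records
    interval_ms: int = 180_000,        # connect.ems.commit.interval.ms
) -> bool:
    """Return True when any of the three commit thresholds has been reached."""
    return (
        size_bytes >= max_size_bytes
        or records >= max_records
        or elapsed_ms >= interval_ms
    )


# A small, recent file is not uploaded yet.
print(should_commit(size_bytes=1_000, records=10, elapsed_ms=1_000))  # False

# The same file is uploaded once the interval has elapsed.
print(should_commit(size_bytes=1_000, records=10, elapsed_ms=180_000))  # True
```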

#### connect.ems.embed.kafka.metadata

Embed Kafka metadata such as partition, offset and timestamp as additional record fields.

| Property | Value |
| --- | --- |
| Type | boolean |
| Default | true |
| Valid Values | |
| Importance | medium |