Configuration reference for the Kafka EMS sink Connector
Current release: 1.1.0
-
connect.ems.error.policy
Specifies the action to take if an error occurs while inserting data. There are three options: CONTINUE (the error is swallowed), THROW (the error is allowed to propagate), and RETRY (the exception causes the Connect framework to retry the message). The number of retries is set by connect.ems.max.retries. All errors are logged automatically, even when they are swallowed.
Type: string Default: RETRY Valid Values: [continue, throw, retry] (case insensitive) Importance: high -
connect.ems.max.retries
The maximum number of times to re-attempt to write the records before the task is marked as failed.
Type: int Default: 10 Valid Values: [1,...] Importance: medium -
connect.ems.retry.interval
The time in milliseconds between retries.
Type: long Default: 15000 Valid Values: [1000,...] Importance: medium -
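As an illustration, the three error-handling settings above might be combined as follows in a connector properties file. This is a minimal sketch that simply restates the documented defaults; whether these values suit a given deployment is an assumption.

```properties
# Ask the Connect framework to retry failed writes
connect.ems.error.policy=RETRY
# Re-attempt the write at most 10 times before the task is marked as failed
connect.ems.max.retries=10
# Wait 15000 ms (15 seconds) between retries
connect.ems.retry.interval=15000
```

With these values, and assuming every attempt fails, the task would be marked as failed after a retry window on the order of 150 seconds (10 x 15000 ms).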
connect.ems.error.policy.continue.on.invalid.record
If set to 'true', the connector continues when invalid input errors occur. Invalid records are sent to the DLQ, if one is configured. Note that this does not cover errors that occur in Converters during deserialization.
Type: boolean Default: false Valid Values: Importance: medium -
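A possible way to pair this setting with a dead letter queue is sketched below. The two `errors.*` keys are standard Kafka Connect framework settings for sink connectors, not options of this connector, and that the connector routes invalid records to the queue configured this way is an assumption. The topic name `ems-sink-dlq` is hypothetical.

```properties
# Keep the task running when a record is rejected as invalid input
connect.ems.error.policy.continue.on.invalid.record=true
# Kafka Connect framework settings (assumption: this is the DLQ the connector uses)
errors.tolerance=all
# Hypothetical DLQ topic name
errors.deadletterqueue.topic.name=ems-sink-dlq
```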
connect.ems.remote.log.enable
Activates or deactivates remote EMS logging.
Type: boolean Default: true Valid Values: Importance: low -
connect.ems.remote.log.endpoint
The URL of the EMS extractor events endpoint that the appender submits log entries to. Only needed when `connect.ems.uploader.type=cbp`.
Type: string Default: null Valid Values: (Optional) Value must be a valid URL Importance: low -
connect.ems.remote.log.flush.interval.ms
The frequency, in milliseconds, at which the appender tries to flush the buffered log entries.
Type: long Default: 60000 (1 minute) Valid Values: [1,...,9223372036854775807] Importance: low -
connect.ems.remote.log.flush.min.records
The minimum number of events that must be in the buffer before it can be flushed.
Type: int Default: 50 Valid Values: [1,...,2147483647] Importance: low -
connect.ems.remote.log.flush.max.records
The maximum number of buffered entries. Once this limit is reached, the appender starts to drop log entries.
Type: int Default: 1000 Valid Values: [1,...,2147483647] Importance: low -
connect.ems.remote.log.flush.grace.ms
The maximum time (in milliseconds) the appender should wait in between two consecutive flushes of log entries.
Type: long Default: 60000 (1 minute) Valid Values: [1,...,9223372036854775807] Importance: low -
connect.ems.remote.log.heartbeat.interval.ms
The approximate frequency, in milliseconds, at which the EMS heartbeat log message is produced.
Type: long Default: 180000 (3 minutes) Valid Values: [1,...,9223372036854775807] Importance: low -
connect.ems.remote.log.consecutive.zero.put.before.disable
The number of consecutive times the connector's underlying consumer is called with zero records before telemetry is disabled.
Type: int Default: 10 Valid Values: [1,...,2147483647] Importance: low -
connect.ems.remote.log.access.key
The access key used to authenticate to the telemetry endpoint. Only needed when `connect.ems.uploader.type=cbp`.
Type: password Default: null Valid Values: Importance: low -
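The remote logging options above could be combined along these lines. This is a sketch only: the endpoint URL and access key are hypothetical placeholders, and the flush values are the documented defaults.

```properties
# The endpoint and access key are documented as needed only for the cbp uploader
connect.ems.uploader.type=cbp
connect.ems.remote.log.enable=true
# Hypothetical placeholder values; replace with the real endpoint and key
connect.ems.remote.log.endpoint=https://example.invalid/extractor-events
connect.ems.remote.log.access.key=<access-key>
# Try to flush every 60 seconds, once at least 50 entries are buffered
connect.ems.remote.log.flush.interval.ms=60000
connect.ems.remote.log.flush.min.records=50
# Entries beyond 1000 buffered ones are dropped
connect.ems.remote.log.flush.max.records=1000
```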
connect.ems.uploader.s3.access.key
The AWS access key
Type: password Default: null Valid Values: Importance: high -
connect.ems.uploader.s3.access.secret
The AWS access secret
Type: password Default: null Valid Values: Importance: high -
connect.ems.uploader.s3.endpoint
The AWS S3 endpoint
Type: string Default: null Valid Values: (Optional) Value must be a valid URL Importance: high -
connect.ems.uploader.s3.region
The AWS region. Ignored if you are using https://**.celonis.cloud/api/data-ingestion/continuous as the endpoint.
Type: string Default: eu-central-1 Valid Values: Importance: high -
connect.ems.uploader.s3.bucket
The name of the S3 Bucket
Type: string Default: null Valid Values: Importance: high -
connect.ems.uploader.s3.object.key.prefix
Prefix of the S3 object keys
Type: string Default: null Valid Values: Importance: high -
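For orientation, an S3 uploader configuration built from the options above might look like the following. All values are hypothetical placeholders (bucket name, prefix, and credentials are not taken from this reference), and the endpoint shown is just one example of a valid URL.

```properties
connect.ems.uploader.type=s3
# Hypothetical credentials; in practice these come from a secret store
connect.ems.uploader.s3.access.key=<aws-access-key>
connect.ems.uploader.s3.access.secret=<aws-access-secret>
# Optional; must be a valid URL when set
connect.ems.uploader.s3.endpoint=https://s3.eu-central-1.amazonaws.com
connect.ems.uploader.s3.region=eu-central-1
# Hypothetical bucket and object key prefix
connect.ems.uploader.s3.bucket=my-ems-bucket
connect.ems.uploader.s3.object.key.prefix=kafka/ems
```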
connect.ems.target.template
Target table name. The value may contain the placeholders {topic} and {partition}, which are replaced with the actual source topic and partition. Example: "table_{topic}_{partition}"
Type: string Default: {topic} Valid Values: Importance: high -
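To make the substitution concrete, the sketch below shows the template from the example together with the table name it would presumably yield. The topic name `orders` and partition `3` are hypothetical, and the result assumes plain textual replacement of the two placeholders.

```properties
# Assuming plain placeholder replacement, records read from
# topic "orders", partition 3 would target the table "table_orders_3"
connect.ems.target.template=table_{topic}_{partition}
```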
connect.ems.uploader.type
How to upload the files. Available values: cbp, s3, noop.
Type: string Default: s3 Valid Values: [cbp, s3, noop] (case insensitive) Importance: high -
connect.ems.parquet.row.group.size.bytes
The size, in bytes, of the row groups in the Parquet file. Default is 16777216.
Type: int Default: 16777216 (16 mebibytes) Valid Values: Importance: medium -
connect.ems.debug.keep.parquet.files
For debugging purposes, set this to true to make the connector keep the local files after an upload. Default is false.
Type: boolean Default: false Valid Values: Importance: low -
connect.ems.inmemfs.enable
Buffer Parquet data files in memory rather than writing them to the host file system.
Type: boolean Default: false Valid Values: Importance: medium -
connect.ems.tmp.dir
The folder in which temporary files are stored while data accumulates. If not specified, [/var/folders/_9/59m1tvp57pl816byf01cc71w0000gp/T//ems] is used.
Type: string Default: /var/folders/_9/59m1tvp57pl816byf01cc71w0000gp/T//ems Valid Values: Importance: low -
connect.ems.obfuscation.method
The type of obfuscation to apply. Available methods: NONE, FIX, SHA1, SHA512, RANDOM.
Type: string Default: NONE Valid Values: [sha512, fix, random, none] (case insensitive) Importance: low -
connect.ems.obfuscation.paths
A comma-separated list of paths to apply obfuscation to. The format is 'topicName:field1.field2...fieldN'. topicName may be omitted to match the path in all topics. An array selector may additionally be used to obfuscate fields nested in arrays, as in 'users[].password,users[].creditCard'.
Type: list Default: "" Valid Values: Importance: low -
connect.ems.obfuscation.salt
An optional salt string to use during obfuscation. Only applied for SHA1 and SHA512 methods.
Type: string Default: "" Valid Values: Importance: low -
connect.ems.obfuscation.length
Optional length of obfuscated output. Only used for FIX and RANDOM obfuscation methods
Type: int Default: 5 Valid Values: [0,...,2147483647] Importance: low -
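The obfuscation options above can be illustrated with a small sketch. The topic name `orders`, the field names, and the salt value are all hypothetical; only the option keys and the path syntax come from this reference.

```properties
# Hash the selected fields with SHA512
connect.ems.obfuscation.method=SHA512
# First path applies only to topic "orders"; the second has no topic name,
# so it applies to all topics and selects a field nested in an array
connect.ems.obfuscation.paths=orders:customer.email,users[].password
# Optional salt (hypothetical value); documented as applied to the SHA methods only
connect.ems.obfuscation.salt=change-me
```

connect.ems.obfuscation.length is left out here because it is documented as used only by the FIX and RANDOM methods.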
connect.ems.http.proxy.host
Proxy host, including the scheme (required) and port (optional), e.g. https://my-proxy.com or http://my-proxy.com:8080
Type: string Default: null Valid Values: Importance: medium -
connect.ems.http.proxy.auth.username
Proxy BASIC auth username
Type: string Default: null Valid Values: Importance: medium -
connect.ems.http.proxy.auth.password
Proxy BASIC auth password
Type: password Default: null Valid Values: Importance: medium -
connect.ems.http.proxy.non.proxy.hosts
A comma-separated list of hosts that should not be proxied.
Type: list Default: "" Valid Values: Importance: medium -
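A proxy setup using the four options above might be written as follows. The host is taken from the example in the description; the username, password, and non-proxied host list are hypothetical placeholders.

```properties
# Scheme is required, port is optional
connect.ems.http.proxy.host=http://my-proxy.com:8080
# Hypothetical BASIC auth credentials
connect.ems.http.proxy.auth.username=<proxy-user>
connect.ems.http.proxy.auth.password=<proxy-password>
# Hypothetical hosts that bypass the proxy
connect.ems.http.proxy.non.proxy.hosts=localhost,127.0.0.1
```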
connect.ems.uploader.decimal.conversion.type
The conversion strategy to use for persisting decimal values into EMS. Available values (plainString, decimal)
Type: string Default: plainString Valid Values: [plainstring, double] (case insensitive) Importance: low -
connect.ems.commit.size.bytes
The accumulated file maximum size before it is uploaded to EMS. It cannot be less than 1MB.
Type: long Default: 25000000 Valid Values: [100000,...] Importance: high -
connect.ems.commit.records
The maximum number of records in the accumulated file before it is uploaded to EMS.
Type: long Default: 10000 Valid Values: [1,...] Importance: high -
connect.ems.commit.interval.ms
The time interval in milliseconds to upload the data to EMS if the other two commit policies are not yet applicable.
Type: long Default: 180000 (3 minutes) Valid Values: [1000,...] Importance: high -
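The three commit settings above work together as thresholds on the accumulated file. A minimal sketch with the documented defaults is shown below; the reading that an upload happens as soon as any one threshold is met is an assumption based on the descriptions, not a confirmed statement of the connector's behaviour.

```properties
# Upload once the accumulated file reaches 25000000 bytes ...
connect.ems.commit.size.bytes=25000000
# ... or once it holds 10000 records ...
connect.ems.commit.records=10000
# ... or after 180000 ms (3 minutes) if neither limit above has been reached
connect.ems.commit.interval.ms=180000
```

Lower values would presumably give fresher data in EMS at the cost of more, smaller uploads.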
connect.ems.embed.kafka.metadata
Embed Kafka metadata such as partition, offset and timestamp as additional record fields.
Type: boolean Default: true Valid Values: Importance: medium