Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.
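As a rough sketch of the line-per-message behavior, the snippet below writes two lines to a file and feeds that file to the console producer. The file name messages.txt, the broker address localhost:9092 and the topic name test are assumptions for a local test setup. The send step is skipped unless the script is found at the relative path shown.

```shell
# Two sample messages, one per line (messages.txt is a hypothetical file name).
printf 'This is a message\nThis is another message\n' > messages.txt

# Each line is sent as its own message, so the line count is the expected message count.
expected=$(wc -l < messages.txt | tr -d ' ')
echo "expecting $expected messages"

# Send the file only when run from a Kafka installation directory.
# Broker address and topic name are assumptions, not values from this document.
if [ -x bin/kafka-console-producer.sh ]; then
  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < messages.txt
fi
```

Running the producer without the input redirect reads from standard input instead, so each line typed at the terminal is sent when you press Enter.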
Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.
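To make the shape of the connector files concrete, here is a sketch that writes one source and one sink configuration. The property values follow the file connector examples that ship with Kafka as far as I recall them, so treat the names local-file-source and local-file-sink and the output file names as assumptions. The launch command is left as a comment because it needs a Kafka installation.

```shell
# Source connector: a unique name, the connector class, and connector-specific settings.
cat > connect-file-source.properties <<'EOF'
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
EOF

# Sink connector: reads the same topic and appends each message to a file.
cat > connect-file-sink.properties <<'EOF'
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
EOF

# From the Kafka installation directory, the worker config comes first,
# followed by one file per connector (paths are assumptions):
#   bin/connect-standalone.sh config/connect-standalone.properties \
#       connect-file-source.properties connect-file-sink.properties
```

Note that the source connector takes a single topic while the sink takes a topics list, which is why the two keys differ.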
During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from test.txt and producing them to the topic connect-test, and the sink connector should start reading messages from the topic connect-test and writing them to the file test.sink.txt. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:
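A minimal sketch of that check, assuming the sink writes one line per message so that the output file should match the source file once the connectors have caught up. The helper name pipeline_delivered is hypothetical.

```shell
# Succeeds only when both files exist and have identical content,
# i.e. every source line has reached the sink file.
pipeline_delivered() {
  src=$1
  sink=$2
  [ -f "$src" ] && [ -f "$sink" ] && diff "$src" "$sink" > /dev/null
}

# Typical use once the Connect process is running:
#   cat test.sink.txt
#   pipeline_delivered test.txt test.sink.txt && echo "all lines delivered"
```

Because the connectors keep running, lines appended to test.txt later should show up in test.sink.txt after a short delay, and the check can be repeated.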
Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
Potential performance impact following upgrade to 0.10.0.0
The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. The on-disk message format can be configured through log.message.format.version in the server.properties file. The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, it only understands message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case. Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before an upgrade to 100% after it, which forced an immediate upgrade of all clients to bring performance back to normal.
To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, one can change the message format to 0.10.0 on the broker and enjoy the new message format, which includes the new timestamp and improved compression. The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but it is impractical for supporting all consumer traffic, even on an overprovisioned cluster.
Therefore, it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not.
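The two-step approach described above can be sketched as follows: pin the message format while old consumers remain, then raise it once they are upgraded. The helper set_property and the scratch file name server.properties.example are hypothetical; on a real broker the file is typically config/server.properties and a restart is needed for the change to take effect.

```shell
# Hypothetical helper: set key=value in a properties file,
# replacing any existing entry for that key.
set_property() {
  file=$1
  key=$2
  value=$3
  touch "$file"
  # Keep every line whose key differs, then append the new setting.
  awk -F= -v k="$key" '$1 != k' "$file" > "$file.tmp"
  mv "$file.tmp" "$file"
  printf '%s=%s\n' "$key" "$value" >> "$file"
}

# Work on a scratch copy rather than a live broker config.
props=server.properties.example

# Step 1: brokers are on 0.10.0.0 but most consumers are not yet upgraded.
set_property "$props" log.message.format.version 0.9.0
cat "$props"

# Step 2, after the consumers are upgraded:
#   set_property "$props" log.message.format.version 0.10.0
```

Replacing the entry instead of appending a second one avoids having two conflicting values for the same key in the file.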
The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. This format is as follows:
Configure custom quota for (user=user1, client-id=clientA):
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.
Configure custom quota for user=user1:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.
Configure custom quota for client-id=clientA:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.
It is possible to set default quotas for each (user, client-id), user or client-id group by specifying the --entity-default option instead of --entity-name.
Configure default client-id quota for user=user1:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.
Configure default quota for user:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-default
Updated config for entity: default user-principal.
Configure default quota for client-id:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-default
Updated config for entity: default client-id.
Here's how to describe the quota for a given (user, client-id):
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048
Describe quota for a given user:
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048
Describe quota for a given client-id:
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048
If entity name is not specified, all entities of the specified type are described. For example, describe all users:
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048
Similarly for (user, client):
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048
It is possible to set default quotas that apply to all client-ids by setting these configs on the brokers. These properties are applied only if quota overrides or defaults are not configured in Zookeeper. By default, each client-id receives an unlimited quota. The following sets the default quota per producer and consumer client-id to 10MB/sec.
quota.producer.default=10485760
quota.consumer.default=10485760
Note that these properties are being deprecated and may be removed in a future release.
Defaults configured using kafka-configs.sh take precedence over these properties.
6.2 Datacenters
Some deployments will need to manage a data pipeline that spans multiple datacenters. Our recommended approach to this is to deploy a local Kafka cluster in each datacenter with application instances in each datacenter interacting only with their local cluster and mirroring between clusters (see the documentation on the mirror maker tool for how to do this).
This deployment pattern allows datacenters to act as independent entities and allows us to manage and tune inter-datacenter replication centrally. This allows each facility to stand alone and operate even if the inter-datacenter links are unavailable: when this occurs the mirroring falls behind until the link is restored, at which time it catches up.
For applications that need a global view of all data you can use mirroring to provide clusters which have aggregate data mirrored from the local clusters in all datacenters. These aggregate clusters are used for reads by applications that require the full data set.
This is not the only possible deployment pattern. It is possible to read from or write to a remote Kafka cluster over the WAN, though obviously this will add whatever latency is required to reach the cluster.
Kafka naturally batches data in both the producer and consumer so it can achieve high throughput even over a high-latency connection. To allow this, though, it may be necessary to increase the TCP socket buffer sizes for the producer, consumer, and broker using the socket.send.buffer.bytes and socket.receive.buffer.bytes configurations. The appropriate way to set this is documented here.
It is generally not advisable to run a single Kafka cluster that spans multiple datacenters over a high-latency link.
This will incur very high replication latency both for Kafka writes and ZooKeeper writes, and neither Kafka nor ZooKeeper will remain available in all locations if the network between locations is unavailable.
6.3 Kafka Configuration
Important Client Configurations
The most important old Scala producer configurations control
acks
sync vs async production
batch size (for async producers)
The most important new Java producer configurations control
acks
The most important consumer configuration is the fetch size.
All configurations are documented in the configuration section.
A Production Server Config
Here is an example production server configuration:
# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]
# Log configuration
num.partitions=8
default.replication.factor=3
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]
# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]
Our client configuration varies a fair amount between different use cases.
6.4 Java Version
From a security perspective, we recommend you use the latest released version of JDK 1.8 as older freely available versions have disclosed security vulnerabilities. LinkedIn is currently running JDK 1.8 u5 (looking to upgrade to a newer version) with the G1 collector. If you decide to use the G1 collector (the current default) and you are still on JDK 1.7, make sure you are on u51 or newer. LinkedIn tried out u21 in testing, but they had a number of problems with the GC implementation in that version. LinkedIn's tuning looks like this:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
For reference, here are the stats on one of LinkedIn's busiest clusters (at peak):
60 brokers
50k partitions (replication factor 2)
800k messages/sec in
300 MB/sec inbound, 1 GB/sec+ outbound
The tuning looks fairly aggressive, but all of the brokers in that cluster have a 90% GC pause time of about 21ms, and they're doing less than 1 young GC per second.
6.5 Hardware and OS
We are using dual quad-core Intel Xeon machines with 24GB of memory.
You need sufficient memory to buffer active readers and writers. You can do a back-of-the-envelope estimate of memory needs by assuming you want to be able to buffer for 30 seconds and compute your memory need as write_throughput*30.
The disk throughput is important. We have 8x7200 rpm SATA drives. In general disk throughput is the performance bottleneck, and more disks are better. Depending on how you configure flush behavior you may or may not benefit from more expensive disks (if you force flush often then higher RPM SAS drives may be better).
OS
Kafka should run well on any unix system and has been tested on Linux and Solaris.
We have seen a few issues running on Windows, and Windows is not currently a well supported platform, though we would be happy to change that.
It is unlikely to require much OS-level tuning, but there are two potentially important OS-level configurations:
File descriptor limits: Kafka uses file descriptors for log segments and open connections. If a broker hosts many partitions, consider that the broker needs at least (number_of_partitions)*(partition_size/segment_size) file descriptors to track all log segments, in addition to the number of connections the broker makes. We recommend at least 100000 allowed file descriptors for the broker processes as a starting point.
Max socket buffer size: can be increased to enable high-performance data transfer between data centers as described here.
Disks and Filesystem
We recommend using multiple drives to get good throughput and not sharing the same drives used for Kafka data with application logs or other OS filesystem activity to ensure good latency. You can either RAID these drives together into a single volume or format and mount each drive as its own directory.
Since Kafka has replication, the redundancy provided by RAID can also be provided at the application level. This choice has several tradeoffs.
If you configure multiple data directories, partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.
RAID can potentially do better at balancing load between disks (although it doesn't always seem to) because it balances load at a lower level. The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.
Another potential benefit of RAID is the ability to tolerate disk failures. However, our experience has been that rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
Application vs. OS Flush Management
Kafka always immediately writes all data to the filesystem and supports the ability to configure the flush policy that controls when data is forced out of the OS cache and onto disk. This flush policy can be controlled to force data to disk after a period of time or after a certain number of messages has been written. There are several choices in this configuration.
Kafka must eventually call fsync to know that data was flushed. When recovering from a crash, for any log segment not known to be fsync'd, Kafka will check the integrity of each message by checking its CRC and also rebuild the accompanying offset index file as part of the recovery process executed on startup.
Note that durability in Kafka does not require syncing data to disk, as a failed node will always recover from its replicas.
We recommend using the default flush settings which disable application fsync entirely. This means relying on the background flush done by the OS and Kafka's own background flush.
This provides the best of all worlds for most uses: no knobs to tune, great throughput and latency, and full recovery guarantees. We generally feel that the guarantees provided by replication are stronger than sync to local disk; however, the paranoid still may prefer having both, and application-level fsync policies are still supported.
The drawback of using application-level flush settings is that it is less efficient in its disk usage pattern (it gives the OS less leeway to re-order writes) and it can introduce latency, as fsync in most Linux filesystems blocks writes to the file whereas the background flushing does much more granular page-level locking.
In general you don't need to do any low-level tuning of the filesystem, but in the next few sections we will go over some of this in case it is useful.
Understanding Linux OS Flush Behavior
In Linux, data written to the filesystem is maintained in pagecache until it must be written out to disk (due to an application-level fsync or the OS's own flush policy). The flushing of data is done by a set of background threads called pdflush (or in post 2.6.32 kernels "flusher threads").
Pdflush has a configurable policy that controls how much dirty data can be maintained in cache and for how long before it must be written back to disk. This policy is described here. When Pdflush cannot keep up with the rate of data being written, it will eventually cause the writing process to block, incurring latency in the writes to slow down the accumulation of data.
You can see the current state of OS memory usage by doing
> cat /proc/meminfo
The meaning of these values is described in the link above.
Using pagecache has several advantages over an in-process cache for storing data that will be written out to disk:
The I/O scheduler will batch together consecutive small writes into bigger physical writes which improves throughput.
The I/O scheduler will attempt to re-sequence writes to minimize movement of the disk head which improves throughput.
It automatically uses all the free memory on the machine.
Filesystem Selection
Kafka uses regular files on disk, and as such it has no hard dependency on a specific filesystem. The two filesystems which have the most usage, however, are EXT4 and XFS. Historically, EXT4 has had more usage, but recent improvements to the XFS filesystem have shown it to have better performance characteristics for Kafka's workload with no compromise in stability.
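As a closing worked example, the two back-of-the-envelope rules from the Hardware and OS section (memory as write_throughput*30, and file descriptors as number_of_partitions times partition_size/segment_size) can be sketched in shell arithmetic. The function names and the sample figures are hypothetical, and rounding the segment count up is my assumption, not something the text specifies.

```shell
# Memory needed to buffer recent writes:
# write throughput (bytes/sec) times the buffering window (default 30 seconds).
buffer_memory_bytes() {
  write_bytes_per_sec=$1
  window_secs=${2:-30}
  echo $(( write_bytes_per_sec * window_secs ))
}

# Lower bound on file descriptors needed for log segments alone.
# Segments per partition is rounded up (an assumption); connections come on top.
segment_file_descriptors() {
  partitions=$1
  partition_bytes=$2
  segment_bytes=$3
  segments=$(( (partition_bytes + segment_bytes - 1) / segment_bytes ))
  echo $(( partitions * segments ))
}

# Illustrative figures only: 50 MB/sec of writes on one broker,
# 2000 partitions of 10 GB each with 1 GB segments.
buffer_memory_bytes $(( 50 * 1024 * 1024 ))
segment_file_descriptors 2000 $(( 10 * 1024 * 1024 * 1024 )) $(( 1024 * 1024 * 1024 ))
```

With these sample inputs the estimates come to about 1.5 GB of buffer memory and 20000 descriptors for segments, comfortably under the 100000 starting point recommended above.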