• 上海合作组织青岛峰会举行 习近平主持会议并发表重要讲话 2019-05-17
  • 世界上海拔最高的无人超市将落户拉萨 2019-05-17
  • 阳泉首次颁布地方实体性法规 两部法规将于7月1日起实施 2019-05-14
  • 空调有异味是为什么 这是你可能忽略的健康隐患 2019-05-14
  • “黑社会老大”电话敲诈“拿钱消灾” 警方揭骗局 2019-05-12
  • 习近平欢迎出席上海合作组织青岛峰会的外方领导人 2019-04-27
  • 黄家驹蜡像北京揭幕 黄家强动容追忆哥哥 2019-04-27
  • 高清:创意十足!杭州萌娃毕业照留下成长足迹 2019-04-08
  • 上交所:存托凭证上市首日不实行价格涨跌幅限制 2019-04-08
  • 谁拆迁都是一样一片狼藉,拆迁时欢天喜地,回迁时垂头丧气。拆迁者得到好处,被拆者哭天喊地。 2019-04-07
  • 网上支付出现异常如何解决? 2019-04-07
  • [微笑]咱建议进一步提高挂号费标准,最起码也得200元起步…… 2019-03-30
  • 一语惊坛(5月30日):磋商,不等于反复折腾。 2019-03-16
  • 快乐十分容易出的5个号:说说 MQ 之 Kafka(二)

    Kafka 的工具和编程接口

    Kafka 的工具

    Kafka 提供的工具还是比较全的,bin/?目录下的工具有以下一些,

    bin/connect-distributed.sh     bin/kafka-consumer-offset-checker.sh     bin/kafka-replica-verification.sh   bin/kafka-verifiable-producer.sh
    bin/connect-standalone.sh      bin/kafka-consumer-perf-test.sh          bin/kafka-run-class.sh              bin/zookeeper-security-migration.sh
    bin/kafka-acls.sh              bin/kafka-mirror-maker.sh                bin/kafka-server-start.sh           bin/zookeeper-server-start.sh
    bin/kafka-configs.sh           bin/kafka-preferred-replica-election.sh  bin/kafka-server-stop.sh            bin/zookeeper-server-stop.sh
    bin/kafka-console-consumer.sh  bin/kafka-producer-perf-test.sh          bin/kafka-simple-consumer-shell.sh  bin/zookeeper-shell.sh
    bin/kafka-console-producer.sh  bin/kafka-reassign-partitions.sh         bin/kafka-topics.sh
    bin/kafka-consumer-groups.sh   bin/kafka-replay-log-producer.sh         bin/kafka-verifiable-consumer.sh

    我常用的命令有以下几个,

    bin/kafka-server-start.sh -daemon config/server.properties &
    bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
    bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181
    bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1
    bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1
    bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning
    bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1

    kafka-server-start.sh?是用于 Kafka 的 Broker 启动的,主要就一个参数?config/server.properties,该文件中的配置项待会再说.还有一个?-daemon?参数,这个是将 Kafka 放在后台用守护进程的方式运行,如果不加这个参数,Kafka 会在运行一段时间后自动退出,据说这个是 0.10.0.0 版本才有的问题?5。kafka-topics.sh?是用于管理 Topic 的工具,我主要用的?--describe、--list、--delete、--create?这4个功能,上述的例子基本是不言自明的,--replication-factor 3、--partitions 2?这两个参数分别表示3个副本(含 Leader),和2个分区。kafka-console-consumer.sh?和?kafka-console-producer.sh?是生产者和消费者的简易终端工具,在调试的时候比较有用,我常用的是?kafka-console-consumer.sh。我没有用 Kafka 自带的 zookeeper,而是用的 zookeeper 官方的发布版本 3.4.8,端口是默认2181,与 Broker 在同一台机器上。

    下面说一下 Broker 启动的配置文件?config/server.properties,我在默认配置的基础上,修改了以下一些,

    broker.id=0
    listeners=PLAINTEXT://192.168.232.23:9092
    log.dirs=/tmp/kafka-logs
    delete.topic.enable=true

    broker.id?是 Kafka 集群中的 Broker ID,不可重复,我在多副本的实验中,将他们分别设置为0、1、2;listeners?是 Broker 监听的地址,默认是监听?localhost:9092,因为我不是单机实验,所以修改为本机局域网地址,当然,如果要监听所有地址的话,也可以设置为?0.0.0.0:9092,多副本实验中,将监听端口分别设置为 9092、9093、9094;log.dirs?是 Broker 的 log 的目录,多副本实验中,不同的 Broker 需要有不同的 log 目录;delete.topic.enable?设为 true 后,可以删除 Topic,并且连带 Topic 中的消息也一并删掉,否则,即使调用?kafka-topics.sh --delete?也无法删除 Topic,这是一个便利性的设置,对于开发环境可以,生产环境一定要设为 false(默认)。实验中发现, 如果有消费者在消费这个 Topic,那么也无法删除,还是比较安全的。

    剩下的工具多数在文档中也有提到。如果看一下这些脚本的话,会发现多数脚本的写法都是一致的,先做一些参数的校验,最后运行?exec $base_dir/kafka-run-class.sh XXXXXXXXX "[email protected]",可见,这些工具都是使用运行 Java Class 的方式调用的。

    Kafka 的 Java API

    在编程接口方面,官方提供了 Scala 和 Java 的接口,社区提供了更多的其他语言的接口,基本上,无论用什么语言开发,都能找到相应的 API。下面说一下 Java 的 API 接口。

    生产者的 API 只有一种,相对比较简单,代码如下,

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    public class SimpleProducerDemo {
        public static void main(String[] args){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094");
            props.put("zookeeper.connect", "192.168.232.23:2181");
            props.put("client.id", "DemoProducer");
            props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
            String topic = "topic1";
            Boolean isAsync = false;
            int messageNo = 1;
            while (true) {
                String messageStr = "Message_" + String.format("%05d",messageNo);
                long startTime = System.currentTimeMillis();
                if (isAsync) { // Send asynchronously
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr), new DemoCallBack(startTime, messageNo, messageStr));
                } else { // Send synchronously
                    try {
                        producer.send(new ProducerRecord<>(topic,
                                messageNo,
                                messageStr)).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ++messageNo;
            }
        }
    }
    class DemoCallBack implements Callback {
        private final long startTime;
        private final int key;
        private final String message;
        public DemoCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println(
                        "Send     message: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() +
                                " to partition(" + metadata.partition() +
                                ") in " + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
        }
    }

    上例中使用了同步和异步发送两种方式。在多副本的情况下,如果要指定同步复制还是异步复制,可以使用?acks?参数,详细参考官方文档 Producer Configs 部分的内容;在多分区的情况下,如果要指定发送到哪个分区,可以使用?partitioner.class?参数,其值是一个实现了?org.apache.kafka.clients.producer.Partitioner?接口的类,用于根据不同的消息指定分区6。消费者的 API 有几种,比较新的 API 如下,

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.util.Arrays;
    import java.util.Properties;
    public class SimpleConsumer {
        public static void main(String[] args){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.232.23:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(100);
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("Received message: (" + String.format("%05d", record.key()) + ", " + record.value() + ") at offset " + record.offset());
                }
            }
        }
    }

    消费者还有旧的 API,比如?Consumer?和?SimpleConsumer?API,这些都可以从 Kafka 代码的 kafka-example 中找到,上述的两个例子也是改写自 kafka-example。使用新旧 API 在功能上都能满足消息收发的需要,但新 API 只依赖?kafka-clients,打包出来的 jar 包会小很多,以我的测试,新 API 的消费者 jar 包大约有 2M 左右,而旧 API 的消费者 jar 包接近 16M。

    其实,Kafka 也提供了按分区订阅,可以一次订阅多个分区?TopicPartition[];也支持手动提交 offset,需要调用?consumer.commitSync。

    Kafka 似乎没有公开 Topic 创建以及修改的 API(至少我没有找到),如果生产者向 Broker 写入的 Topic 是一个新 Topic,那么 Broker 会创建这个 Topic。创建的过程中会使用默认参数,例如,分区个数,会使用 Broker 配置中的?num.partitions?参数(默认1);副本个数,会使用?default.replication.factor?参数。但是通常情况下,我们会需要创建自定义的 Topic,那官方的途径是使用 Kafka 的工具。也有一些非官方的途径?7,例如可以这样写,

    String[] options = new String[]{
            "--create",
            "--zookeeper",
            "192.168.232.23:2181",
            "--partitions",
            "2",
            "--replication-factor",
            "3",
            "--topic",
            "topic1"
    };
    TopicCommand.main(options);

    但是这样写有一个问题,在执行完?TopicCommand.main(options);?之后,系统会自动退出,原因是执行完指令之后,会调用?System.exit(exitCode);?系统直接退出。这样当然不行,我的办法是,把相关的执行代码挖出来,写一个 TopicUtils 类,如下,

    import joptsimple.OptionSpecBuilder;
    import kafka.admin.TopicCommand;
    import kafka.admin.TopicCommand$;
    import kafka.utils.ZkUtils;
    import org.apache.kafka.common.security.JaasUtils;
    import scala.runtime.Nothing$;
    public class TopicUtils {
        // from: //blog.csdn.net/changong28/article/details/39325079
        // from: //www.cnblogs.com/davidwang456/p/4313784.html
        public static void createTopic(){
            String[] options = new String[]{
                    "--create",
                    "--zookeeper",
                    KafkaProperties.ZOOKEEPER_URL,
                    "--partitions",
                    "2",
                    "--replication-factor",
                    "3",
                    "--topic",
                    KafkaProperties.TOPIC
            };
    //        TopicCommand.main(options);
            oper(options);
        }
        public static void listTopic(){
            String[] options = new String[]{
                    "--list",
                    "--zookeeper",
                    KafkaProperties.ZOOKEEPER_URL
            };
    //        TopicCommand.main(options);
            oper(options);
        }
        public static void deleteTopic(){
            String[] options = new String[]{
                    "--delete",
                    "--zookeeper",
                    KafkaProperties.ZOOKEEPER_URL,
                    "--topic",
                    KafkaProperties.TOPIC
            };
    //        TopicCommand.main(options);
            oper(options);
        }
        public static void describeTopic(){
            String[] options = new String[]{
                    "--describe",
                    "--zookeeper",
                    KafkaProperties.ZOOKEEPER_URL,
                    "--topic",
                    KafkaProperties.TOPIC
            };
    //        TopicCommand.main(options);
            oper(options);
        }
        public static void main(String[] args){
            listTopic();
            createTopic();
            listTopic();
            describeTopic();
            deleteTopic();
            try {
                Thread.sleep(3*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            listTopic();
        }
        /** copied & modified from kafka.admin.TopicCommand$.main
         *
         * @param args
         */
        public static void oper(String args[]){
            try {
            TopicCommand$ topicCommand$ = TopicCommand$.MODULE$;
            final TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);
            if(args.length == 0) {
                throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Create, delete, describe, or change a topic.");
            } else {
                int actions =0;
                OptionSpecBuilder[] optionSpecBuilders = {opts.createOpt(), opts.listOpt(), opts.alterOpt(), opts.describeOpt(), opts.deleteOpt()};
                for (OptionSpecBuilder temp:optionSpecBuilders){
                    if (opts.options().has(temp)) {
                        actions++;
                    }
                }
                if(actions != 1) {
                    throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Command must include exactly one action: --list, --describe, --create, --alter or --delete");
                } else {
                    opts.checkArgs();
                    ZkUtils zkUtils = kafka.utils.ZkUtils$.MODULE$.apply((String)opts.options().valueOf(opts.zkConnectOpt()), 30000, 30000, JaasUtils.isZkSecurityEnabled());
                    byte exitCode = 0;
                    try {
                        try {
                            if(opts.options().has(opts.createOpt())) {
                                topicCommand$.createTopic(zkUtils, opts);
                            } else if(opts.options().has(opts.alterOpt())) {
                                topicCommand$.alterTopic(zkUtils, opts);
                            } else if(opts.options().has(opts.listOpt())) {
                                topicCommand$.listTopics(zkUtils, opts);
                            } else if(opts.options().has(opts.describeOpt())) {
                                topicCommand$.describeTopic(zkUtils, opts);
                            } else if(opts.options().has(opts.deleteOpt())) {
                                topicCommand$.deleteTopic(zkUtils, opts);
                            }
                        } catch (final Throwable var12) {
                            scala.Predef$.MODULE$.println((new StringBuilder()).append("Error while executing topic command : ").append(var12.getMessage()).toString());
                            System.out.println(var12);
                            exitCode = 1;
                            return;
                        }
                    } finally {
                        zkUtils.close();
    //                    System.exit(exitCode);
                    }
                }
            }
            } catch (Nothing$ nothing$) {
                nothing$.printStackTrace();
            }
        }
    }

    以上的?oper?方法改写自?kafka.admin.TopicCommand$.main?方法??梢苑⑾终獠糠执敕浅9忠?,原因是?TopicCommand$?是 Scala 写的,再编译成 Java class 字节码,然后我根据这些字节码反编译得到 Java 代码,并以此为基础进行修改,等于是我在用 Java 的方式改写 Scala 的代码,难免会觉得诡异。当然,这种写法用在生产环境的话是不太合适的,因为调用的?topicCommand$.createTopic?等方法都没有抛出异常,例如参数不合法的情况,而且也没有使用 log4j 之类的 log 库,只是用?System.out.println?这样的方法屏显,在出现错误的时候,比较难以定位。

    参考文章

    1. //kafka.apache.org/documentation.html?
    2. //www.jianshu.com/p/453c6e7ff81c?
    3. //www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章?
    4. //developer.51cto.com/art/201501/464491.htm?
    5. https://segmentfault.com/q/1010000004292925?
    6. //www.cnblogs.com/gnivor/p/5318319.html?
    7. //www.cnblogs.com/davidwang456/p/4313784.html?
    8. //www.jianshu.com/p/8689901720fd?
    9. //zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/?
    10. //www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/?


    相关文章

    发表评论

    Comment form

    (*) 表示必填项

    还没有评论。

    广东好彩36开奖结果
    返回顶部
  • 上海合作组织青岛峰会举行 习近平主持会议并发表重要讲话 2019-05-17
  • 世界上海拔最高的无人超市将落户拉萨 2019-05-17
  • 阳泉首次颁布地方实体性法规 两部法规将于7月1日起实施 2019-05-14
  • 空调有异味是为什么 这是你可能忽略的健康隐患 2019-05-14
  • “黑社会老大”电话敲诈“拿钱消灾” 警方揭骗局 2019-05-12
  • 习近平欢迎出席上海合作组织青岛峰会的外方领导人 2019-04-27
  • 黄家驹蜡像北京揭幕 黄家强动容追忆哥哥 2019-04-27
  • 高清:创意十足!杭州萌娃毕业照留下成长足迹 2019-04-08
  • 上交所:存托凭证上市首日不实行价格涨跌幅限制 2019-04-08
  • 谁拆迁都是一样一片狼藉,拆迁时欢天喜地,回迁时垂头丧气。拆迁者得到好处,被拆者哭天喊地。 2019-04-07
  • 网上支付出现异常如何解决? 2019-04-07
  • [微笑]咱建议进一步提高挂号费标准,最起码也得200元起步…… 2019-03-30
  • 一语惊坛(5月30日):磋商,不等于反复折腾。 2019-03-16