创建Topic


   /**
    * Creates the log-compacted topic {@code create-topic-with-java} with one
    * partition and a replication factor of one, blocking until the broker has
    * confirmed the creation.
    *
    * <p>Rough command-line counterpart:
    * <pre>
    * bin/kafka-topics.sh --create \
    *     --bootstrap-server localhost:9092 \
    *     --replication-factor 1 \
    *     --partitions 1 \
    *     --topic create-topic-with-java \
    *     --config cleanup.policy=compact
    * </pre>
    *
    * @throws RuntimeException if the creation fails or the waiting thread is interrupted
    */
   private static void testCreateTopic() {
        final String topicName = "create-topic-with-java";
        final int partitions = 1;
        final short replicationFactor = 1;

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Describe a compacted topic.
            NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
                    .configs(Collections.singletonMap(
                            TopicConfig.CLEANUP_POLICY_CONFIG,
                            TopicConfig.CLEANUP_POLICY_COMPACT));

            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));

            // values() exposes one future per requested topic.
            KafkaFuture<Void> future = result.values().get(topicName);

            // get() blocks until the creation completed or failed; on failure the
            // ExecutionException wraps the underlying cause.
            future.get();
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to create topic " + topicName, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while creating topic " + topicName, e);
        }
   }

查看Topic

    /**
     * Lists the topics known to the cluster at {@code localhost:9092} and prints,
     * for each one, its name (map key), topic id, listing name and the full
     * {@code TopicListing}, followed by a separator line.
     *
     * <p>This uses {@code Admin.listTopics()}, so it is closest to
     * {@code bin/kafka-topics.sh --bootstrap-server localhost:9092 --list};
     * no per-partition details are requested.
     *
     * @throws RuntimeException if the listing fails or the waiting thread is interrupted
     */
    private static void testTopicDescribe() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ListTopicsResult result = admin.listTopics();

            // Block for the result instead of registering a callback: a failed
            // request then surfaces as an ExecutionException rather than as a
            // null map inside the callback.
            Map<String, TopicListing> listings = result.namesToListings().get();

            for (Map.Entry<String, TopicListing> entry : listings.entrySet()) {
                TopicListing listing = entry.getValue();
                System.out.println(entry.getKey());
                System.out.println(listing.topicId());
                System.out.println(listing.name());
                System.out.println(listing);

                System.out.println("------------------------------------------");
            }
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to list topics", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while listing topics", e);
        }
    }

删除Topic


    /**
     * Deletes the topic {@code create-topic-with-java} and blocks until the
     * broker has answered. {@code "delete done!"} is printed only when the
     * deletion actually succeeded.
     *
     * @throws RuntimeException if the deletion fails or the waiting thread is interrupted
     */
    private static void testTopicDelete() {
        final String topicName = "create-topic-with-java";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            TopicCollection topicCollection = TopicCollection.ofTopicNames(
                    Collections.singleton(topicName));
            DeleteTopicsResult result = admin.deleteTopics(topicCollection);

            // Wait for the outcome so that a failure is reported to the caller
            // instead of being printed from a callback and then ignored.
            result.all().get();
            System.out.println("delete done!");
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to delete topic " + topicName, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while deleting topic " + topicName, e);
        }
    }

基本消费者代码



    /**
     * Minimal consumer with automatic offset commits: joins group {@code test},
     * subscribes to {@code quickstart-events} and prints offset, key and value of
     * every record it receives. The poll loop never terminates on its own.
     */
    private static void testAutoConsumer() {

        // Consumer configuration: offsets are committed automatically every second.
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");
        config.setProperty("group.id", "test");
        config.setProperty("enable.auto.commit", "true");
        config.setProperty("auto.commit.interval.ms", "1000");
        config.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList("quickstart-events"));
            for (;;) {
                // Fetch the next batch, waiting at most 100 ms.
                final ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
                batch.forEach(rec ->
                        System.out.printf("offset = %d, key = %s, value = %s%n", rec.offset(), rec.key(), rec.value()));
            }
        }

    }

更详细的消费控制

 /**
  * Consumer with manual offset control: auto-commit is disabled, a rebalance
  * listener shows where offsets could be stored in / restored from an external
  * system, and records are processed in batches of at least ten before the
  * offsets are committed.
  *
  * <p>The poll loop never terminates on its own; the consumer is closed when
  * the loop is left through an exception.
  */
 private static void testManualConsumer() {

        Properties props = new Properties();

        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer is closed if the loop fails.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {

            consumer.subscribe(Arrays.asList("quickstart-events"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    for (TopicPartition part : collection) {
                        // Current position of the partition being taken away; this is
                        // the place to persist it in an application-owned store.
                        long currOffset = consumer.position(part);
                        // we can save the offset in own db
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    // Alternative: consumer.seekToBeginning(collection);
                    for (TopicPartition part : collection) {
                        // Placeholder: the start offset would normally be read from the
                        // application's own store so that offsets stay tied to the
                        // business data. Here every assigned partition restarts at 0.
                        long currOffset = 0L;
                        consumer.seek(part, currOffset);
                    }
                }
            });

            // Process records in batches.
            final int minBatchSize = 10;

            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                    printThis(buffer);
                    // Manual commit; report failures instead of dropping them silently.
                    consumer.commitAsync((offsets, exception) -> {
                        if (exception != null) {
                            System.err.println("Offset commit failed for " + offsets + ": " + exception);
                        }
                    });
                    buffer.clear();
                }
            }
        }

    }

指定分区

         // Assign specific partitions: partitions 0 and 1 of topic "foo" are handed
         // to the consumer explicitly via assign(). The `consumer` variable is
         // declared outside this snippet.
        String topic = "foo";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0, partition1));

基本生产者


        // Producer settings: local broker, String keys and values, and a 1 ms
        // linger before a batch is sent.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("linger.ms", 1);

        // Send the numbers 0..99 to "quickstart-events", each used as both key and
        // value, then flush before the producer is closed.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            int n = 0;
            while (n < 100) {
                final String payload = Integer.toString(n);
                producer.send(new ProducerRecord<>("quickstart-events", payload, payload));
                n++;
            }
            producer.flush();
        }

事务

    /**
     * Sends ten records to {@code quickstart-events} inside a single transaction
     * using the transactional id {@code my-transactional-id}.
     *
     * <p>Failures are reported on standard error and the method returns normally.
     * Fencing, out-of-order-sequence and authorization errors leave the producer
     * unusable, so it is only closed; any other {@code KafkaException} aborts the
     * open transaction first.
     */
    private static void testTransactions() {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");

        try (Producer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.initTransactions();

            try {
                producer.beginTransaction();
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<>("quickstart-events", Integer.toString(i), Integer.toString(i)));
                }
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Not recoverable: the only valid action is to close the producer,
                // which try-with-resources does on exit.
                System.err.println("Fatal transactional producer error, closing producer: " + e);
            } catch (KafkaException e) {
                // Any other error: abort so the records of this transaction are discarded.
                System.err.println("Transaction failed, aborting: " + e);
                producer.abortTransaction();
            }
        } catch (KafkaException e) {
            // Raised by initTransactions(), abortTransaction() or close().
            System.err.println("Transactional producer failed: " + e);
        }

    }

Stream 基本结构

  // Configuration: application id, broker address and String serdes as the
  // default for keys and values.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Stream builder.
        final StreamsBuilder builder = new StreamsBuilder();

        // General shape: builder.stream("Source-Topic").xxx.xxx.to("Sink-Topic")
        // Here: lower-case each line, split it on non-word characters, group by
        // word, count into the store "counts-store" and write the running counts
        // to the output topic as (String, Long).
        builder.<String, String>stream("streams-plaintext-input")
               .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
               .groupBy((key, value) -> value)
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
               .toStream()
               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch Ctrl-C: close the streams instance
        // and release the latch the main thread is waiting on.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            // Block until the shutdown hook has run.
            latch.await();
        } catch (Throwable e) {
            // Report the failure before exiting so it is not lost silently.
            System.err.println("Kafka Streams application terminated abnormally: " + e);
            System.exit(1);
        }
        System.exit(0);
    }

Processor



public class WorldCountProcessor implements Processor<String,String,String,String> {
    private KeyValueStore<String,Integer> keyValueStore;

    @Override
    public void process(Record<String,String> record) {
        
        final String[] words =
                record.value().toUpperCase(Locale.ROOT).split("\\W+");
        for (final String word:words) {
            final Integer oldValue = keyValueStore.get(word);
            if (oldValue == null) {
                keyValueStore.put(word,1);
            }else {
                keyValueStore.put(word,oldValue + 1);
            }
        }

    }

    @Override
    public void close() {

    }

    @Override
    public void init(ProcessorContext context) {
        context.schedule(Duration.ofSeconds(1),
                PunctuationType.STREAM_TIME,
                timestamp ->{
                    try(final KeyValueIterator<String,Integer> iter = keyValueStore.all()) {
                        while (iter.hasNext()) {
                            final KeyValue<String,Integer> entry = iter.next();
                            context.forward(new Record(entry.key,entry.value.toString(),timestamp));
                        }
                    }
                });
        keyValueStore = context.getStateStore("Counts");
    }
}

Using the Processor in a Topology

   // Wire the processor into a topology by hand:
   //   source-topic -> "Source" -> "Processor" (with state store "Counts") -> "Sink" -> sink-topic
   Topology builder = new Topology();
        final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder =
                Stores
                        .keyValueStoreBuilder(
                                Stores.persistentKeyValueStore("Counts"),
                                Serdes.String(),
                                Serdes.Long()
                        );
        // The state store and the sink must name the processor node exactly as it
        // was registered ("Processor"); an unknown node name is rejected when the
        // topology is assembled.
        builder.addSource("Source", "source-topic")
                .addProcessor("Processor", () -> new WorldCountProcessor(), "Source")
                .addStateStore(countsStoreBuilder, "Processor")
                .addSink("Sink", "sink-topic", "Processor");

Kafka 设计思想

https://kafka.apache.org/documentation/#design

Poll or Push

Message Format

log 文件

log_file

Test Driver