5.Java Kafka Demo

ParkJun 1年前 ⋅ 452 阅读

直接开始吧

首先maven引入jar包

<!-- kafka -->
<dependency>
 	<groupId>org.apache.kafka</groupId>
	 <artifactId>kafka_2.12</artifactId>
	 <version>1.0.0</version>
		 <exclusions>
			<exclusion>
				<groupId>org.apache.zookeeper</groupId>
				<artifactId>zookeeper</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
			<exclusion>
				<groupId>log4j</groupId>
				<artifactId>log4j</artifactId>
			</exclusion>
		</exclusions>
		<scope>provided</scope> 
	</dependency>


	<dependency>
   		 <groupId>org.apache.kafka</groupId>
   		 <artifactId>kafka-clients</artifactId>
  		  <version>1.0.0</version>
	</dependency>
	
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-streams</artifactId>
	    <version>1.0.0</version>
	</dependency>

新建Producer的Class

public class KafkaProducerTest implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic;


public KafkaProducerTest(String topicName) {
	Properties props = new Properties();
	props.put("bootstrap.servers", "x.x.x.x:9092");
	//acks=0:如果设置为0,生产者不会等待kafka的响应。
	//acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
	//acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这		是最强的可用性保证。
	props.put("acks", "all");
	//配置为大于0的值的话,客户端会在消息发送失败时重新发送。
	props.put("retries", 0);
	//当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。
	props.put("batch.size", 16384);
	props.put("key.serializer", StringSerializer.class.getName());
	props.put("value.serializer", StringSerializer.class.getName());
	this.producer = new KafkaProducer<String, String>(props);
	this.topic = topicName;
}

public void run() {
	int messageNo = 1;
	try {
		for(;;) {
        
			String messageStr="你好,这是第"+messageNo+"条数据";
			producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));			
			messageNo++;
            Thread.sleep(1000);
			}
		}
	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		producer.close();
	}
}

public static void main(String args[]) {
	KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
	Thread thread = new Thread(test);
	thread.start();
}

新建Consumer的Class

    public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private  String topic;
    private static final String GROUPID = "groupA";



    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        //kafka消费的的地址
        props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        //组名 不同组名可以重复消费
        props.put("group.id", GROUPID);
        //是否自动提交
        props.put("enable.auto.commit", "true");
        //从poll(拉)的回话处理时长
        props.put("auto.commit.interval.ms", "1000");
        //超时时间
        props.put("session.timeout.ms", "30000");
        //一次最多拉取的条数
        props.put("max.poll.records", 1000);
//		earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
//		latest 
//		当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 
//		none 
//		topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        props.put("auto.offset.reset", "earliest");
        //序列化
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        //订阅主题列表topic
        this.consumer.subscribe(Arrays.asList(topic));

    }

    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            for (;;) {
                    msgList = consumer.poll(10);//一次拉取10条
                    if(null!=msgList&&msgList.count()>0){
                    for (ConsumerRecord<String, String> record : msgList) {                                             
                            System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                            messageNo++;
                    }
                }else{	
                    Thread.sleep(1000);
                }
            }		
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}

到此一个最简单的demo 就可以运行起来了,当然,看起来简单,内部还有很多深层次的东西,我们会在后续谈到!

这些配置可以在

org.apache.kafka.clients.consumer.ConsumerConfig 以及 org.apache.kafka.clients.producer.ProducerConfig 上

org.apache.kafka.clients.producer.ProducerConfig 下 中找到

本文归作者所有,未经作者允许,不得转载