Apache Kafka

Минимально необходимое для использования.

Опубликовано 18-12-2015
Эксперементы
Теги js, kafka, big data

Минимально необходимое для использования.

Сервер.

Список библиотек для сервера:

activation-1.1.jar
jline-0.9.94.jar
jopt-simple-3.2.jar
junit-3.8.1.jar
kafka_2.10-0.9.0.0.jar
kafka-clients-0.9.0.0.jar
log4j-1.2.15.jar
lz4-1.2.0.jar
mail-1.4.jar
metrics-core-2.2.0.jar
netty-3.7.0.Final.jar
scala-library-2.10.5.jar
slf4j-api-1.6.1.jar
slf4j-log4j12-1.7.6.jar
snappy-java-1.1.1.7.jar
zkclient-0.7.jar
zookeeper-3.4.6.jar

Координатор ZooKeeper.

Код запуска координатора ZooKeeper: 

function startZoo() {
  try {
    var startupProperties = new java.util.Properties();
    startupProperties.put("dataDir","/tmp/zookeeper");
    startupProperties.put("clientPort",2181);
    startupProperties.put("maxClientCnxns",0);

    var quorumConfiguration = new org.apache.zookeeper.server.quorum.QuorumPeerConfig();
    try {
      quorumConfiguration.parseProperties(startupProperties);
    } catch(e) {
      e.printStackTrace();
    }

    var zooKeeperServer = new org.apache.zookeeper.server.ZooKeeperServerMain();
    var configuration = new org.apache.zookeeper.server.ServerConfig();
    configuration.readFrom(quorumConfiguration);

    var Thread = Java.type("java.lang.Thread");

    function run(){
        try {
          zooKeeperServer.runFromConfig(configuration);
        } catch (e) {
          e.printStackTrace();
        }
    }
    var zth = new Thread(run);
    zth.start();
  } catch (ee) {
    ee.printStackTrace();
  }
}

Брокер.

Код запуска брокера Kafka:

function startKafka() {  
  try {  
      var props = new java.util.Properties();  
      props.setProperty("broker.id","0");  
      props.setProperty("listeners","PLAINTEXT://:9092");  
      props.setProperty("num.network.threads","3");  
      props.setProperty("num.io.threads","8");  
      props.setProperty("socket.send.buffer.bytes","102400");  
      props.setProperty("socket.receive.buffer.bytes","102400");  
      props.setProperty("socket.request.max.bytes","104857600");  
      props.setProperty("log.dirs","/tmp/kafka-logs");  
      props.setProperty("num.partitions","1");  
      props.setProperty("num.recovery.threads.per.data.dir","1");  
      props.setProperty("log.retention.hours","168");  
      props.setProperty("log.segment.bytes","1073741824");  
      props.setProperty("log.retention.check.interval.ms","300000");  
      props.setProperty("log.cleaner.enable","false");  
      props.setProperty("zookeeper.connect", "127.0.0.1");  
      props.setProperty("zookeeper.connection.timeout.ms","6000");  
      props.setProperty("host.name","sarjsheff.ru");  
      props.setProperty("advertised.host.name","sarjsheff.ru");  
      var KafkaConfig = Java.type("kafka.server.KafkaConfig");  
      var KafkaServerStartable = Java.type("kafka.server.KafkaServerStartable");  
      var cfg = new KafkaConfig(props);  
      var srv = new KafkaServerStartable(cfg);  
      srv.startup();  
  } catch (ex) {  
    ex.printStackTrace();  
  }  
}

Запуск.

Запускаем координатор и брокер:

startZoo();
startKafka();

Пример конфигурации логера (log4j.properties):

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.conversionPattern=%d{ABSOLUTE} %5p %t %c{1}:%M:%L - %m%n

Запускается командой:

jjs -J-Djava.class.path=.:lib/activation-1.1.jar:lib/jline-0.9.94.jar:lib/jopt-simple-3.2.jar:lib/junit-3.8.1.jar:lib/kafka_2.10-0.9.0.0.jar:lib/kafka-clients-0.9.0.0.jar:lib/log4j-1.2.15.jar:lib/lz4-1.2.0.jar:lib/mail-1.4.jar:lib/metrics-core-2.2.0.jar:lib/netty-3.7.0.Final.jar:lib/scala-library-2.10.5.jar:lib/slf4j-api-1.6.1.jar:lib/slf4j-log4j12-1.7.6.jar:lib/snappy-java-1.1.1.7.jar:lib/zkclient-0.7.jar:lib/zookeeper-3.4.6.jar main.js

Клиент.

Список библиотек для клиента:

kafka-clients-0.9.0.0.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
slf4j-simple-1.7.6.jar
snappy-java-1.1.1.7.jar

Публикация сообщений.

Код запуска поставщика сообщений:

  getProducer:function(server,p) {
    try{
      var props = p;
      if (typeof props == "undefined") {
        var props = new java.util.Properties();

        props.put("bootstrap.servers", (server?server:"localhost:9092"));
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("client.id", "console-producer");
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
      }
      var KafkaProducer = Java.type("org.apache.kafka.clients.producer.KafkaProducer");

      return new KafkaProducer(props);
    } catch (e) {
      e.printStackTrace();
    }
  }

Код публикации собщения в топик:

  send:function (p,topic,key,val,callb) {
    try {
      if (callb) {
        p.send(new org.apache.kafka.clients.producer.ProducerRecord(topic.toString(), key.toString().getBytes(), val.toString().getBytes()),callb);
      } else {
        p.send(new org.apache.kafka.clients.producer.ProducerRecord(topic.toString(), key.toString().getBytes(), val.toString().getBytes()));
      }
    } catch(e) {
      e.printStackTrace();
    }
  }

Пример публикации сообщения:

load("http://sarjsheff.ru/libs/kafka.js");

var p = kafkaUtil.getProducer("sarjsheff.ru:9092");
kafkaUtil.send(p,"testTopic","testKey","testValue");

Запуск:

jjs -J-Djava.class.path=lib/kafka-clients-0.9.0.0.jar:lib/lz4-1.2.0.jar:lib/slf4j-api-1.7.6.jar:lib/slf4j-simple-1.7.6.jar:lib/snappy-java-1.1.1.7.jar prod.js

Получение и обработка сообщений.

Код запуска обработчика сообщений:

  getConsumer:function(server,p) {
    try{
      var props = p;
      if (typeof props == "undefined") {
        var props = new java.util.Properties();
        props.put("bootstrap.servers", (server?server:"localhost:9092"));
        props.put("group.id", "test");
        props.put("session.timeout.ms", "10000");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "10000");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
      }
      return new org.apache.kafka.clients.consumer.KafkaConsumer(props);
    } catch (e) {
      e.printStackTrace();
    }
  }

Пример получения и обработки сообщений из топика:

load("http://sarjsheff.ru/libs/kafka.js");

var c = kafkaUtil.getConsumer("sarjsheff.ru:9092");
c.subscribe(["testTopic"]);
var records = c.poll(1000);
if (records.count() > 0) {
    var it = records.iterator();
    while (it.hasNext()) {
        var r = it.next();
        print(r);
    }
    c.commitSync();
}
c.close();

Запуск:

jjs -J-Djava.class.path=lib/kafka-clients-0.9.0.0.jar:lib/lz4-1.2.0.jar:lib/slf4j-api-1.7.6.jar:lib/slf4j-simple-1.7.6.jar:lib/snappy-java-1.1.1.7.jar cons.js