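This example contains a KafkaSpout, a KafkaBolt and a custom bolt, QueryBolt. KafkaSpout reads String messages from the Kafka topic recommend and sends them to QueryBolt. QueryBolt does not process the messages. It stores every message that passes through in a list and forwards it unchanged to KafkaBolt. QueryBolt overrides the cleanup method, which is called when the topology is killed. That method writes everything in the list to the file "C://"+this+".txt". KafkaBolt writes every message it receives to the Kafka topic receiver.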
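To make the data flow concrete without a Storm or Kafka installation, here is a minimal in-memory model. It is only an illustration: the class PipelineModel and its fields are hypothetical, the two queues stand in for the topics recommend and receiver, and no Storm or Kafka API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of KafkaSpout -> QueryBolt -> KafkaBolt.
// The queues stand in for the Kafka topics "recommend" and "receiver".
class PipelineModel {
    final BlockingQueue<String> recommend = new LinkedBlockingQueue<>();
    final BlockingQueue<String> receiver = new LinkedBlockingQueue<>();
    // Plays the role of QueryBolt's list field
    final List<String> seen = new ArrayList<>();

    // Moves every message currently in "recommend" through the pipeline
    void runOnce() {
        String msg;
        while ((msg = recommend.poll()) != null) { // spout: take one message
            seen.add(msg);                          // QueryBolt: remember it
            receiver.add(msg);                      // KafkaBolt: forward it unchanged
        }
    }
}
```

Messages come out of receiver in the same order they went into recommend, and seen holds what QueryBolt would dump in cleanup.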
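Code structure: the project has three classes, topology.java, MessageScheme.java and QueryBolt.java.
The full code follows.
First, topology.java: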
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

public class topology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper ensemble as host:port pairs
        BrokerHosts brokerHosts = new ZkHosts("110.64.76.130:2181,110.64.76.131:2181,110.64.76.132:2181");
        // Topic to consume from
        String topic = "recommend";
        // Depends on Kafka's ZooKeeper setting (see Question 1): if zookeeper.connect
        // in server.properties is only host:port pairs, leave this empty
        String zkRoot = "";
        // Any name; identifies this spout's progress record in ZooKeeper
        String spoutId = "zhou";
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        // How to deserialize the byte[] messages read from Kafka
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        Config conf = new Config();
        // Do not print debug output
        conf.setDebug(false);
        // Maximum number of pending (emitted but not yet acked) tuples per spout task
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker addresses used by KafkaBolt
        map.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092");
        // serializer.class: the class used to serialize messages
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that KafkaBolt writes to
        conf.put("topic", "receiver");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt1", new QueryBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
        builder.setBolt("bolt2", new KafkaBolt<String, String>(), 1).setNumTasks(1).shuffleGrouping("bolt1");

        if (args.length == 0) {
            // No arguments: submit to a local cluster
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            // Let the topology run for 6 s, then shut the cluster down
            Thread.sleep(6000);
            cluster.shutdown();
        } else {
            // Otherwise submit to the real cluster (previously this ran in both cases)
            StormSubmitter.submitTopology("test", conf, builder.createTopology());
        }
    }
}
Next, MessageScheme.java:
import java.io.UnsupportedEncodingException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageScheme.class);

    public List<Object> deserialize(byte[] ser) {
        try {
            // Decode the raw bytes read from Kafka as a UTF-8 string
            String mString = new String(ser, "UTF-8");
            return new Values(mString);
        } catch (UnsupportedEncodingException e) {
            LOGGER.error("Cannot parse the provided message", e);
        }
        return null;
    }

    public Fields getOutputFields() {
        // Each spout tuple has a single field named "msg"
        return new Fields("msg");
    }
}
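Since Java 7 the decoding step can use StandardCharsets.UTF_8, which cannot throw UnsupportedEncodingException, so the try/catch and the null return disappear. The sketch below isolates just that step; Utf8Decode is a hypothetical name, and a plain List stands in for Storm's Values so it runs without Storm on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone version of MessageScheme.deserialize.
class Utf8Decode {
    static List<Object> deserialize(byte[] ser) {
        // One tuple with a single value: the message decoded as UTF-8
        return Arrays.<Object>asList(new String(ser, StandardCharsets.UTF_8));
    }
}
```

Inside MessageScheme the equivalent body would be return new Values(new String(ser, StandardCharsets.UTF_8));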
Finally, QueryBolt.java:
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class QueryBolt implements IRichBolt {
    List<String> list;
    OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        list = new ArrayList<String>();
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String str = (String) input.getValue(0);
        // Remember the message
        list.add(str);
        // Forward it anchored to the input tuple, then ack the input
        collector.emit(input, new Values(str));
        collector.ack(input);
    }

    public void cleanup() {
        // Called when the topology is killed. Storm only guarantees this call in
        // local mode; on a cluster the worker process may be killed with kill -9.
        // Write the contents of list to a file; the stream is closed automatically.
        try (PrintStream p = new PrintStream(new FileOutputStream("C://" + this + ".txt"))) {
            p.println("begin!");
            p.println(list.size());
            for (String tmp : list) {
                p.println(tmp);
            }
            p.println("end!");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // KafkaBolt reads the payload from the field named "message"
        declarer.declare(new Fields("message"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
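The file-writing part of cleanup can be pulled out into a plain method so it is testable without running a topology. In this sketch ListDumper is a hypothetical class, and the target path is a parameter instead of the hard-coded "C://"+this+".txt"; the output format ("begin!", the count, the messages, "end!") is the same as above.

```java
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper holding the dump logic from QueryBolt.cleanup.
class ListDumper {
    static void dump(List<String> list, Path target) throws IOException {
        // try-with-resources closes the stream even if writing fails
        try (PrintStream p = new PrintStream(target.toFile(), "UTF-8")) {
            p.println("begin!");
            p.println(list.size());
            for (String tmp : list) {
                p.println(tmp);
            }
            p.println("end!");
        }
    }
}
```

cleanup could then call ListDumper.dump(list, Paths.get("C://" + this + ".txt")) and handle the IOException.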
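Question 1: How should zkRoot be set? This matters a great deal: if it is set incorrectly, the spout cannot read any data from the Kafka topic.
Look at Kafka's server.properties file:
zookeeper.connect=master:2181,slave1:2181,slave2:2181
In this case zkRoot = "";
If instead zookeeper.connect=master:2181,slave1:2181,slave2:2181/ok,
then zkRoot must be "/ok".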
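The rule above can be written as a small function. ZkRootFromConnect.chroot is a hypothetical helper for illustration only: it returns everything from the first '/' in the zookeeper.connect value, or an empty string when the value is only host:port pairs.

```java
// Hypothetical helper: derive zkRoot from Kafka's zookeeper.connect value.
class ZkRootFromConnect {
    static String chroot(String zookeeperConnect) {
        String s = zookeeperConnect.trim();
        int slash = s.indexOf('/');
        // No path suffix -> empty zkRoot; otherwise the suffix, e.g. "/ok"
        return slash < 0 ? "" : s.substring(slash);
    }
}
```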
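Question 2: Why doesn't KafkaSpout read from the beginning after it starts? It skips everything that was already in the Kafka topic and only processes messages added after the spout started.
Because by default KafkaSpout skips messages that were already in the topic. To process from the beginning, set spoutConfig.forceFromStart = true, so that the spout starts from startOffsetTime, which by default is the smallest offset.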
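A simplified model of this behaviour, assuming (based on storm-kafka 0.9.x) that the partition has no offset recorded in ZooKeeper yet and that startOffsetTime keeps its default. StartOffsetModel is a hypothetical class, not library code; in topology.java the actual change is the single assignment spoutConfig.forceFromStart = true; before the KafkaSpout is constructed.

```java
// Hypothetical model of where KafkaSpout starts reading a fresh partition.
class StartOffsetModel {
    // earliestOffset: smallest offset still stored in Kafka
    // latestOffset: offset that the next new message will get
    static long startOffset(boolean forceFromStart, long earliestOffset, long latestOffset) {
        // true  -> startOffsetTime, which defaults to the earliest offset
        // false -> the end of the log, so older messages are skipped
        return forceFromStart ? earliestOffset : latestOffset;
    }
}
```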
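Appendix: the SpoutConfig-related definitions in KafkaSpout
SpoutConfig extends KafkaConfig. Because every instance field of SpoutConfig and KafkaConfig is public, you can assign any of the non-final fields directly after calling the constructor.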
public class SpoutConfig extends KafkaConfig implements Serializable {
    public List<String> zkServers = null;     // ZooKeeper hosts used to record the spout's read progress (if null, Storm's own ZooKeeper is used)
    public Integer zkPort = null;             // port of that ZooKeeper
    public String zkRoot = null;              // ZooKeeper path under which the progress is recorded
    public String id = null;                  // id of the progress record; for a new spout to continue from an earlier spout's progress, give it the same id
    public long stateUpdateIntervalMs = 2000; // how often (ms) the read progress is written to ZooKeeper

    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}
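The zkRoot and id fields together decide where the spout's progress is stored, which is why reusing an id lets a new spout resume. The helper below is hypothetical and assumes the storm-kafka 0.9.x layout zkRoot + "/" + id + "/partition_" + N; check the version you use before relying on it.

```java
// Hypothetical helper: ZooKeeper path of the committed offset for one partition,
// assuming the storm-kafka 0.9.x layout zkRoot/id/partition_N.
class OffsetPath {
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }
}
```

With the values from topology.java (zkRoot "" and id "zhou"), partition 0 would be recorded under /zhou/partition_0.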
public class KafkaConfig implements Serializable {
    public final BrokerHosts hosts;           // used to obtain Kafka broker and partition information
    public final String topic;                // topic to read messages from
    public final String clientId;             // client id used by SimpleConsumer
    public int fetchSizeBytes = 1024 * 1024;  // total message size requested in the response to each FetchRequest sent to Kafka
    public int socketTimeoutMs = 10000;       // socket timeout of the connection to a Kafka broker
    public int fetchMaxWait = 10000;          // how long the consumer waits when the server has no new messages
    public int bufferSizeBytes = 1024 * 1024; // read buffer size of the SocketChannel used by SimpleConsumer
    public MultiScheme scheme = new RawMultiScheme();  // how to deserialize the byte[] read from Kafka
    public boolean forceFromStart = false;    // whether to force reading from the start (startOffsetTime, by default the smallest offset)
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();  // offset time to start reading from; defaults to the earliest offset
    public long maxOffsetBehind = 100000;     // how far the spout may lag behind the target offset; if it lags further, the messages in between are dropped
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;  // if the requested offset no longer exists in Kafka, whether to fall back to startOffsetTime
    public int metricsTimeBucketSizeInSecs = 60;  // interval (s) at which metrics are collected

    public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
    }

    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        this.topic = topic;
        this.clientId = clientId;
    }
}
