2 Storm代码实现

Storm的一个拓扑中包括Spout和Blots。
代码主要体现在Spout读取数据,然后发送给Blot去处理。

首先添加maven依赖

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
    </dependency>

Spout读取数据

实现Spout有两种方式,一种是继承BaseRichSpout,一种是实现IRichSpout。
其实BaseRichSpout也是实现了IRichSpout。

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout

这里我就用BaseRichSpout去实现读取文件

import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class SpoutTest  extends BaseRichSpout implements Serializable {
    SpoutOutputCollector collector;
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    public void nextTuple() {
        //读取目录`d:\\test`下的txt格式的文件,你也可以添加其他类型
        Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"),  new String[] { "txt" }, true);
        for (File file : listFiles) {
            // 行格式发送
            try {
               //按行发送
                List<String> lines = FileUtils.readLines(file,"utf-8");
                for (String line : lines) {
                    this.collector.emit(new Values(lines));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // 文件已经处理完成
            try {
                File srcFile = file.getAbsoluteFile();
                File destFile = new File(srcFile + ".done." + System.currentTimeMillis());
                FileUtils.moveFile(srcFile, destFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("lines"));
    }
}

Blot处理数据

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class BlotTest extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    //准备阶段,初始化conf,context和collector
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        //接收tuple中的信息
        String line = tuple.getStringByField("line");
        if ("".equals(line) || null == line){
            return;
        }
        System.out.println(line);
          //。。。这块处理数据或者存储数据库
          //如果有需要发送到下一个blot,在下一个blot存储
//        collector.emit(new Values(line));

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //outputFieldsDeclarer.declare(new Fields("phone","time"));
    }
}

上面的Blot接收之前的Spout传过来的数据。如果为空直接返回。如果还需要过滤,则可以调用上面注释的代码继续发送到下一个blot,当然需要下面的declareOutputFields()和spout一样。

最后主方法

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class Main {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        try {
            //移动
            builder.setSpout("spoutid",new SpoutTest());
            builder.setBolt("blotid", new BlotTest()).shuffleGrouping("spoutid");
            //对应Blot里面的注释,以phone分组,给它开了4个并行度
           // builder.setBolt("blotid", new BlotTest(),4).fieldsGrouping("spoutid",new Fields("phone"));
            Config config = new Config();
            //这里对数据准确性要求不高,就不设置ack数量了,按需设置,不然会有处理堆积的问题
            config.setNumAckers(0);
            
            //>0是集群用的,else里面是本机运行
            if (args.length>0){
                config.setNumWorkers(Integer.parseInt(args[1]));
                config.setMaxSpoutPending(5000);
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            }else {
                String topologyName = Main.class.getSimpleName();
                StormTopology stormTopology = builder.createTopology();
                LocalCluster lCluster = new LocalCluster();
                lCluster.submitTopology(topologyName, config, stormTopology);
            }
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (AuthorizationException e) {
            e.printStackTrace();
        }

    }
}

上面有一行注释的,是按照BlotTest下面的注释分组。里面的并行度具体我没研究过,根据业务设定吧。
到此,简单的一个拓扑就完成了。
那么问题来了,如果storm一直处理,什么时候去存入数据库等。这就涉及到storm的定时器
把上面的代码稍微改一下

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class Blot1Test extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    Map map = new HashMap();
    public void execute(Tuple tuple) {
        String line = tuple.getStringByField("lines");
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            //接收到定时信号的时候,处理这里,其余时间走else
            savemaptodb();
            return;
        }else {
            map.put(line,line);
            return;
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

     //设置10秒发送一次tick心跳
    @SuppressWarnings("static-access")
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}

上面这个getComponentConfiguration()就是实现了这个blot的定时,还有全局的定时器,在Main类的config加上

 config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);//设置定时器,每五秒发送一次系统级别的

然后在每个blot的execute方法里面判断是否触发

tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)

这样就实现了一个简单是storm例子(说实话我没有验证,都是手敲出来的。公司的代码在内网,拿出来太麻烦),但是大体上是这样的。

这个拓扑没有失败机制,也不是从hdfs或者kafka读取。自己去写吧。遇到问题才能真正掌握。

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容