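III. Flink Cluster Setup

Flink can be deployed in the following modes:

Local, Standalone (low resource utilization), YARN, Mesos, Docker, Kubernetes, and AWS.

We focus on deploying a Flink cluster in Standalone mode and in YARN mode.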

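3.1 Standalone Mode Installation

1. Software requirements

· Java 1.8.x or later

· ssh (sshd must be running to use the Flink scripts that manage remote components)

Cluster deployment plan: bigdata11 runs the JobManager; bigdata12 and bigdata13 run TaskManagers.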

2. Extract the archive

tar -zxvf flink-1.6.1-bin-hadoop28-scala_2.11.tgz -C /opt/module/

3. Modify the configuration files

Edit flink/conf/masters, slaves, and flink-conf.yaml.

[root@bigdata11 conf]$ sudo vi masters

bigdata11:8081

[root@bigdata11 conf]$ sudo vi slaves

bigdata12

bigdata13

[root@bigdata11 conf]$ sudo vi flink-conf.yaml

taskmanager.numberOfTaskSlots: 2   # around line 52 of the default file

jobmanager.rpc.address: bigdata11   # around line 33 of the default file
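Flink 1.x does not read flink-conf.yaml with a full YAML parser. As far as we know, it reads the file line by line, drops everything after #, and splits each line on the first ": " (a colon followed by a space). The plain-Java sketch below models that rule. It is a simplified model, not Flink's actual GlobalConfiguration code, and FlinkConfLineParser is a hypothetical name. It shows why a line written as taskmanager.numberOfTaskSlots:2, with no space after the colon, is skipped instead of applied.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of how Flink 1.x reads flink-conf.yaml (assumption: mirrors the
// "strip # comments, split on the first ': '" rule; not a real YAML parser).
class FlinkConfLineParser {

    static Map<String, String> parse(String... lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            // everything after '#' is treated as a comment
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue;
            }
            // key and value must be separated by ": " (colon + space)
            String[] kv = content.split(": ", 2);
            if (kv.length == 1) {
                System.err.println("Skipping line without \": \" separator: " + line);
                continue;
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                System.err.println("Skipping line with empty key or value: " + line);
                continue;
            }
            conf.put(key, value);
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(
                "jobmanager.rpc.address: bigdata11",
                "taskmanager.numberOfTaskSlots:2",   // no space after ':' -> ignored
                "# a comment line");
        System.out.println(conf); // {jobmanager.rpc.address=bigdata11}
    }
}
```

The same rule applies to values that contain colons, such as a ZooKeeper quorum list. Only the first ": " separates the key from the value.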

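Optional settings:

· the amount of memory available to each JobManager (jobmanager.heap.mb),

· the amount of memory available to each TaskManager (taskmanager.heap.mb),

· the number of available CPUs per machine (taskmanager.numberOfTaskSlots),

· the total number of CPUs in the cluster (parallelism.default), and

· the temporary directories (taskmanager.tmp.dirs).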
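Task slots cap how much parallelism the cluster can run. The arithmetic below is a plain-Java sketch, not a Flink API, and SlotMath is a hypothetical name. It assumes the layout used here: one TaskManager on each of bigdata12 and bigdata13, each with taskmanager.numberOfTaskSlots: 2. It also assumes default slot sharing, under which a job needs as many slots as the highest parallelism of any of its operators.

```java
// Back-of-the-envelope slot arithmetic for a standalone cluster (not a Flink API).
class SlotMath {

    // Slots offered by the cluster: TaskManagers x taskmanager.numberOfTaskSlots.
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // With default slot sharing, a job needs as many slots as its highest operator parallelism.
    static boolean fits(int maxOperatorParallelism, int taskManagers, int slotsPerTaskManager) {
        return maxOperatorParallelism <= totalSlots(taskManagers, slotsPerTaskManager);
    }

    public static void main(String[] args) {
        // bigdata12 and bigdata13 each run one TaskManager with 2 slots
        System.out.println(totalSlots(2, 2)); // 4
        System.out.println(fits(4, 2, 2));    // true
        System.out.println(fits(5, 2, 2));    // false
    }
}
```

Under these assumptions, a parallelism.default larger than 4 would leave jobs waiting for slots that never become available.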

4. Copy the installation to every node

[root@bigdata11 module]$ scp -r flink-1.6.1/ itstar@bigdata12:`pwd`

[root@bigdata11 module]$ scp -r flink-1.6.1/ itstar@bigdata13:`pwd`

5. Configure environment variables

Set the Flink environment variables on all nodes:

[root@bigdata11 flink-1.6.1]$ vi /etc/profile

export FLINK_HOME=/opt/module/flink-1.6.1

export PATH=$PATH:$FLINK_HOME/bin

[root@bigdata11 flink-1.6.1]$ source /etc/profile

6. Start Flink

[itstar@bigdata11 flink-1.6.1]$ ./bin/start-cluster.sh

Starting cluster.

Starting standalonesession daemon on host bigdata11.

Starting taskexecutor daemon on host bigdata12.

Starting taskexecutor daemon on host bigdata13.

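Check the processes with jps: StandaloneSessionClusterEntrypoint should be running on bigdata11, and TaskManagerRunner on bigdata12 and bigdata13.

7. Check the Web UI

http://bigdata11:8081

8. Run a test job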

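[itstar@bigdata11 flink-1.6.1]$ bin/flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input /opt/module/datas/word.txt

[itstar@bigdata11 flink-1.6.1]$ bin/flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input hdfs:///LICENSE.txt --output hdfs:///out

The TaskManagers read a local --input path, so the file must exist at that path on bigdata12 and bigdata13. The HDFS variant avoids this requirement.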

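9. Flink HA

First, recall the two deployment modes discussed here: Standalone and YARN Cluster. In Standalone mode, Flink relies on ZooKeeper for JobManager high availability (ZooKeeper has become an essential HA component in most open-source frameworks). With ZooKeeper's help, a standalone Flink cluster keeps several JobManagers alive at the same time. Only one of them is active (the leader), and the others are on standby. When the active JobManager loses its connection (for example, because its machine goes down or the process crashes), ZooKeeper elects a new JobManager from the standbys to take over the cluster.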
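The following toy model makes the election concrete. It follows the usual ZooKeeper leader-election recipe: each contender creates an ephemeral sequential node, and the lowest sequence number leads. When the leader's session is lost, its node disappears and the next contender takes over.

This is an in-memory illustration under that assumption, and ToyLeaderElection is a hypothetical name. Flink itself delegates the election to Apache Curator running against a real ZooKeeper ensemble.

```java
import java.util.TreeMap;

// Toy, in-memory model of ZooKeeper-style leader election (no ZooKeeper involved).
class ToyLeaderElection {

    // sequence number -> JobManager host; TreeMap keeps the lowest sequence first
    private final TreeMap<Integer, String> nodes = new TreeMap<>();
    private int nextSequence = 0;

    // A JobManager joins the election (like creating an ephemeral sequential znode).
    void join(String host) {
        nodes.put(nextSequence++, host);
    }

    // The JobManager's session is lost (crash, network failure): its ephemeral node vanishes.
    void sessionLost(String host) {
        nodes.values().removeIf(h -> h.equals(host));
    }

    // Current leader: the host with the lowest sequence number, or null if none is left.
    String leader() {
        return nodes.isEmpty() ? null : nodes.firstEntry().getValue();
    }

    public static void main(String[] args) {
        ToyLeaderElection election = new ToyLeaderElection();
        election.join("bigdata12");
        election.join("bigdata11");
        System.out.println("leader: " + election.leader()); // leader: bigdata12
        election.sessionLost("bigdata12");                  // kill the active JobManager
        System.out.println("leader: " + election.leader()); // leader: bigdata11
    }
}
```

This mirrors the HA verification step later in this section, where killing the master on bigdata12 hands leadership to the standby on bigdata11.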

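In YARN Cluster mode, Flink relies on YARN itself for JobManager HA, so this is essentially a YARN mechanism. YARN starts both the JobManager and the TaskManagers inside YARN containers. In this setup, the JobManager is really the Flink ApplicationMaster, and its failure recovery is handled by YARN's ResourceManager, just as for the MapReduce ApplicationMaster.

Restarting the ApplicationMaster is YARN's job. Recovering the running jobs after a restart, however, still requires:

· the ZooKeeper high-availability settings in flink-conf.yaml,

· yarn.resourcemanager.am.max-attempts in yarn-site.xml, and

· yarn.application-attempts in flink-conf.yaml.

Because recovery depends on YARN, behavior may differ slightly between YARN versions; we will not go into further detail here.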

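1) Modify the configuration files

Edit flink-conf.yaml. In HA mode the JobManager address is not set here. Instead, the JobManagers are listed in the masters file, and ZooKeeper elects the leader and the standbys.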

#jobmanager.rpc.address: bigdata11

# Specifies the high-availability mode (required)

high-availability: zookeeper

# The ZooKeeper quorum is the replicated group of ZooKeeper servers that provides distributed coordination (required)

high-availability.zookeeper.quorum: bigdata11:2181,bigdata12:2181,bigdata13:2181

# JobManager metadata is persisted in the file system under storageDir; ZooKeeper only stores a pointer to this state (required)

high-availability.storageDir: hdfs:///flink/ha/

# Root ZooKeeper node under which all cluster nodes are placed (recommended)

high-availability.zookeeper.path.root: /flink

# Custom cluster ID (recommended)

high-availability.cluster-id: /flinkCluster

state.backend: filesystem

state.checkpoints.dir: hdfs:///flink/checkpoints

state.savepoints.dir: hdfs:///flink/checkpoints

Edit conf/zoo.cfg

server.1=bigdata11:2888:3888

server.2=bigdata12:2888:3888

server.3=bigdata13:2888:3888
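The three server.N entries form a ZooKeeper ensemble that keeps working only while a strict majority of its servers is up. The plain-Java arithmetic below is a sketch, not a ZooKeeper API, and QuorumMath is a hypothetical name. It computes how many failed servers an ensemble of a given size tolerates.

```java
// Majority arithmetic for a ZooKeeper ensemble of n servers (illustrative sketch).
class QuorumMath {

    // Smallest strict majority of n servers.
    static int majority(int servers) {
        return servers / 2 + 1;
    }

    // How many servers may fail while a majority is still available.
    static int toleratedFailures(int servers) {
        return (servers - 1) / 2;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 5; n++) {
            System.out.println(n + " servers: majority " + majority(n)
                    + ", tolerates " + toleratedFailures(n) + " failure(s)");
        }
    }
}
```

For the three-node ensemble above, the majority is 2, so the ensemble survives the loss of one node. A fourth server would raise the majority to 3 without tolerating any additional failure, which is why odd ensemble sizes are usual.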

Edit conf/masters

bigdata11:8081

bigdata12:8081

Edit conf/slaves

bigdata12

bigdata13

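Sync the conf directory to all nodes.

2) Start the HA cluster

Start the services in this order:

1. Every node of the ZooKeeper ensemble. In a test environment you can also use Flink's bundled start-zookeeper-quorum.sh.

2. HDFS.

3. Flink.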

[itstar@bigdata11 flink-1.6.1]$ bin/start-cluster.sh

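Check the Web UI: one of the masters has been elected leader automatically.

3) Verify HA

Manually kill the master on bigdata12. The standby master on bigdata11 then becomes the leader.

4) Manually adding JobManager/TaskManager instances to the cluster

You can add JobManager and TaskManager instances to a running cluster with the bin/jobmanager.sh and bin/taskmanager.sh scripts.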

Add a JobManager

bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

Add a TaskManager

bin/taskmanager.sh start|start-foreground|stop|stop-all

[itstar@bigdata12 flink-1.6.1]$ jobmanager.sh start bigdata12

The newly added JobManager joins as a standby master.

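3.2 YARN Mode Installation

Download Flink 1.6.1 from the official archive (https://archive.apache.org/dist/flink/flink-1.6.1/).

Upload the package to the node that will host the JobManager (bigdata11).

Extract the package (same as above).

Edit conf/flink-conf.yaml in the installation directory to specify the JobManager (same as above).

Edit conf/slaves in the installation directory to specify the TaskManagers (same as above).

Distribute the configured Flink directory to the other two nodes (same as above).

Make sure the HADOOP_HOME environment variable is set on the VMs.

Start the Hadoop cluster (HDFS and YARN).

On bigdata11, submit a YARN session with the yarn-session.sh script in the bin directory of the installation.

First add the following property to yarn-site.xml, then restart YARN so that it takes effect: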

<property>

<name>yarn.nodemanager.resource.cpu-vcores</name>

<value>5</value>

</property>

/opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 4 -jm 1024 -tm 1024 -nm test -d

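where:

-n (--container): the number of TaskManagers.

-s (--slots): the number of slots per TaskManager. By default, one slot corresponds to one core, and each TaskManager has 1 slot. It is sometimes worth starting a few extra TaskManagers for redundancy.

-jm: JobManager memory (MB).

-tm: memory per TaskManager (MB).

-nm: the YARN application name (shown in the YARN UI).

-d: run detached (in the background).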
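To see what the command above asks YARN for, here is a rough plain-Java estimate. YarnSessionEstimate is a hypothetical helper, not a Flink API, and it rests on two assumptions:

· YARN's rounding of container sizes to its minimum allocation is ignored.

· yarn.containers.vcores is left unset. In that case Flink requests one vcore per slot for each TaskManager container.

```java
// Rough resource estimate for: yarn-session.sh -n 2 -s 4 -jm 1024 -tm 1024
// Assumptions: container-size rounding by YARN is ignored; yarn.containers.vcores
// is unset, so each TaskManager container requests one vcore per slot.
class YarnSessionEstimate {

    final int taskManagers;   // -n
    final int slotsPerTm;     // -s
    final int jobManagerMb;   // -jm
    final int taskManagerMb;  // -tm

    YarnSessionEstimate(int taskManagers, int slotsPerTm, int jobManagerMb, int taskManagerMb) {
        this.taskManagers = taskManagers;
        this.slotsPerTm = slotsPerTm;
        this.jobManagerMb = jobManagerMb;
        this.taskManagerMb = taskManagerMb;
    }

    // Total task slots the session offers to jobs.
    int totalSlots() {
        return taskManagers * slotsPerTm;
    }

    // Memory requested from YARN: one JobManager container plus n TaskManager containers.
    int totalMemoryMb() {
        return jobManagerMb + taskManagers * taskManagerMb;
    }

    // Vcores requested per TaskManager container (one per slot, see assumptions).
    int vcoresPerTaskManager() {
        return slotsPerTm;
    }

    // Can a NodeManager with nodeVcores (yarn.nodemanager.resource.cpu-vcores) host one TaskManager?
    boolean fitsOnNode(int nodeVcores) {
        return vcoresPerTaskManager() <= nodeVcores;
    }

    public static void main(String[] args) {
        YarnSessionEstimate e = new YarnSessionEstimate(2, 4, 1024, 1024);
        System.out.println("slots: " + e.totalSlots());                   // slots: 8
        System.out.println("memory (MB): " + e.totalMemoryMb());          // memory (MB): 3072
        System.out.println("vcores per TM: " + e.vcoresPerTaskManager()); // vcores per TM: 4
        System.out.println("fits on 5 vcores: " + e.fitsOnNode(5));       // fits on 5 vcores: true
    }
}
```

Under these assumptions, each 4-slot TaskManager container needs 4 vcores, which fits within the 5 vcores configured in yarn-site.xml above.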

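After startup, the session you just submitted appears in the YARN web UI.

Check the processes on the node that submitted the session (for example, with jps).

Submit a jar to run on the cluster:

/opt/module/flink-1.6.1/bin/flink run -m yarn-cluster examples/batch/WordCount.jar

Note: -m yarn-cluster starts a separate per-job YARN cluster for this jar. To run the job inside the session started above, omit -m. The client then finds the session through the YARN properties file that yarn-session.sh writes.

After submission, monitor the job in the YARN web UI.

When the job finishes, the word counts are printed to the console.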

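3.3 Flink WordCount

3.3.1 Sending data over a socket

# first start a listener in a separate Xshell session; the job fails if nothing is listening on the port

[root@bigdata13 flink-1.6.1]# nc -l 9999

# then submit the example job; --hostname makes the source connect to bigdata13 whichever TaskManager runs it

[root@bigdata13 flink-1.6.1]# bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname bigdata13 --port 9999

# view the output in the TaskManager's .out log

[root@bigdata13 flink-1.6.1]# vi log/flink-root-taskexecutor-1-bigdata13.out

3.3.2 Running WordCount from Java code

# open port 9999 on bigdata13

nc -l 9999

# run the following code, then type input into the port above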

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

public class WordCount {

public static void main(String[] args) throws Exception {

// port of the socket to read from

int port;

try{

ParameterTool parameterTool = ParameterTool.fromArgs(args);

port = parameterTool.getInt("port");

}catch (Exception e){

System.err.println("No --port argument given, using the default port 9999");

port = 9999;

}

// get the execution environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// connect to the socket on bigdata13 (where nc -l 9999 is running) and read its lines

DataStreamSource<String> text = env.socketTextStream("bigdata13", port, "\n");

// compute the word counts

DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {

public void flatMap(String value, Collector<WordWithCount> out) throws Exception {

String[] splits = value.split("\\s");

for (String word:splits) {

out.collect(new WordWithCount(word,1L));

}

}

}) // flatMap: split each line into <word, 1> records

.keyBy("word") // group records with the same word

.timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 s, sliding every 1 s

.sum("count");

// print the results to the console

windowCount.print()

.setParallelism(1); // use a parallelism of 1

// Note: Flink builds the job lazily, so nothing above runs until execute() is called

env.execute("streaming word count");

}

/**

* Holds a word and the number of times it occurred.

*/

public static class WordWithCount{

public String word;

public long count;

public WordWithCount(){}

public WordWithCount(String word, long count) {

this.word = word;

this.count = count;

}

@Override

public String toString() {

return "WordWithCount{" +

"word='" + word + '\'' +

", count=" + count +

'}';

}

}

}
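The timeWindow(Time.seconds(2), Time.seconds(1)) call above creates overlapping sliding windows. The sketch below re-implements, in simplified form, how a sliding-window assigner maps one record's timestamp to windows. It assumes a window offset of 0 and non-negative millisecond timestamps; in this job the timestamp is processing time. This is an illustration rather than Flink's own class, and SlidingWindowSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sliding-window assignment (offset 0, non-negative timestamps in ms).
class SlidingWindowSketch {

    // Returns the [start, end) windows that contain 'timestamp', latest window first.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size 2 s, slide 1 s, as in timeWindow(Time.seconds(2), Time.seconds(1))
        for (long[] w : assign(2500, 2000, 1000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For a record at 2500 ms, this prints [2000, 4000) and then [1000, 3000). Every record belongs to size / slide = 2 windows, which is why the same word shows up in two consecutive outputs.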

3.3.3 Running WordCount from Scala code

import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.api.windowing.time.Time

object ScalaWordCount {

def main(args: Array[String]): Unit = {

// get the execution environment

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// get input data by connecting to the socket

val text = env.socketTextStream("bigdata13", 9999, '\n')

// parse the data, group it, window it, and aggregate the counts

val windowCounts = text

.flatMap { w => w.split("\\s") }

.map { w => WordWithCount(w, 1) }

.keyBy("word")

.timeWindow(Time.seconds(5), Time.seconds(1))

.sum("count")

// print the results with a single thread, rather than in parallel

windowCounts.print().setParallelism(1)

env.execute("Socket Window WordCount")

}

// Data type for words with count

case class WordWithCount(word: String, count: Long)

}

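Note: use the wildcard import org.apache.flink.streaming.api.scala._ . It brings in the implicit TypeInformation that the Scala API needs. Without it, compilation fails with missing implicit evidence errors.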

3.3.4 Monitoring Wikipedia with Flink

Pom.xml

<properties>

<maven.compiler.source>1.8</maven.compiler.source>

<maven.compiler.target>1.8</maven.compiler.target>

<encoding>UTF-8</encoding>

<scala.version>2.11.12</scala.version>

<scala.binary.version>2.11</scala.binary.version>

<hadoop.version>2.8.4</hadoop.version>

<flink.version>1.6.1</flink.version>

</properties>

<dependencies>

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

<version>${scala.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-table_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-clients_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>${hadoop.version}</version>

</dependency>

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.38</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-wikiedits_2.11</artifactId>

<version>1.6.1</version>

</dependency>

</dependencies>

Code

import org.apache.flink.api.common.functions.FoldFunction;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

public static void main(String[] args) throws Exception {

// create the context in which the streaming program runs

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

// source: where the data comes from

DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

// key the edits by the user who made them

KeyedStream<WikipediaEditEvent, String> keyedEdits = edits

.keyBy(new KeySelector<WikipediaEditEvent, String>() {

@Override

public String getKey(WikipediaEditEvent event) {

return event.getUser();

}

});

// sum the bytes changed per user in each 5-second window

DataStream<Tuple2<String, Long>> result = keyedEdits

.timeWindow(Time.seconds(5))

.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {

@Override

public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {

acc.f0 = event.getUser();

acc.f1 += event.getByteDiff();

return acc;

}

});

result.print();

see.execute();

}

}
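The keyBy(user) + timeWindow(5 s) + fold pipeline above computes, for the edits that fall into one window, the summed byte diff per user. The plain-Java sketch below reproduces that per-window computation without Flink. Edit is a hypothetical stand-in for WikipediaEditEvent, and the user names and values are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Per-window aggregation done by the fold above, in plain Java (no Flink).
class ByteDiffPerUser {

    // Hypothetical stand-in for WikipediaEditEvent (only the fields the fold uses).
    static final class Edit {
        final String user;
        final int byteDiff;

        Edit(String user, int byteDiff) {
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    static Map<String, Long> sumPerUser(List<Edit> editsInWindow) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Edit e : editsInWindow) {
            // same effect as acc.f1 += event.getByteDiff(), each user's accumulator starting at 0L
            totals.merge(e.user, (long) e.byteDiff, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Edit> window = Arrays.asList(
                new Edit("Alice", 120),
                new Edit("Bob", -30),
                new Edit("Alice", 15));
        System.out.println(sumPerUser(window)); // {Alice=135, Bob=-30}
    }
}
```

In the Flink job, the key is the user and each 5-second window emits one (user, total) tuple per key, which matches one map entry here.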

Then run it directly in IntelliJ IDEA. The first results appear after about 20 seconds.

3.3.5 Wiki To Kafka

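Create the Kafka topic

# create the topic wiki-result on bigdata11 (the name used by the code and the consumer below)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-result

Create a submodule in the Flink project with the following pom: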

<parent>

<artifactId>Flink</artifactId>

<groupId>com.itstar</groupId>

<version>1.0-SNAPSHOT</version>

</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>wiki</artifactId>

<dependencies>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_2.11</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-clients_2.11</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-wikiedits_2.11</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.11_2.11</artifactId>

<version>1.6.1</version>

</dependency>

</dependencies>

The code:

package wikiedits;

import org.apache.flink.api.common.functions.FoldFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;

import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

KeyedStream<WikipediaEditEvent, String> keyedEdits = edits

.keyBy(new KeySelector<WikipediaEditEvent, String>() {

@Override

public String getKey(WikipediaEditEvent event) {

return event.getUser();

}

});

DataStream<Tuple2<String, Long>> result = keyedEdits

.timeWindow(Time.seconds(5))

.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {

@Override

public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {

acc.f0 = event.getUser();

acc.f1 += event.getByteDiff();

return acc;

}

});

result.print();

result

.map(new MapFunction<Tuple2<String,Long>, String>() {

@Override

public String map(Tuple2<String, Long> tuple) {

return tuple.toString();

}

})

.addSink(new FlinkKafkaProducer011<>("bigdata11:9092", "wiki-result", new SimpleStringSchema()));

see.execute();

}

}

Tip: make sure these classes are imported:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.api.common.functions.MapFunction;

Start a Kafka console consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result

3.3.6 Flink Source in practice:

Kafka + Flink Stream + MySQL

Create the student table

DROP TABLE IF EXISTS `student`;

CREATE TABLE `student` (

`id` int(11) unsigned NOT NULL AUTO_INCREMENT,

`name` varchar(25) COLLATE utf8_bin DEFAULT NULL,

`password` varchar(25) COLLATE utf8_bin DEFAULT NULL,

`age` int(10) DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Insert the data

INSERT INTO `student` VALUES ('1', 'Andy', '123456', '18'), ('2', 'Bndy', '000000', '17'), ('3', 'Cndy', '012345', '18'), ('4', 'Dndy', '123456', '16');

COMMIT;

Pom

<dependencies>

<!--flink java-->

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>${flink.version}</version>

<!--<scope>provided</scope>-->

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

<!--<scope>provided</scope>-->

</dependency>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

<version>1.7.7</version>

<scope>runtime</scope>

</dependency>

<dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.17</version>

<scope>runtime</scope>

</dependency>

<!--flink kafka connector-->

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<!--alibaba fastjson-->

<dependency>

<groupId>com.alibaba</groupId>

<artifactId>fastjson</artifactId>

<version>1.2.51</version>

</dependency>

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.27</version>

</dependency>

</dependencies>

Student Bean

package FlinkToMySQL;

public class Student {

public int id;

public String name;

public String password;

public int age;

public Student() {

}

public Student(int id, String name, String password, int age) {

this.id = id;

this.name = name;

this.password = password;

this.age = age;

}

@Override

public String toString() {

return "Student{" +

"id=" + id +

", name='" + name + '\'' +

", password='" + password + '\'' +

", age=" + age +

'}';

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

public int getAge() {

return age;

}

public void setAge(int age) {

this.age = age;

}

}

Note: using Lombok here may cause other errors.

SourceFromMySQL

package FlinkToMySQL;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

public class SourceFromMySQL extends RichSourceFunction<Student> {

PreparedStatement ps;

private Connection connection;

/**

* Open the connection in open() so that a connection does not have to be created and released on every invocation.

*

* @param parameters

* @throws Exception

*/

@Override

public void open(Configuration parameters) throws Exception {

connection = getConnection();

String sql = "select * from student;";

ps = this.connection.prepareStatement(sql);

}

/**

* Called when the job finishes: close the connection and release resources.

*

* @throws Exception

*/

@Override

public void close() throws Exception {

if (ps != null) { // release the statement first

ps.close();

}

if (connection != null) { // then close the connection

connection.close();

}

}

/**

* Called once by the Flink runtime to produce the data for the DataStream.

*

* @param ctx

* @throws Exception

*/

@Override

public void run(SourceContext<Student> ctx) throws Exception {

ResultSet resultSet = ps.executeQuery();

while (resultSet.next()) {

Student student = new Student(

resultSet.getInt("id"),

resultSet.getString("name").trim(),

resultSet.getString("password").trim(),

resultSet.getInt("age"));

ctx.collect(student);

}

}

@Override

public void cancel() {

}

private static Connection getConnection() {

Connection con = null;

try {

Class.forName("com.mysql.jdbc.Driver");

con = DriverManager.getConnection("jdbc:mysql://bigdata11:3306/Andy?useUnicode=true&characterEncoding=UTF-8", "root", "000000");

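} catch (Exception e) {

// fail fast instead of returning a null connection, which would cause a NullPointerException in open()

throw new RuntimeException("Failed to connect to MySQL", e);

}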

return con;

}

}
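The cancel() method above is empty. That is harmless here because run() ends after one query. For a source that keeps emitting, a common pattern in Flink source examples is a volatile running flag that run() checks and cancel() clears; the same flag could guard the while (resultSet.next()) loop. The plain-Java sketch below shows that pattern without a Flink dependency. ToySource and its Consumer "context" are hypothetical stand-ins for RichSourceFunction and SourceContext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Cancellation-flag pattern for a source, modeled in plain Java (no Flink).
class ToySource {

    // volatile: cancel() is normally called from a different thread than run()
    private volatile boolean running = true;
    private final Iterator<String> rows;

    ToySource(Iterator<String> rows) {
        this.rows = rows;
    }

    // Emits rows until the input is exhausted or cancel() has been called.
    void run(Consumer<String> ctx) {
        while (running && rows.hasNext()) {
            ctx.accept(rows.next());
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        ToySource source = new ToySource(Arrays.asList("Andy", "Bndy", "Cndy", "Dndy").iterator());
        // for a deterministic demo, cancel from inside the callback after two rows
        source.run(row -> {
            emitted.add(row);
            if (emitted.size() == 2) {
                source.cancel();
            }
        });
        System.out.println(emitted); // [Andy, Bndy]
    }
}
```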

Main method for the custom source

package FlinkToMySQL;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class customSource {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new SourceFromMySQL()).print();

env.execute("Flink add data source");

}

}

Flink Stream + Kafka

Pom

<dependencies>

<!--flink java-->

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-java</artifactId>

<version>${flink.version}</version>

<!--<scope>provided</scope>-->

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

<!--<scope>provided</scope>-->

</dependency>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

<version>1.7.7</version>

<scope>runtime</scope>

</dependency>

<dependency>

<groupId>log4j</groupId>

<artifactId>log4j</artifactId>

<version>1.2.17</version>

<scope>runtime</scope>

</dependency>

<!--flink kafka connector-->

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<!--alibaba fastjson-->

<dependency>

<groupId>com.alibaba</groupId>

<artifactId>fastjson</artifactId>

<version>1.2.51</version>

</dependency>

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.27</version>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>

<version>3.6.0</version>

<configuration>

<source>1.8</source>

<target>1.8</target>

</configuration>

</plugin>

</plugins>

</build>

Bean

package KafkaToFlink;

import java.util.Map;

public class Metric {

private String name;

private long timestamp;

private Map<String, Object> fields;

private Map<String, String> tags;

public Metric() {

}

public Metric(String name, long timestamp, Map<String, Object> fields, Map<String, String> tags) {

this.name = name;

this.timestamp = timestamp;

this.fields = fields;

this.tags = tags;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public long getTimestamp() {

return timestamp;

}

public void setTimestamp(long timestamp) {

this.timestamp = timestamp;

}

public Map<String, Object> getFields() {

return fields;

}

public void setFields(Map<String, Object> fields) {

this.fields = fields;

}

public Map<String, String> getTags() {

return tags;

}

public void setTags(Map<String, String> tags) {

this.tags = tags;

}

}

KafkaUtils

package KafkaToFlink;

import com.alibaba.fastjson.JSON;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;

import java.util.Map;

import java.util.Properties;

public class KafkaUtils {

public static final String broker_list = "bigdata11:9092";

// kafka topic

public static final String topic = "metric";

//key序列化

public static final String KEY = "org.apache.kafka.common.serialization.StringSerializer";

//value序列化

public static final String VALUE = "org.apache.kafka.common.serialization.StringSerializer";

public static void writeToKafka() throws InterruptedException {

Properties props = new Properties();

props.put("bootstrap.servers", broker_list);

props.put("key.serializer", KEY);

props.put("value.serializer", VALUE);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Metric metric = new Metric();

metric.setName("mem");

long timestamp = System.currentTimeMillis();

metric.setTimestamp(timestamp);

Map<String, Object> fields = new HashMap<>();

fields.put("used_percent", 90d);

fields.put("max", 27244873d);

fields.put("used", 17244873d);

fields.put("init", 27244873d);

Map<String, String> tags = new HashMap<>();

tags.put("cluster", "Andy");

tags.put("host_ip", "192.168.1.51");

metric.setFields(fields);

metric.setTags(tags);

ProducerRecord<String, String> record = new ProducerRecord<>(topic, JSON.toJSONString(metric));

producer.send(record);

System.out.println("发送数据: " + JSON.toJSONString(metric));

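producer.flush();

producer.close(); // a new producer is created on every call, so close it to avoid leaking threads and connections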

}

public static void main(String[] args) throws InterruptedException {

while (true) {

Thread.sleep(300);

writeToKafka();

}

}

}

package KafkaToFlink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class Main {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();

props.put("bootstrap.servers", "bigdata11:9092");

props.put("zookeeper.connect", "bigdata11:2181");

props.put("group.id", "metric-group");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer

props.put("auto.offset.reset", "earliest"); // start from the earliest offset if the group has no committed offset

DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(

"metric", // Kafka topic

new SimpleStringSchema(), // deserialize each record as a String

props)).setParallelism(1);

dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台

env.execute("Flink add data source");

}

}

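Note: Kafka creates the metric topic automatically when the producer first writes to it, as long as the broker setting auto.create.topics.enable keeps its default value (true). There is no need to create it manually.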
