Flink offers several deployment options:
Local, Standalone (low resource utilization), Yarn, Mesos, Docker, Kubernetes, and AWS.
Here we focus on deploying a Flink cluster in Standalone mode and in Yarn mode.
3.1 Standalone Mode Installation
1. Software requirements
· Java 1.8.x or higher
· ssh (sshd must be running in order to use the Flink scripts that manage remote components)
Cluster deployment plan
2. Extract the archive
tar -zxvf flink-1.6.1-bin-hadoop28-scala_2.11.tgz -C /opt/module/
3. Edit the configuration files
Edit flink/conf/masters, slaves, and flink-conf.yaml.
[root@bigdata11 conf]$ sudo vi masters
bigdata11:8081
[root@bigdata11 conf]$ sudo vi slaves
bigdata12
bigdata13
[root@bigdata11 conf]$ sudo vi flink-conf.yaml
taskmanager.numberOfTaskSlots: 2    #around line 52 of the default file
jobmanager.rpc.address: bigdata11    #around line 33 of the default file
Optional settings:
· the amount of memory available to each JobManager (jobmanager.heap.mb),
· the amount of memory available to each TaskManager (taskmanager.heap.mb),
· the number of available CPUs per machine (taskmanager.numberOfTaskSlots),
· the total number of CPUs in the cluster (parallelism.default), and
· the temporary directories (taskmanager.tmp.dirs); a sample excerpt follows this list.
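For reference, a minimal flink-conf.yaml excerpt that sets these optional keys could look like the following; the values are illustrative assumptions, not recommendations:
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 2
parallelism.default: 4
taskmanager.tmp.dirs: /tmp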
4. Copy the installation directory to the other nodes
[root@bigdata11 module]$ scp -r flink-1.6.1/ itstar@bigdata12:`pwd`
[root@bigdata11 module]$ scp -r flink-1.6.1/ itstar@bigdata13:`pwd`
5. Configure environment variables
Configure the Flink environment variables on all nodes.
[root@bigdata11 flink-1.6.1]$ vi /etc/profile
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin
[root@bigdata11 flink-1.6.1]$ source /etc/profile
6. Start Flink
[itstar@bigdata11 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bigdata11.
Starting taskexecutor daemon on host bigdata12.
Starting taskexecutor daemon on host bigdata13.
Check the processes with jps; the JobManager should be running on bigdata11 and a TaskManager on each of bigdata12 and bigdata13.
7. Check the Web UI
8. Run a test job
[itstar@bigdata11 flink-1.6.1]$ bin/flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input /opt/module/datas/word.txt
[itstar@bigdata11 flink-1.6.1]$ bin/flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input hdfs:///LICENSE.txt --output hdfs:///out
9. Flink HA
First, note that Flink has two deployment modes: Standalone and Yarn Cluster. In Standalone mode, Flink must rely on ZooKeeper to provide HA for the JobManager (ZooKeeper has become an indispensable building block for HA in most open-source frameworks). With ZooKeeper's help, a Standalone Flink cluster runs several JobManagers at the same time, only one of which is active while the others remain in Standby. When the active JobManager becomes unreachable (for example, the machine goes down or the process crashes), ZooKeeper elects a new JobManager from the standbys to take over the cluster.
In Yarn Cluster mode, Flink relies on Yarn itself to provide HA for the JobManager; this is entirely a Yarn mechanism. In this mode both the JobManager and the TaskManagers are started by Yarn inside Yarn containers, and the JobManager should really be called the Flink Application Master. Its failure recovery therefore depends entirely on Yarn's ResourceManager (just as for a MapReduce AppMaster). Because this is fully delegated to Yarn, different Yarn versions may behave slightly differently; we do not go into further detail here.
1) Edit the configuration files
Edit flink-conf.yaml. In HA mode jobmanager.rpc.address does not need to be set; the JobManagers are listed in the masters file, and ZooKeeper elects the leader and the standbys.
#jobmanager.rpc.address: bigdata11
high-availability: zookeeper
#selects the high-availability mode (required); around line 73 of the default file
high-availability.zookeeper.quorum: bigdata11:2181,bigdata12:2181,bigdata13:2181
#the ZooKeeper quorum is a replicated group of ZooKeeper servers that provides the distributed coordination service (required); around line 88
high-availability.storageDir: hdfs:///flink/ha/
#JobManager metadata is persisted in the file system under storageDir; only a pointer to this state is stored in ZooKeeper (required); around line 82
high-availability.zookeeper.path.root: /flink
#the root ZooKeeper node under which all cluster nodes are placed (recommended); not in the default file
high-availability.cluster-id: /flinkCluster
#a custom cluster id (recommended); not in the default file
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
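These state.* settings only take effect for jobs that actually checkpoint their state. As a minimal, hedged sketch (not part of the original text), a job turns checkpointing on in code like this, and its checkpoints are then written under the state.checkpoints.dir configured above:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every 10 seconds; the interval is an illustrative value
        env.enableCheckpointing(10000);
        // ... define sources, transformations and sinks here, then call env.execute(...)
    }
}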
Edit conf/zoo.cfg
server.1=bigdata11:2888:3888
server.2=bigdata12:2888:3888
server.3=bigdata13:2888:3888
Edit conf/masters
bigdata11:8081
bigdata12:8081
Edit conf/slaves
bigdata12
bigdata13
Sync the conf directory to all nodes.
2) Start the HA cluster
First start ZooKeeper on every node (in a test environment you can also use Flink's bundled start-zookeeper-quorum.sh), then start HDFS, and finally start Flink.
[itstar@bigdata11 flink-1.6.1]$ bin/start-cluster.sh
Check the Web UI; a leading master is elected automatically, as shown below.
3) Verify HA
Manually kill the leading master on bigdata12; the standby master on bigdata11 then takes over as the leading master.
4) Manually add JobManager / TaskManager instances to the cluster
You can use the bin/jobmanager.sh and bin/taskmanager.sh scripts to add JobManager and TaskManager instances to a running cluster.
Add a JobManager
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
Add a TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
[itstar@bigdata12 flink-1.6.1]$ jobmanager.sh start bigdata12
The newly added JobManager joins as a standby master.
3.2 Yarn Mode Installation
Download Flink 1.6.1 from the official archive (https://archive.apache.org/dist/flink/flink-1.6.1/).
Upload the package to the node where the JobManager will be installed (bigdata11).
Extract the package on that Linux node (same as above).
Edit the flink-conf.yaml file in the conf folder of the installation directory to specify the JobManager (same as above).
Edit the slaves file in the conf folder to specify the TaskManagers (same as above).
Distribute the configured Flink directory to the other two nodes (same as above).
Make sure the HADOOP_HOME environment variable is already set on the machines.
Start the Hadoop cluster (HDFS and Yarn).
Submit a Yarn session from bigdata11 using the yarn-session.sh script in the bin directory of the installation:
Add the following property to yarn-site.xml:
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>5</value>
</property>
/opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 4 -jm 1024 -tm 1024 -nm test -d
Where:
-n (--container): the number of TaskManagers.
-s (--slots): the number of slots per TaskManager. By default one slot corresponds to one core and each TaskManager has a single slot; sometimes it is useful to run a few extra TaskManagers for redundancy.
-jm: memory for the JobManager (in MB).
-tm: memory per TaskManager (in MB).
-nm: the Yarn application name (shown in the Yarn UI).
-d: run in detached (background) mode.
After starting, open the Yarn web UI to see the session that was just submitted:
Check the processes on the node where the session was submitted.
Submit a jar to the cluster:
/opt/module/flink-1.6.1/bin/flink run -m yarn-cluster examples/batch/WordCount.jar
After submitting, check the job status on the Yarn web UI.
When the job finishes, the console prints output like the following.
3.3 Flink WordCount
3.3.1 Streaming Data over a Socket
[root@bigdata13 flink-1.6.1]# bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
#open another Xshell client
[root@bigdata13 flink-1.6.1]# nc -l 9999
#check the log output
[root@bigdata13 flink-1.6.1]# vi log/flink-root-taskexecutor-1-bigdata13.out
3.3.2 Running WordCount from Java Code
#open port 9999 on bigdata13
nc -l 9999
#run the code below, then type data into the port opened above
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WordCount {
    public static void main(String[] args) throws Exception {
        // read the socket port from the program arguments
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument specified, using the default value 9000");
            port = 9000;
        }
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // connect to the socket and read the input data
        DataStreamSource<String> text = env.socketTextStream("192.168.1.53", port, "\n");
        // process the data
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })                                               // flatten each line into <word, count> records
        .keyBy("word")                                   // group records that have the same word
        .timeWindow(Time.seconds(2), Time.seconds(1))    // window size and slide interval
        .sum("count");
        // print the result to the console
        windowCount.print()
                .setParallelism(1);                      // use a parallelism of 1
        // note: Flink programs are lazily evaluated, so execute() must be called for the code above to run
        env.execute("streaming word count");
    }
    /**
     * Holds a word together with the number of times it occurred.
     */
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
3.3.3 Running WordCount from Scala Code
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // get input data by connecting to the socket
    val text = env.socketTextStream("bigdata13", 9999, '\n')
    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")
    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)
    env.execute("Socket Window WordCount")
  }
  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
Note: use import org.apache.flink.streaming.api.scala._ for the imports; otherwise you will run into missing-implicits compilation errors.
3.3.4 Monitoring Wikipedia with Flink
Pom.xml
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>2.8.4</hadoop.version>
<flink.version>1.6.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
Code
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class WikipediaAnalysis {
    public static void main(String[] args) throws Exception {
        // create the streaming execution environment
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // source: read the stream of Wikipedia edit events
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
        // key the stream by the user who made the edit
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return event.getUser();
                    }
                });
        // aggregate the byte diff per user over 5-second windows
        DataStream<Tuple2<String, Long>> result = keyedEdits
                .timeWindow(Time.seconds(5))
                .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                        acc.f0 = event.getUser();
                        acc.f1 += event.getByteDiff();
                        return acc;
                    }
                });
        result.print();
        see.execute();
    }
}
Then run it directly from IDEA and wait about 20 seconds for output to appear.
3.3.5 Wiki To Kafka
Create the Kafka topic
#create the topic wiki-result on bigdata11 (the producer and consumer below both use the name wiki-result)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wiki-result
Create a child module in the Flink project with the following Pom:
<parent>
<artifactId>Flink</artifactId>
<groupId>com.itstar</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>wiki</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
The code is as follows:
package wikiedits;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
public class WikipediaAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return event.getUser();
                    }
                });
        DataStream<Tuple2<String, Long>> result = keyedEdits
                .timeWindow(Time.seconds(5))
                .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
                        acc.f0 = event.getUser();
                        acc.f1 += event.getByteDiff();
                        return acc;
                    }
                });
        result.print();
        result
                .map(new MapFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public String map(Tuple2<String, Long> tuple) {
                        return tuple.toString();
                    }
                })
                .addSink(new FlinkKafkaProducer011<>("bigdata11:9092", "wiki-result", new SimpleStringSchema()));
        see.execute();
    }
}
Tip: make sure the imports include the following:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;
Start a Kafka consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
3.3.6 Flink Source in Practice: Kafka + Flink Stream + MySQL
Create the student table
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
`password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
`age` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Insert data
INSERT INTO `student` VALUES ('1', 'Andy', '123456', '18'), ('2', 'Bndy', '000000', '17'), ('3', 'Cndy', '012345', '18'), ('4', 'Dndy', '123456', '16');
COMMIT;
Pom
<dependencies>
<!--flink java-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--flink kafka connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--alibaba fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
Student Bean
package FlinkToMySQL;
public class Student {
public int id;
public String name;
public String password;
public int age;
public Student() {
}
public Student(int id, String name, String password, int age) {
this.id = id;
this.name = name;
this.password = password;
this.age = age;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", password='" + password + '\'' +
", age=" + age +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
Note: using lombok here may lead to other errors.
SourceFromMySQL
package FlinkToMySQL;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class SourceFromMySQL extends RichSourceFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * The connection is established in open(), so it does not have to be
     * created and released on every invoke.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        connection = getConnection();
        String sql = "select * from student;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * When the program finishes, the connection and other resources are released here.
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if (connection != null) { // close the connection and release resources
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * The DataStream calls run() once to fetch the data.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Student student = new Student(
                    resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age"));
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://bigdata11:3306/Andy?useUnicode=true&characterEncoding=UTF-8", "root", "000000");
        } catch (Exception e) {
            // exceptions are swallowed here; a null connection is returned if the driver or URL is wrong
        }
        return con;
    }
}
The main method for the custom Source
package FlinkToMySQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class customSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new SourceFromMySQL()).print();
        env.execute("Flink add data source");
    }
}
Flink Stream + Kafka
Pom
<dependencies>
<!--flink java-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--flink kafka connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--alibaba fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
Bean
package KafkaToFlink;
import java.util.Map;
public class Metric {
private String name;
private long timestamp;
private Map<String, Object> fields;
private Map<String, String> tags;
public Metric() {
}
public Metric(String name, long timestamp, Map<String, Object> fields, Map<String, String> tags) {
this.name = name;
this.timestamp = timestamp;
this.fields = fields;
this.tags = tags;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public Map<String, Object> getFields() {
return fields;
}
public void setFields(Map<String, Object> fields) {
this.fields = fields;
}
public Map<String, String> getTags() {
return tags;
}
public void setTags(Map<String, String> tags) {
this.tags = tags;
}
}
KafkaUtils
package KafkaToFlink;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaUtils {
    public static final String broker_list = "bigdata11:9092";
    // kafka topic
    public static final String topic = "metric";
    // key serializer
    public static final String KEY = "org.apache.kafka.common.serialization.StringSerializer";
    // value serializer
    public static final String VALUE = "org.apache.kafka.common.serialization.StringSerializer";

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", KEY);
        props.put("value.serializer", VALUE);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // build a sample Metric record
        Metric metric = new Metric();
        metric.setName("mem");
        long timestamp = System.currentTimeMillis();
        metric.setTimestamp(timestamp);
        Map<String, Object> fields = new HashMap<>();
        fields.put("used_percent", 90d);
        fields.put("max", 27244873d);
        fields.put("used", 17244873d);
        fields.put("init", 27244873d);
        Map<String, String> tags = new HashMap<>();
        tags.put("cluster", "Andy");
        tags.put("host_ip", "192.168.1.51");
        metric.setFields(fields);
        metric.setTags(tags);

        // serialize the Metric to JSON and send it to Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, JSON.toJSONString(metric));
        producer.send(record);
        System.out.println("sent record: " + JSON.toJSONString(metric));
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(300);
            writeToKafka();
        }
    }
}
Main
package KafkaToFlink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata11:9092");
        props.put("zookeeper.connect", "bigdata11:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");   // key deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
        props.put("auto.offset.reset", "earliest");
        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "metric",                  // kafka topic
                new SimpleStringSchema(),  // deserialize the records as plain strings
                props)).setParallelism(1);
        dataStreamSource.print(); // print the data read from kafka to the console
        env.execute("Flink add data source");
    }
}
Note: the Kafka topic will be created automatically; there is no need to create it manually.
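The consumer above only prints the raw JSON strings that KafkaUtils produced. As a minimal, hedged sketch (this step is not part of the original example; it assumes the fastjson dependency from the Pom and the Metric bean above), the strings could be mapped back into Metric objects before printing, for instance by inserting the following in Main before env.execute():
// hypothetical follow-up step, not part of the original example
dataStreamSource
        .map(new org.apache.flink.api.common.functions.MapFunction<String, Metric>() {
            @Override
            public Metric map(String value) throws Exception {
                // parse the JSON string written by KafkaUtils back into a Metric bean
                return com.alibaba.fastjson.JSON.parseObject(value, Metric.class);
            }
        })
        .print();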