博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink读取kafka数据并写入HDFS
阅读量:5747 次
发布时间:2019-06-18

本文共 2365 字,大约阅读时间需要 7 分钟。

hot3.png

### 本地代码flink streaming读取远程环境的kafka的数据,写入远程环境的HDFS中;public static void main(String[] args) throws Exception {         // set up the streaming execution environment        final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();           env.enableCheckpointing(5000);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        Properties properties = new Properties();//目标环境的IP地址和端口号        properties.setProperty("bootstrap.servers", "192.168.0.1:9092");//kafka//kafka版本0.8需要;//        properties.setProperty("zookeeper.connect", "192.168.0.1:2181");//zookeepe        properties.setProperty("group.id", "test-consumer-group"); //group.id//第一种方式://这里很重要,填写hdfs-site.xml和core-site.xml的路径,可以把目标环境上的hadoop的这两个配置拉到本地来,这个是我放在了项目的resources目录下。 //       properties.setProperty("fs.hdfs.hadoopconf", "E:\\Ali-Code\\cn-smart\\cn-components\\cn-flink\\src\\main\\resources");//第二种方式:  properties.setProperty("fs.default-scheme","hdfs://ip:8020"); //根据不同的版本new不同的消费对象;//        FlinkKafkaConsumer09
flinkKafkaConsumer09 = new FlinkKafkaConsumer09
("test0", new SimpleStringSchema(),properties); FlinkKafkaConsumer010
flinkKafkaConsumer010 = new FlinkKafkaConsumer010
("test1", new SimpleStringSchema(), properties);// flinkKafkaConsumer010.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); DataStream
keyedStream = env.addSource(flinkKafkaConsumer010); keyedStream.print(); // execute program System.out.println("*********** hdfs ***********************"); BucketingSink
bucketingSink = new BucketingSink<>("/var"); //hdfs上的路径 BucketingSink
bucketingSink1 = bucketingSink.setBucketer((Bucketer
) (clock, basePath, value) -> { return basePath; }); bucketingSink.setWriter(new StringWriter<>()) .setBatchSize(1024 * 1024 ) .setBatchRolloverInterval(2000); keyedStream.addSink(bucketingSink); env.execute("test"); }在远程目标环境上hdfs的/var下面生成很多小目录,这些小目录是kafka中的数据;问题:1. 这种方式生成的hdfs文件不能够被spark sql去读取;解决: 将数据写成parquet格式到hdfs上可解决这个问题;见另一篇博客https://blog.csdn.net/u012798083/article/details/858528302. 如果出现大量inprocess的文件,怎么办?解决: 将数据量加大一点;3. 如何增加窗口处理?解决:见另一篇博客:https://blog.csdn.net/u012798083/article/details/85852830

转载于:https://my.oschina.net/u/3005325/blog/2999420

你可能感兴趣的文章
iOS-Block全解
查看>>
新篇章 每周分享第22期 (20180910~20180917)
查看>>
数值计算 插值与拟合
查看>>
Java学习必备书籍推荐终极版!
查看>>
thinkphp3.2.3源码学习(3)
查看>>
深入理解Java虚拟机之性能监控与故障处理工具
查看>>
Vue学习之路1-集成环境安装
查看>>
Chatopera企业聊天机器人解决方案
查看>>
[阿里云Java Web环境搭建]二、Ubuntu安装JDK
查看>>
大学生毕业后想成为产品经理?那你得先从以下几个方面入手!
查看>>
商品定时器
查看>>
记一次微信小程序动画实现
查看>>
Spring注解基础笔记
查看>>
HBase在移动广告监测产品中的应用
查看>>
Centos7下安装FastDFS和nginx的详细步骤
查看>>
spring boot websocket广播式
查看>>
设计模式之单例模式
查看>>
技术分享 | 基于 Tron 的 Dapp 开发实战分享
查看>>
CITA 是如何达到 15000 TPS 的?
查看>>
老司机 iOS 周报 #64 | 2019-04-22
查看>>