zhao_rock (Dalian)

Ingesting MySQL data into Kafka with flume-ng


Flume is a log collection system; the official site has a full introduction: http://flume.apache.org/

The Apache Flume distribution itself does not ship a source that can pull data out of a SQL database. A third-party plugin on GitHub fills that gap: https://github.com/keedio/flume-ng-sql-source. Version 1.4.3 is built on Hibernate, so it can work with any relational database Hibernate supports.

My test environment is Windows, so for setting up Kafka on Windows I followed http://blog.csdn.net/linsongbin1/article/details/48022941, a well-written article that I recommend.

The rest of this post covers how to start flume-ng on Windows and the configuration in detail.

Startup command:

"F:\Java\jdk1.8.0_101\bin\java.exe" -Xmx512m -Dlog4j.configuration=file:///E:\apache-flume-1.6.0-bin\conf\log4j.properties -cp "E:\apache-flume-1.6.0-bin\lib\*;E:\apache-flume-1.6.0-bin\plugins.d\sql-source\lib\flume-ng-sql-source-1.4.3-SNAPSHOT.jar;E:\apache-flume-1.6.0-bin\plugins.d\sql-source\libext\mysql-connector-java-5.1.35-bin.jar" org.apache.flume.node.Application -f E:\apache-flume-1.6.0-bin\conf\sql-kafka-conf.properties -n a1

The main configuration file, sql-kafka-conf.properties:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1

###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource

a1.sources.src-1.hibernate.connection.url = jdbc:mysql://127.0.0.1/test

# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = password
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = E://apache-flume-1.6.0-bin
a1.sources.src-1.status.file.name = sqlSource.status

# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id,name from test_user where id > $@$ order by id asc

a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000

a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

##############################

a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000


a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hellotest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20


a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1
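It is worth spelling out how the $@$ placeholder in custom.query drives incremental reads: the source keeps a high-water-mark index (seeded by start.from and persisted in the status file named by status.file.name) and substitutes it for $@$ on every poll, so only rows with a larger id are fetched. Below is a minimal Python sketch of that mechanism, with an in-memory list standing in for the MySQL table; the table contents and function names are illustrative, not taken from the plugin itself.

```python
# Illustrative sketch of the incremental-query mechanism behind custom.query.
# The real plugin runs the query over JDBC/Hibernate and persists last_index
# in the status file; here an in-memory list of (id, name) rows stands in.

def build_query(template, last_index):
    # The source substitutes the persisted index for the $@$ placeholder.
    return template.replace("$@$", str(last_index))

def poll(table, template, last_index):
    query = build_query(template, last_index)  # the real source would execute this over JDBC
    # Simulate "select id,name from test_user where id > $@$ order by id asc"
    rows = sorted(r for r in table if r[0] > last_index)
    if rows:
        last_index = rows[-1][0]  # new high-water mark, saved to the status file
    return rows, last_index

template = "select id,name from test_user where id > $@$ order by id asc"
table = [(1, "a"), (2, "b"), (3, "c")]

rows, idx = poll(table, template, 0)   # first poll: everything after start.from = 0
print(rows, idx)                       # [(1, 'a'), (2, 'b'), (3, 'c')] 3

table.append((4, "d"))
rows, idx = poll(table, template, idx) # next poll: only the new row
print(rows, idx)                       # [(4, 'd')] 4
```

Because the high-water mark survives restarts via the status file, the agent does not re-send rows it has already delivered.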

 

Pay close attention to the channel bindings at the end: the source property is channels (plural, with an s), while the sink property is channel (singular), and the sink must name a channel that is actually declared. Mixing these up is a common cause of configuration errors.
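In Flume's properties format a source binds with the plural channels key, a sink binds with the singular channel key, and the sink must point at a declared channel. Here is a small, hypothetical sanity check (plain Python, not part of Flume) for that wiring:

```python
# Hypothetical sanity check for a Flume agent's channel wiring.
# Not part of Flume; it encodes only the rules discussed above:
#   sources bind via "<agent>.sources.<src>.channels" (plural),
#   sinks bind via "<agent>.sinks.<sink>.channel" (singular),
#   and every bound channel must be declared in "<agent>.channels".

def check_wiring(props, agent="a1"):
    declared = set(props.get(f"{agent}.channels", "").split())
    errors = []
    for src in props.get(f"{agent}.sources", "").split():
        bound = props.get(f"{agent}.sources.{src}.channels")
        if bound is None:
            errors.append(f"source {src}: missing 'channels' (plural)")
        else:
            for ch in bound.split():
                if ch not in declared:
                    errors.append(f"source {src}: channel '{ch}' is not declared")
    for sink in props.get(f"{agent}.sinks", "").split():
        bound = props.get(f"{agent}.sinks.{sink}.channel")
        if bound is None:
            errors.append(f"sink {sink}: missing 'channel' (singular)")
        elif bound not in declared:
            errors.append(f"sink {sink}: channel '{bound}' is not declared")
    return errors

good = {
    "a1.channels": "ch-1",
    "a1.sources": "src-1",
    "a1.sinks": "k1",
    "a1.sources.src-1.channels": "ch-1",
    "a1.sinks.k1.channel": "ch-1",
}
bad = dict(good, **{"a1.sinks.k1.channel": "c1"})  # undeclared channel name

print(check_wiring(good))  # []
print(check_wiring(bad))   # ["sink k1: channel 'c1' is not declared"]
```

A check like this catches the undeclared-channel mistake before the agent starts, rather than at runtime.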


That completes the pipeline: Flume now streams MySQL rows into Kafka in near real time.

Comments
#1  kingding  2017-03-10
After compiling, I copied the jar into lib and adjusted everything to match your configuration, but running it fails with:
Could not configure sink k1 due to: Component has no type. Cannot configure. k1
org.apache.flume.conf.ConfigurationException: Component has no type. Cannot configure. k1

Any idea what is going wrong?

