Flume到MongoDB的日志行为收集

需求概述

某公司需要对玩家行为日志进行统一收集,收集后全部存入MongoDB,同时部分写入Kafka,对接SparkSteaming 做实时计算处理。他们使用的日志收集框架完全自己手动管理 效率低 需要一款可以实现高可用 分布式横向扩展的日志收集框架这时候Flume来了~

概述

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

Flume主要由3个重要的组件购成:

  • Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。
  • Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
  • Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。

我们实现Java写日志到Flume,Flume最终把日志写到MongoDB。目前官方Sink并没有提供,需要自己手动实现Sink

http://flume.apache.org/FlumeUserGuide.html#custom-sink

环境

  • Flume-1.8
  • MongoDB-4.0.3
  • JDK1.8

线上使用需在flume/lib下添加一下几个jar包

  • mongodb-driver-3.8.2.jar
  • mongodb-driver-core-3.8.2.jar
  • bson-3.8.2.jar

配置说明

Property Name Default Description
hostNames host1:port1,host2,port2,…the mongodb host and port
database the mongodb database
collection the collection of database
user the username of databse
password the password of database
batchSize 100 the batchSize of sources
authentication_enabled False Whether u need a password and a user

如果没有密码和用户名 就不需要user password authentication_enabled T

如果有密码,设置authentication_enabled =True

代码实现

项目开源地址(点击原文查看地址)

https://gitee.com/czy006/MongoDBSink

配置文件

MongoDBFlume.conf

#定义组件名称

a1.sources = r

a1.sinks = s

a1.channels = c

#定义数据入口

a1.sources.r.channels = c

a1.sources.r.type = netcat

a1.sources.r.bind = localhost

a1.sources.r.port = 44444

##定义拦截器

#a1.sources.r.interceptors=i1

#a1.sources.r.interceptors.i1.type=regex_filter

#a1.sources.r.interceptors.i1.regex= ERROR

# 定义数据出口

a1.sinks.s.type = com.gzczy.MongodbSink.MongoSinkSelf

a1.sinks.s.hostNames=192.168.2.99:20000

a1.sinks.s.authentication_enabled=False

a1.sinks.s.database = gslog_test

#a1.sinks.s.password =

#a1.sinks.s.user =

a1.sinks.s.collection = logsearch_info

a1.sinks.s.batchSize = 3

a1.sinks.s.channel = c

# 使用内存管道

a1.channels.c.type = memory

a1.channels.c.capacity = 10000

a1.channels.c.transactionCapacity = 100

启动Flume:

[root@master apache-flume-1.8.0-bin]# bin/flume-ng agent –conf conf –conf-file conf/MongoDBFlume.conf –name a1 -Dflume.root.logger=info,console

启动telnet:

telnet localhost 44444

在telent模拟一条数据:

{“_sys”:”gslog”,”loginId”:101725,”_topic”:”loginIn”,”date”:”2018-11-06″,”rmb”:9999,”_lv”:1,”lv”:1,”uuid”:2569021610400000017,”_server”:”999″,”_job”:4,”vip”:999,”money”:999999,”_tag”:””,”exp”:0,”time”:”2018-11-06 16:11:392″}

检查数据库是否插入成功

后续还需要2个功能

  • 插入指定的集合,无需配置 根据传入的数据(JSON)进行动态插入指定集合(collection)
  • 实现mongodb连接池