热消息:基于MongoDB的实时数仓实现
一、概述
目前公司离线数仓现状,数仓部门每日凌晨后处理昨天的线上业务数据,因此第二天业务人员才看到的报表,数据是T-1的,因此数据是具有滞后性,尤其在互联网金融公司,有业务人员需要做信贷的风险管控,及时的调整一些风控规则和策略,但是不能立刻看到效果,而是需要等到第二天才可以看到调整的效果,因此才有了实时数仓的需求。线上业务数据基本存储在Mysql和MongoDB数据库中,因此实时数仓会基于这两个工作流实现,本文重点讲述基于MongoDB实现实时数仓的架构。
由于线上MongoDB是Sharding模式,规模中等,但由于数据量比较大,因此集群的IO一直存储高负荷状态,无法开放查询功能给业务人员进行实时查询。期间由于一个业务部分查询条件Key值有误造成全库扫描(COLLSCAN),造成在业务出现很多Slow-Query,因此线上集群不再提供个人查询需求,基于目前现状,有我们基础架构部调研并基于MongoDB实现的实时数仓的技术方案。
二、实现的具体步骤
2.1 架构图
a) 架构图中"绿色"线条是提供风控业务人员实时查询策略效果的流程图,由于服务器资源有限,因此从上线MongoDB-Sharding实时同步到线下MongoDB—RS(副本),因此不可能保存全部数据,而且对保存数据的有效期也有限制,在实现前期规划中实时数据默认保留14天(在线下mongodb库中对数据表需要增加过期索引) b) 架构图中"蓝色"线条是提供给实时数仓,并且保留历史数据。2.2 Debezium CDC实现过程
mongodb同步工具:mongo-kafka 官方提供的jar包,具备Source、Sink功能,但是不支持CDC。无法从上线MongoDB库同步到线下MongoDB库,最初选择Confluent工具是由于它集成了多个同步组件,是目前比较流行的同步工具,同时是一个可靠的,高性能的流处理平台。但是由于MongoDB同步需求的改变,需要选择一种支持CDC的同步工具-Debezium。
【资料图】
Debezium-MongoDB连接器可以监视MongoDB副本集或MongoDB分片群集中数据库和集合中的文档更改,并将这些更改记录为Kafka主题中的事件。连接器自动处理分片群集中分片的添加或删除,每个副本集的成员资格更改,每个副本集内的选举以及等待通信问题的解决。
目前选择方案: 使用Debezium Souce 同步mongo数据进入Kafka, 然后使用Mongo-Kafka Sink功能同步Kafka 数据到线下MongoDB库。这样既可以解决数仓实时读取Kafka,又能解决政审部门查询线下MongoDB库的问题。
2.2.1 工具集成
1) 下载源码 地址:https://github.com/debezium/debezium/archive/v0.10.0.Final.tar.gz2) 业务需求 在每条update/delete数据记录中增加oid标识,以提供数仓溯源使用。3) 实现方法 打开debezium/RecordMakers.java::createRecords() 中增加value.put("objectid", objId);4) 编译 命令:mvn install -pl debezium-connector-mongodb -Ddocker.skip.build=true -Ddocker.skip.run=true -DskipITs=true5) 构建新docker镜像 将编译后的包:debezium-connector-mongodb/target/debezium-connector-mongodb-0.10.0.Final.jar 拷贝到debezium/connect:0.10 Docker容器内。重新commit、push到测试环境。6) 打包Sink功能 将Mongo-Kafka 编译后的jar包(mongo-kafka-0.3-SNAPSHOT-all.jar) 拷贝到debezium/connect:0.10 Docker容器内/kafka/connect/mongodb-kafka-connect目录下。需要提前创建mongodb-kafka-connect目录。 重新commit、push image到测试环境。7) 容器内目录结构[kafka@deb-connect ~]$ ls -l connect/total 8drwxr-xr-x 1 kafka kafka 52 Dec 1 16:18 debezium-connector-mongodbdrwxr-xr-x 1 kafka kafka 4096 Oct 2 00:52 debezium-connector-mysqldrwxr-xr-x 1 kafka kafka 204 Oct 2 00:52 debezium-connector-oracledrwxr-xr-x 1 kafka kafka 285 Oct 2 00:52 debezium-connector-postgresdrwxr-xr-x 1 kafka kafka 259 Oct 2 00:52 debezium-connector-sqlserverdrwxrwxr-x 1 kafka kafka 46 Nov 28 08:27 mongodb-kafka-connect复制代码
2.2.2 Debezium上线部署
# 由于需要提供Source和Sink功能,根据同步库的数量,适当的增加Docker数量,这样可以确保任务的正常高效执行。根据相同的GROUP_ID为一个集群,支持负载均衡。默认数据格式为:Avro。# 依赖的环境变量如下: GROUP_ID: "DW-MongoToKafka" KAFKA_HEAP_OPTS: "-Xms2G -Xmx8G" SERVICE_28083_NAME: "dw-mongo-connect" SANITIZE_FIELD_NAMES: "true" CONNECT_PRODUCER_MAX_REQUEST_SIZE: 16777216 KAFKA_PRODUCER_MAX_REQUEST_SIZE: 16777216 STATUS_STORAGE_TOPIC: "debezium_connect_status" CONFIG_STORAGE_TOPIC: "debezium_connect_configs" OFFSET_STORAGE_TOPIC: "debezium_connect_offsets" KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter" VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter" INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://dw-schema-registry.com" CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://dw-schema-registry.com" BOOTSTRAP_SERVERS: "dn5.infra.app:9092, dn6.infra.app:9092, dn7.infra.app:9092"复制代码
2.2.3 创建Source connector
# 使用API方式创建source connector,开启实时同步MongoDB-Sharding数据到Kafka Topiccurl -X POST -H "Content-Type: application/json" --data"{ "name": "debezium-source-表名", "config": { "connector.class":"io.debezium.connector.mongodb.MongoDbConnector", "sanitize.field.names":"true", "tasks.max":"1", "mongodb.hosts":"mongos地址:端口", "mongodb.user":"用户名", "mongodb.password":"密码", "mongodb.name":"datawarehouse.mongo.debezium", "database.whitelist":"库名", "collection.whitelist":"库名.表名", "max.request.size":"16777216", "database.history.kafka.bootstrap.servers":"dn5.infra.app:9092" }}" http://dw-mongo-connect.com/connectors/复制代码
2.2.4 创建Sink Connector
# 使用API方式创建sink connector,开启实时增量同步Kafka数据到线下MongoDB-RS库。curl -X POST -H "Content-Type: application/json" --data"{ "name": "debezium-sink-表名", "config": { "tasks.max":"1", "database":"目标库", "topics":"填写source connector同步的topic", "connection.uri":"mongodb://用户名:密码@IP:PORT/库名", "collection":"表名", "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "change.data.capture.handler":"com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler" }}" http://dw-mongo-connect.com/connectors/复制代码
2.2.5 Topic 数据保留时效
# 由于kafka服务器存储受限,根据业务数据需求修改topic 保留失效为3天kafka-topics --zookeeper zk地址:2181 --alter --topic TopicName --config retention.ms=259200000复制代码
2.2.6 检查Debezium同步数据效果
A) 查看Prometheus kafka 监控的Dashboard
B) 查看线下MongoDB-RS库下的数据
2.2.7 问题&记录
# 由于线上Mongo-Sharding集群对DataBase都有严格的权限管理,因此在创建connector后,一般会出现权限拒绝问题。错误信息如下【2019-11-30 16:49:52,955 ERROR MongoDB|datawarehouse.mongo.debezium|confrs Error while attempting to get oplog position: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName="同步用户", source="admin", password=, mechanismProperties={}} [io.debezium.connector.mongodb.Replicator]com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName="同步用户", source="admin", password=, mechanismProperties={}}】使用Debezium Source connector 同步Mongo-sharding数据时,需要开启的权限为: mongos进入后admin库的read权限mongos> show users;{"_id" : "admin.同步用户","userId" : UUID("fb982511-c779-41b8-8a9f-9ba492c30c28"),"user" : "同步用户","db" : "admin","roles" : [{"role" : "read","db" : "risk"},{"role" : "read","db" : "admin"},{"role" : "read","db" : "config"}],"mechanisms" : ["SCRAM-SHA-1","SCRAM-SHA-256"]}进入每个Replica下,创建 admin和local库的 read权限。s5rs:PRIMARY> show users;{"_id" : "admin.同步用户","userId" : UUID("b99bd150-dc9c-4f67-8177-2580b78d63c1"),"user" : "同步用户","db" : "admin","roles" : [{"role" : "read","db" : "local"},{"role" : "read","db" : "admin"}],"mechanisms" : ["SCRAM-SHA-1","SCRAM-SHA-256"]}使用Mongo-Kakfa Sink connector操作线下Mongodb时,需要开启权限:riskPoolRs:PRIMARY> show users;{"_id" : "risk.同步用户","userId" : UUID("9f5e079f-a665-4664-830f-8b54f9848ea2"),"user" : "同步用户","db" : "库名","roles" : [{"role" : "readWrite","db" : "risk"},{"role" : "read","db" : "admin"},{"role" : "clusterAdmin","db" : "admin"}],"mechanisms" : ["SCRAM-SHA-1","SCRAM-SHA-256"]}复制代码
默认情况下debezium source connector 同步数据大小限制1M以内。 同步mongo大数据时需要修改此参数。"max.request.size":"16777216" 修改为16M
2.3 对接Presto
这个步骤比较简单,根据presto官方提供的配置说明
2.3.1 增加配置文件
# 在etc/catalog下创建mongodb.propertiesconnector.name=mongodbmongodb.seeds=IP:27017mongodb.credentials=用户名:密码@库名mongodb.schema-collection=presto_mongomongodb.socket-keep-alive=true复制代码
2.3.2 重启presto
bin/launcher stopbin/launcher start复制代码
2.3.3 问题&记录
问题:presto 连接mongo读取数据时,发现没有显示所有的字段?? 解决:在mongo库中查询schema数据,发现缺少某些字段值,登陆mongo手动更新schema数据,增加指定域值的显示,定义为varchar类型。 修改之前
修改之后
2.4 对接SuperSet
打开superset界面,选择添加数据源
打开SQL编辑器,即可进行实时查询mongo数据
三、准实时报表
结构图的"蓝色"线条 实现过程比较简单基于Flume对接Kafka写入Hive这个是数仓平台上的一个定时任务,实现比较简单,数据是实时同步的, 但是基于数仓的特性,不能做到分钟级别的报表,但是可以做到小时级别的。如果需要准实时报表,则需要基于Druid或Kylin等分析引擎处理数据,这个方案会在后面博文中介绍。
四、总结
在mongodb实时数仓架构实现过程中,由于环境不同,在部署过程中会遇到不少问题, 但是不要怕,正是因为这些问题才让你更深入的了解各个模块内部实现原理和机制,耐心一点,总会解决的。 另外,上述的基于MongoDB实现的实时数仓架构并不是最优的,主要是结合公司目前业务架构以及各个系统、网络等环境的限制,调研的实时方案。
标签:
相关文章
热消息:基于MongoDB的实时数仓实现
目前公司离线数仓现状,数仓部门每日凌晨后处理昨天的线上业务数据,因此第二天业务人员才看到的报表,数据是T-1的,因此数据是具有滞后性,尤
微头条丨Economic growth: Quantity vs quality
RobotsworkatasemiconductorcompanyinGaoyou,Jiangsuprovince,onFeb 24
环球今日讯!农业农村部:3月14日“农产品批发价格200指数”为131.71 比昨天上升0.03个点
上证报中国证券网讯据农业农村部监测,3月14日“农产品批发价格200指数”为131 71,比昨天上升0 03个点,“菜篮子”产品批发价格指数为13...
环球要闻:脚底板热是好事还是坏事_脚底板热怎么回事
1、病情分析:根据临床经验和发病率,一般认为是肾阴虚所致。2、阴虚则热,阴虚则阳亢,所以会出现脚热,通常会伴有一些阴虚的
今日快讯:保时捷IPO后首份财报出炉:2022年实现营收376亿欧元 营业利润68亿欧元同增27.4%
3月13日,保时捷发布了IPO之后的首份财报。数据显示,保时捷在2022财年营业收入约为376亿欧元,同比增长13 6%。报告期内,保时捷全球新车交付
世界看热讯:海口龙华区永东村开展道德评议会育和谐乡风
杂草丛生的荒地变增收地,农户家门前的庭院干净整洁……3月13日,走进龙华区龙桥镇永东村,文明新风扑面而来。据了解,永东村近年来通过...
全球百事通!无水奶油是人造奶油吗_人造奶油是什么做的
1、人造奶油一般用的是植物奶油提炼。2、植物油在一定条件下加氢一般是用大豆为原料加工而成的。3、人造奶油是用植物油进行氢
天天观点:2023北京考研复试调剂系统官网入口+开通时间
2023年北京考研复试调剂服务系统入口:点击进入调剂系统开通时间:将于2023年4月6日开通“网上调剂意向采集系统”开通时间:2023年3月31日调剂
环球速递!总胆固醇标准值是多少_总胆固醇高是什么意思
1、总胆固醇高一般是指高脂血症的问题,是指人的脂质代谢功能下降。总胆固醇是指血液中所有脂蛋白所含胆固醇的总和,包括甘油三
要闻:NBA最新排名!雄鹿掘金领跑,湖人跌出前十,开拓者奇才掉队
NBA最新排名!雄鹿掘金领跑,湖人跌出前十,开拓者奇才掉队,湖人,奇才,雄鹿队,76人队,丹佛掘金队,波特兰开拓者,纽约尼克斯队,波士顿凯尔特人
全球最新:盐津铺子(002847)3月10日主力资金净买入287.33万元
截至2023年3月10日收盘,盐津铺子(002847)报收于130 3元,上涨0 62%,换手率0 48%,成交量5458 0手,成交额7088 56万元。
天天通讯!依托草场、水库等资源 福清南岭发力乡村旅游
近日,福清市南岭镇与福清文投集团联合成立福建云上南岭文旅有限公司,助力南岭镇打造福州都市圈乡村旅游目的地。
环球通讯!七开头的四字成语_八开头的四字成语
1、八仙过海、八面玲珑、八拜之交、八府巡按、八九不离十、八面威风、八字没一撇、八斗之才、八珍玉食、八面见光、八街九陌、八
热点聚焦:今日如何查看电脑硬件配置_如何查看电脑硬件配置信息
1、鼠标点击桌面左下角开始按钮,找到运行,输入dxdiag,进入DirectX诊断工具界面。2、这时我们可以查看系统、显
【环球新视野】行道树是以什么标准“选拔”出来的
行道树,是街边最常见的“风景线”。不知道你有没有好奇过,同样都是种在道路两边的树,为啥有的轻枝薄叶,有的“当心落叶”,有的花香四溢...
每日视讯:装扮少女挑战攻略最简单_装扮少女挑战攻略
1、点击特殊挑战2关图标进入正文部分,可以看到6张有具体要求的衣服图片。单击下面的挑战进入关卡。2、首先,第一张图是眼罩
世界快消息!疯狂飞车城市飙车好玩吗 疯狂飞车城市飙车玩法简介
期待已久的手游疯狂飞车城市飙车即将登陆九游,这款手机游戏吸引了大批玩家的关注,想下载这款游戏,有很多粉丝都在问九游小编疯狂飞车城市飙
热点在线丨中央气象台卫星云图台风路径_中央气象台卫星云图
一、题文带情求素明行指具就过强来中央气象台天气预报中的卫星云图属于( )带情求素明行指具就过强来二、解答年认主便解须办
每日热门:阿里小鱼卡怎么头条免流_阿里小鱼卡怎么申请
1、阿里小鱼卡的申请方法如下:2、打开UC浏览器,个人设置中心;3、进入账户中心;4、点击界面,点击应用选项卡,进入应用
环球速递!评论君聊两会|文化服务更贴心 人民生活更美好
点击图片观看视频我们期待,到图书馆获取新知,到文化馆听课学艺,到小广场唱歌跳舞,会成为生活常态。文化服务更贴近、更贴心,一天天殷实起
当前快看:宝光股份:3月10日融资买入243.08万元,融资融券余额7180.58万元
3月10日,宝光股份(600379)融资买入243 08万元,融资偿还364 09万元,融资净卖出121 01万元,融资余额7180 58万元,近20个交易日中有12个交
观焦点:女生qq名字 好听(女生qq名)
女生qq名字好听,女生qq名这个很多人还不知道,现在让我们一起来看看吧!1、ヤo独特之蕞oοΟヤo拾囿叭勼oοΟ兲丄嘚訫图絮尘倾恬灵之舞爱哭dē小魔女
天天简讯:怎么取消分页预览的线_怎么取消分页预览
1、重新设置到分页预览后,再把粗蓝框用鼠标拖大,即将粗蓝框拖到你要打印的内容的范围。2、用鼠标操作就可以了。3、记住是拖