当前位置: 首页 > news >正文

郑州网站建设 58/qq引流推广软件哪个好

郑州网站建设 58,qq引流推广软件哪个好,b2c网站建设,中山顺德网站建设公众号关注 「奇妙的 Linux 世界」设为「星标」,每天带你玩转 Linux !背景众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领…

公众号关注 「奇妙的 Linux 世界」

设为「星标」,每天带你玩转 Linux !

7ee9c5735371458edf0cc5d16e34aee9.png

背景

众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领域,一种可以基于同城或异地多活机制的高可用设计,在保障数据一致性的同时,能够最大程度降低由于机房的仅单点可用所导致的潜在高可用问题,最大程度上保障业务的用户体验,降低单点问题对业务造成的潜在损失显得尤为重要。

同城双活,对于生产的高可用保障,重大的意义和价值是不可言喻的。表面上同城双活只是简单的部署了一套生产环境而已,但是在架构上,这个改变的影响是巨大的,无状态应用的高可用管理、请求流量的管理、版本发布的管理、网络架构的管理等,其提升的架构复杂度巨大。

结合真实的协同办公产品:京办(为北京市政府提供协同办公服务的综合性平台)生产环境面对的复杂的政务网络以及京办同城双活架构演进的案例,给大家介绍下京办持续改进、分阶段演进过程中的一些思考和实践经验的总结。本文仅针对 ES 集群在跨机房同步过程中的方案和经验进行介绍和总结。

架构

1. 部署 Logstash 在金山云机房上,Logstash 启动多个实例(按不同的类型分类,提高同步效率),并且和金山云机房的 ES 集群在相同的 VPC

2. Logstash 需要配置大网访问权限,保证 Logstash 和 ES 原集群和目标集群互通。

3. 数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移后续的增加数据选择增量迁移。

4. 增量迁移需要改造增加识别的增量数据的标识,具体方法后续进行介绍。

987456c866e15f501958ee07ff22e61e.png

原理

Logstash 工作原理

1a587613e6a8b7edce21d3dfd5d257c1.png

Logstash 分为三个部分 input 、filter、ouput:

1. input 处理接收数据,数据可以来源 ES,日志文件,kafka 等通道.

2. filter 对数据进行过滤,清洗。

3. ouput 输出数据到目标设备,可以输出到 ES,kafka,文件等。

增量同步原理

1. 对于 T 时刻的数据,先使用 Logstash 将 T 以前的所有数据迁移到有孚机房京东云 ES,假设用时∆T

2. 对于 T 到 T+∆T 的增量数据,再次使用 logstash 将数据导入到有孚机房京东云的 ES 集群

3. 重复上述步骤 2,直到∆T 足够小,此时将业务切换到华为云,最后完成新增数据的迁移

适用范围:ES 的数据中带有时间戳或者其他能够区分新旧数据的标签

流程



e4abf94f615f89a74f8d5d0884dbd45f.png



准备工作

1. 创建 ECS 和安装 JDK 忽略,自行安装即可

2. 下载对应版本的 Logstash,尽量选择与 Elasticsearch 版本一致,或接近的版本安装即可

https://www.elastic.co/cn/downloads/logstash

1) 源码下载直接解压安装包,开箱即用

2)修改对内存使用,logstash 默认的堆内存是 1G,根据 ECS 集群选择合适的内存,可以加快集群数据的迁移效率。

f5ba5af620449136ee5189c3ad58ee09.png

 3. 迁移索引

Logstash 会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。

以下提供创建索引的 python 脚本,用户可以使用该脚本创建需要的索引。

create_mapping.py 文件是同步索引的 python 脚本,config.yaml 是集群地址配置文件。

注:使用该脚本需要安装相关依赖

yum install -y PyYAML
yum install -y python-requests


拷贝以下代码保存为 create_mapping.py:

import yaml
import requests
import json
import getopt
import sysdefhelp():print"""usage:-h/--help print this help.-c/--config config file path, default is config.yamlexample:  python create_mapping.py -c config.yaml """
defprocess_mapping(index_mapping, dest_index):print(index_mapping)# remove unnecessary keysdel index_mapping["settings"]["index"]["provided_name"]del index_mapping["settings"]["index"]["uuid"]del index_mapping["settings"]["index"]["creation_date"]del index_mapping["settings"]["index"]["version"]# check aliasaliases = index_mapping["aliases"]for alias inlist(aliases.keys()):if alias == dest_index:print("source index "+ dest_index +" alias "+ alias +" is the same as dest_index name, will remove this alias.")del index_mapping["aliases"][alias]if index_mapping["settings"]["index"].has_key("lifecycle"):lifecycle = index_mapping["settings"]["index"]["lifecycle"]opendistro ={"opendistro":{"index_state_management":{"policy_id": lifecycle["name"],"rollover_alias": lifecycle["rollover_alias"]}}}index_mapping["settings"].update(opendistro)# index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]del index_mapping["settings"]["index"]["lifecycle"]print(index_mapping)return index_mapping
defput_mapping_to_target(url, mapping, source_index, dest_auth=None):headers ={'Content-Type':'application/json'}create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)if create_resp.status_code !=200:print("create index "+ url +" failed with response: "+str(create_resp)+", source index is "+ source_index)print(create_resp.text)withopen(source_index +".json","w")as f:json.dump(mapping, f)
defmain():config_yaml ="config.yaml"opts, args = getopt.getopt(sys.argv[1:],'-h-c:',['help','config='])for opt_name, opt_value in opts:if opt_name in('-h','--help'):help()exit()if opt_name in('-c','--config'):config_yaml = opt_valueconfig_file =open(config_yaml)config = yaml.load(config_file)source = config["source"]source_user = config["source_user"]source_passwd = config["source_passwd"]source_auth =Noneif source_user !="":source_auth =(source_user, source_passwd)dest = config["destination"]dest_user = config["destination_user"]dest_passwd = config["destination_passwd"]dest_auth =Noneif dest_user !="":dest_auth =(dest_user, dest_passwd)print(source_auth)print(dest_auth)# only deal with mapping listif config["only_mapping"]:for source_index, dest_index in config["mapping"].iteritems():print("start to process source index"+ source_index +", target index: "+ dest_index)source_url = source +"/"+ source_indexresponse = requests.get(source_url, auth=source_auth)if response.status_code !=200:print("*** get ElasticSearch message failed. resp statusCode:"+str(response.status_code)+" response is "+ response.text)continuemapping = response.json()index_mapping = process_mapping(mapping[source_index], dest_index)dest_url = dest +"/"+ dest_indexput_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)print("process source index "+ source_index +" to target index "+ dest_index +" successed.")else:# get all indicesresponse = requests.get(source +"/_alias", auth=source_auth)if response.status_code !=200:print("*** get all index failed. resp statusCode:"+str(response.status_code)+" response is "+ response.text)exit()all_index = response.json()for index inlist(all_index.keys()):if"."in index:continueprint("start to process source index"+ index)source_url = source +"/"+ indexindex_response = requests.get(source_url, auth=source_auth)if index_response.status_code !=200:print("*** get ElasticSearch message failed. resp statusCode:"+str(index_response.status_code)+" response is "+ index_response.text)continuemapping = index_response.json()dest_index = indexif index in config["mapping"].keys():dest_index = config["mapping"][index]index_mapping = process_mapping(mapping[index], dest_index)dest_url = dest +"/"+ dest_indexput_mapping_to_target(dest_url, index_mapping, index, dest_auth)print("process source index "+ index +" to target index "+ dest_index +" successed.")if __name__ =='__main__':main()

配置文件保存为 config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"# 是否只处理这个文件中mapping地址的索引
# 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
# 如果设置成false,则会取源端集群的所有索引,除去(.kibana)
# 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
# 如果匹配不到,则使用源端原始的索引名称
only_mapping: true# 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
mapping:source_index: dest_index

以上代码和配置文件准备完成,直接执行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目标集群的 kibana 上查看或者执行 curl 查看索引迁移情况:

GET _cat/indices?v

9af4139672a45629c3ce1e006aba327c.png

全量迁移

Logstash 配置位于 config 目录下。

用户可以参考配置修改 Logstash 配置文件,为了保证迁移数据的准确性,一般建议建立多组 Logstash,分批次迁移数据,每个 Logstash 迁移部分数据。

配置集群间迁移配置参考:

b9df9dcd3610c72ca7c43b082c6587f0.png



input{elasticsearch{# 源端地址hosts =>  ["ip1:port1","ip2:port2"]# 安全集群配置登录用户名密码user => "username"password => "password"# 需要迁移的索引列表,以逗号分隔,支持通配符index => "a_*,b_*"# 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关docinfo=>trueslices => 10size => 2000scroll => "60m"}
}filter {# 去掉一些logstash自己加的字段mutate {remove_field => ["@timestamp", "@version"]}
}output{elasticsearch{# 目的端es地址hosts => ["http://ip:port"]# 安全集群配置登录用户名密码user => "username"password => "password"# 目的端索引名称,以下配置为和源端保持一致index => "%{[@metadata][_index]}"# 目的端索引type,以下配置为和源端保持一致document_type => "%{[@metadata][_type]}"# 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好document_id => "%{[@metadata][_id]}"ilm_enabled => falsemanage_template => false}# 调试信息,正式迁移去掉stdout { codec => rubydebug { metadata => true }}
}

增量迁移

预处理:

1. @timestamp 在 elasticsearch2.0.0beta 版本后弃用

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html

2. 本次对于京办从金山云机房迁移到京东有孚机房,所涉及到的业务领域多,各个业务线中所代表新增记录的时间戳字段不统一,所涉及到的兼容工作量大,于是考虑通过 elasticsearch 中预处理功能 pipeline 进行预处理添加统一增量标记字段:gmt_created_at,以减少迁移工作的复杂度(各自业务线可自行评估是否需要此步骤)。

PUT _ingest/pipeline/gmt_created_at
{"description":"Adds gmt_created_at timestamp to documents","processors":[{"set":{"field":"_source.gmt_created_at","value":"{{_ingest.timestamp}}"}}]
}


3. 检查 pipeline 是否生效

GET _ingest/pipeline/*

4. 各个 index 设置对应 settings 增加 pipeline 为默认预处理

PUT index_xxxx/_settings
{"settings": {"index.default_pipeline": "gmt_created_at"}
}

5. 检查新增 settings 是否生效

GET index_xxxx/_settings

c9d03b88e93caf89da57075b3e7eff68.png

增量迁移脚本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的 DSL,统一 gmt_create_at 为增量同步的特殊标记

schedule: 每分钟同步一把,"* * * * *"

input {
elasticsearch {hosts =>["ip:port"]# 安全集群配置登录用户名密码user =>"username"password =>"password"index =>"index_*"query =>'{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'size =>5000scroll =>"5m"docinfo =>trueschedule =>"* * * * *"}
}
filter {mutate {remove_field =>["source", "@version"]}
}
output {elasticsearch {# 目的端es地址hosts =>["http://ip:port"]# 安全集群配置登录用户名密码user =>"username"password =>"password"index =>"%{[@metadata][_index]}"document_type =>"%{[@metadata][_type]}"document_id =>"%{[@metadata][_id]}"ilm_enabled =>falsemanage_template =>false}# 调试信息,正式迁移去掉
stdout { codec => rubydebug { metadata =>true}}
}

问题:

mapping 中存在 join 父子类型的字段,直接迁移报 400 异常



f8fc5e0d00bf44091d8191eb9e17b9a5.png

[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解决方法:

https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140 https://github.com/elastic/elasticsearch/issues/26183

结合业务特征,通过在 filter 中加入小量的 ruby 代码,将_routing 的值取出来,放回 logstah event 中,由此问题得以解决。

示例:

ee27f483ce5901a7f22c7651589cd7f5.png

本文转载自:「OSC开源社区」,原文:https://url.hi-linux.com/XHYXQ,版权归原作者所有。欢迎投稿,投稿邮箱: editor@hi-linux.com。

1b41cbea8813b89dd4d1a83d94d78cd8.gif

最近,我们建立了一个技术交流微信群。目前群里已加入了不少行业内的大神,有兴趣的同学可以加入和我们一起交流技术,在 「奇妙的 Linux 世界」 公众号直接回复 「加群」 邀请你入群。

91073af0f5ce5d3cdf5fb82650c49729.png

你可能还喜欢

点击下方图片即可阅读

cc2dc5ca2331fe7c0a36bfe09401898c.png

Redis-Shake: 一款阿里开源,功能超级强大的 Redis 数据迁移工具

043f4365168cb1735dfc7493f81480c6.png
点击上方图片,『美团|饿了么』外卖红包天天免费领

1a8923c1abbc0b0082323cf8e26029aa.png

更多有趣的互联网新鲜事,关注「奇妙的互联网」视频号全了解!

http://www.lbrq.cn/news/1047097.html

相关文章:

  • 华春建设工程项目管理有限公司网站/seo诊断站长
  • 郑州外贸网站建设商家/免费域名服务器
  • 济南专门做网站的公司/友情链接导航
  • 无锡好的网站公司/舆情监控系统
  • 国产crm系统/seo优化专员招聘
  • 有什么国外的设计网站推荐/20个排版漂亮的网页设计
  • h5是动态网站吗/软文写作案例
  • 建设网站的价格分析/温州seo优化
  • wordpress菜单调用/常德seo
  • 大良建网站/打开百度网址
  • wordpress 私密文章/整站seo排名外包
  • 慈溪网站设计/竞价推广账户竞价托管收费
  • 优秀网站首页设计/网店营销策略有哪些
  • 佛山网站建设是哪个/安卓aso优化排名
  • 做网站教程百度云/广州百度seo优化排名
  • 怎么做兼职网站吗/百度应用市场下载安装
  • 内网 wordpress慢/网站优化推广的方法
  • 算命网站怎么做/网站关键词优化价格
  • cdn wordpress 登录/西安百度提升优化
  • 潜江做网站/营销软文怎么写
  • 怎样淘宝做seo网站推广/设计网络营销方案
  • 摄影图片网站/百度收录查询代码
  • 做赌博游戏网站违法/如何让百度快速收录新网站
  • 信誉好的邢台做网站/杭州新站整站seo
  • 网站做短信接口具体方法/大数据营销平台那么多
  • 2013我国中小企业接入互联网和网站建设情况/小程序开发公司
  • 安吉网站开发/深圳seo优化方案
  • 网站关键字排名优化/自己创建网站
  • WordPress小说网站源码/网站排名优化怎样做
  • 网站需要多少钱/互联网营销师培训学校
  • Speaking T2 - Dining Hall to CloseDuring Spring Break
  • drippingblues靶机通关练习笔记
  • 【渲染流水线】[几何阶段]-[归一化NDC]以UnityURP为例
  • Android MediaMetadataRetriever取视频封面,Kotlin(1)
  • Python 属性描述符(描述符用法建议)
  • 爬虫攻防战:反爬与反反爬全解析