
ES bulk data updates and import/export backup

1. Updating fields based on a query condition

The script below reads corrected coordinates from Redis, builds a lookup map keyed by RemoteId plus a timestamp rounded to a 0/30/60-second boundary, and then applies those values to matching documents with update_by_query and a Painless script.

from elasticsearch import Elasticsearch
import redis
import json

# Replace the username, password and Elasticsearch address below with your own
username = 'elastic'
password = 'password'
es_host = 'https://127.0.0.2:30674'

# Pass the credentials when instantiating the Elasticsearch client
es = Elasticsearch(
    hosts=[es_host],
    basic_auth=(username, password),
    verify_certs=False,
    # If your cluster uses a trusted certificate, you can verify it instead:
    # verify_certs=True,
    # ca_certs='/path/to/ca/cert',
)

# Quick sanity checks, e.g. a simple search:
# print(es.cluster.state())
# response = es.search(index="remote_statistics_202409", query={"match_all": {}})
# print(response)

# Connect to Redis, which holds the corrected coordinates.
# Each Redis key is a hash: field = timestamp (ms), value = JSON with Latitude/Longitude.
r = redis.StrictRedis(host='10.7.9.13', port=32197, db=11)

keys = r.keys()
my_map = dict()
for key in keys:
    maps = r.hgetall(key)
    print(key)
    for key01, value in maps.items():
        lat = json.loads(value).get("Latitude")
        if not lat.startswith("0."):
            # Lookup key: device id + timestamp, e.g. "21V70000110122B0001391719713910000"
            my_map[key.decode('utf-8') + str(json.loads(key01))] = json.loads(value)
print(len(my_map))

# Build the update script: round createAt to the nearest 0/30/60-second boundary,
# rebuild the "RemoteId + rounded timestamp" key and, when it exists in params,
# overwrite Latitude/Longitude and store the full record under "rrc".
script = {
    "source": "def create_at=ctx._source.createAt;"
              "def minutes = Math.floor(create_at / 60000);"
              "def left = Math.floor(create_at % 60000 / 1000);"
              "if(left<=15) {left=0} else if(left>45) {left=60} else {left=30} "
              "def form_time = minutes * 60000 + left * 1000;"
              "ctx._source['create_lltime'] = form_time;"
              "def key = ctx._source.RemoteId + (Long)form_time;"
              "def rru = params.new_value[key];"
              "if(rru != null) {ctx._source['Latitude']=rru['Latitude'];ctx._source['Longitude']=rru['Longitude'];ctx._source['rrc'] = rru}",
    "params": {"new_value": my_map},
    "lang": "painless"
}
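
# Optional sanity check (a minimal sketch, not part of the original job): the
# helper below reproduces the Painless rounding in Python, so you can confirm
# that the keys stored in my_map ("RemoteId" + rounded millisecond timestamp)
# match what the script will compute. The function name is illustrative.
def round_create_at(create_at):
    minutes = create_at // 60000
    left = create_at % 60000 // 1000
    if left <= 15:
        left = 0
    elif left > 45:
        left = 60
    else:
        left = 30
    return minutes * 60000 + left * 1000

# Example: 1719713927000 has 47 s left over, so it rounds up to 1719713940000
# print("21V70000110122B000139" + str(round_create_at(1719713927000)) in my_map)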

# Run the update asynchronously across all matching documents
ret = es.update_by_query(
    index="remote_statistics_202406",
    script=script,
    slices="auto",
    wait_for_completion=False,
    conflicts="proceed"
)
print(ret)
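
Because wait_for_completion=False is set, update_by_query returns immediately with a task id rather than waiting for the update to finish. The snippet below is a minimal sketch of polling that task with the Tasks API; it assumes the response exposes the usual task field, and the 10-second interval is arbitrary.

import time

task_id = ret["task"]
while True:
    status = es.tasks.get(task_id=task_id)
    if status["completed"]:
        print("update_by_query finished:", status["task"]["status"])
        break
    print("still running:", status["task"]["status"])
    time.sleep(10)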

2. Bulk export of ES data

The script below walks a time range hour by hour, scrolls through every document in each hour with helpers.scan, and writes the results to one newline-delimited JSON file per hour.

import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
import urllib3
urllib3.disable_warnings()
# Variables
start_date = 1735689600000
end_date = 1738368000000
#index_name = 'remote_statistics_202410'
index_name = 'remote_statistics_202501'

# Replace the username, password and Elasticsearch address below with your own
username = 'elastic'
password = 'password'
es_host = 'https://127.0.0.1:32293'

# Pass the credentials when instantiating the Elasticsearch client
es = Elasticsearch(
    hosts=[es_host],
    basic_auth=(username, password),
    verify_certs=False,
    # If your cluster uses a trusted certificate, you can verify it instead:
    # verify_certs=True,
    # ca_certs='/path/to/ca/cert',
)

print("----------start---------------")


def fetch_data(start_time, end_time):
    # scan() pages through every document in the time range via the scroll API
    results = helpers.scan(
        es,
        query={"query": {"range": {"createAt": {"gte": start_time, "lt": end_time}}}},
        index=index_name
    )
    return results


if __name__ == "__main__":
    step = 60 * 60 * 1000  # one hour in milliseconds
    # The range runs 8 extra hours past end_date, likely to cover the UTC+8 offset
    for i in range(start_date, end_date + 8 * step, step):
        date = datetime.fromtimestamp(i / 1000)
        print("**********************************************************")
        print(date)
        print("**********************************************************")
        ret = fetch_data(i, i + step)
        count = 0
        # One newline-delimited JSON file per hour, named after the hour's start timestamp
        with open(str(i) + '.json', 'w') as f:
            for doc in ret:
                f.write(json.dumps(doc['_source']) + '\n')
                count = count + 1
        print(count)
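
The export above keeps only _source, so document ids are regenerated when the data is re-indexed. If you need to preserve ids, a variant like the sketch below can store _id alongside each document (the function name and file layout are illustrative, not part of the original scripts); on the import side each line then has to be unwrapped into a bulk action that carries the same _id.

def fetch_and_dump_with_id(start_time, end_time, out_path):
    # Sketch: dump each document together with its _id for an id-preserving import
    results = helpers.scan(
        es,
        query={"query": {"range": {"createAt": {"gte": start_time, "lt": end_time}}}},
        index=index_name
    )
    with open(out_path, 'w') as f:
        for doc in results:
            f.write(json.dumps({"_id": doc["_id"], "_source": doc["_source"]}) + '\n')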

3. Bulk import of ES data

The script below picks up every .json file in the current directory (as produced by the export step) and bulk-indexes each line back into the target index with helpers.bulk.

import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import os
import urllib3
urllib3.disable_warnings()
# Variables
#index_name = 'remote_statistics_202410'
index_name = 'network_statistics_202410'

# Replace the username, password and Elasticsearch address below with your own
username = 'elastic'
password = 'password'
es_host = 'https://127.0.0.1:32067'

# Pass the credentials when instantiating the Elasticsearch client
es = Elasticsearch(
    hosts=[es_host],
    basic_auth=(username, password),
    verify_certs=False,
    # If your cluster uses a trusted certificate, you can verify it instead:
    # verify_certs=True,
    # ca_certs='/path/to/ca/cert',
)


def bulk_index_file(idx_name, file_path):
    # file_path is unused: every .json file in the current directory is indexed
    current_dir = os.getcwd()
    file_names = os.listdir(current_dir)
    for file_name in file_names:
        if file_name.endswith(".json"):
            with open(file_name, 'r') as file:
                try:
                    print(file_name)
                    # Each line of an export file is one document's _source
                    actions = (json.loads(line) for line in file)
                    helpers.bulk(es, actions, index=idx_name)
                except Exception as e:
                    print("error-----------------------")
                    print(e)
                    print(file_name)


# Call the function
bulk_index_file(index_name, None)
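
By default, helpers.bulk raises on indexing errors, which aborts the rest of the file. If you prefer to keep going and see exactly which documents were rejected, helpers.streaming_bulk reports a result per document. The function below is a minimal sketch of that approach; the name bulk_index_file_verbose and the chunk_size value are illustrative, not part of the original script.

def bulk_index_file_verbose(idx_name):
    # Sketch: same directory walk as above, but with per-document error reporting
    for file_name in os.listdir(os.getcwd()):
        if not file_name.endswith(".json"):
            continue
        with open(file_name, 'r') as file:
            actions = (json.loads(line) for line in file)
            ok_count = 0
            for ok, item in helpers.streaming_bulk(
                    es, actions, index=idx_name,
                    chunk_size=1000,        # documents per bulk request
                    raise_on_error=False):  # collect failures instead of raising
                if ok:
                    ok_count += 1
                else:
                    print("failed:", item)
            print(file_name, "indexed", ok_count)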
