ElasticSearch迁移至openGauss

article/2025/8/2 16:39:53

Elasticsearch 作为一种高效的全文搜索引擎,广泛应用于实时搜索、日志分析等场景。而 openGauss,作为一款企业级关系型数据库,强调事务处理与数据一致性。那么,当这两者的应用场景和技术架构发生交集时,如何实现它们之间的平滑迁移呢?

本文将探讨 Elasticsearch 基础数据数据迁移至 openGauss 的解决方案,在此,我们首先根据等价实例来看一下 Elasticsearch 和关系型数据库(如 openGauss)的基础数据结构:

关系型数据库操作

CREATE TABLE products (    id INT PRIMARY KEY,    name VARCHAR(100),    price DECIMAL(10,2));
INSERT INTO products VALUES (1, 'Laptop', 999.99);

Elasticsearch等价操作

PUT /products{  "mappings": {    "properties": {      "id": { "type": "integer" },      "name": { "type": "text" },      "price": { "type": "double" }    }  }}
POST /products/_doc/1{  "id": 1,  "name": "Laptop",  "price": 999.99}

数据组织层级

  • 关系型数据库:

    Database → Table → Row/Column

  • Elasticsearch:

    6.x之前:Index → Type → Document (类似Database → Table → Row)

    7.x之后:Index → Document (Type被移除,强化了Index≈Table的对应关系)

Elasticsearch 概念

关系型数据库(如openGauss)概念

说明

索引(Index)库-表(Table)

对应关系

类型(Type)

(已弃用,7.x后无对应)

早期版本中类似表分区

文档(Document)

行(Row)

一条记录

字段(Field)

列(Column)

数据属性

映射(Mapping)

表结构定义(Schema)

定义字段类型等

索引别名(Alias)

视图(View)

虚拟索引/表

分片(Shard)

分区(Partition)

数据水平拆分

检索方式

1、向量检索

Elasticsearch 向量检索

# 1. 创建包含向量字段的索引PUT /image_vectors{  "mappings": {    "properties": {      "image_name": {        "type": "text"      },      "image_vector": {        "type": "dense_vector",        "dims": 512      }    }  }}
# 2. 插入向量数据POST /image_vectors/_doc{  "image_name": "sunset.jpg",  "image_vector": [0.12, 0.34, ..., 0.56]  // 512维向量}
# 3. 精确向量检索 (script_score)GET /image_vectors/_search{  "query": {    "script_score": {      "query": {"match_all": {}},      "script": {        "source": "cosineSimilarity(params.query_vector, 'image_vector') + 1.0",        "params": {          "query_vector": [0.23, 0.45, ..., 0.67]  // 查询向量        }      }    }  }}
# 4. 近似最近邻搜索 (kNN search)GET /image_vectors/_search{  "knn": {    "field": "image_vector",    "query_vector": [0.23, 0.45, ..., 0.67],    "k": 10,    "num_candidates": 100  }}

openGauss 向量检索(openGauss 从 7.0 版本开始支持向量检索功能)

# 1. 创建包含向量字段的表
-- 创建表CREATE TABLE image_vectors (  id SERIAL PRIMARY KEY,  image_name TEXT,  image_vector VECTOR(512)  -- 512维向量);#2. 插入向量数据INSERT INTO image_vectors (image_name, image_vector) VALUES ('sunset.jpg', '[0.12, 0.34, ..., 0.56]');
# 3. 精确向量检索 (余弦相似度)-- 使用余弦相似度SELECT id, image_name,        1 - (image_vector <=> '[0.23, 0.45, ..., 0.67]') AS cosine_similarityFROM image_vectorsORDER BY cosine_similarity DESCLIMIT 10;
# 4. 近似最近邻搜索 (使用IVFFLAT索引)-- 创建IVFFLAT索引CREATE INDEX idx_image_vector ON image_vectors USING IVFFLAT(image_vector) WITH (lists = 100);
-- 近似最近邻查询SELECT id, image_name,        image_vector <=> '[0.23, 0.45, ..., 0.67]' AS distanceFROM image_vectorsORDER BY distanceLIMIT 10;
2、全文检索

es全文检索 相当于 openGauss的LIKE和正则表达式

​​​​​​​​​​​​​​

# es 全文检索GET /products/_search{  "query": {    "match": {      "description": "search term"    }  }}
# openGauss 模糊查询SELECT * FROM products WHERE description LIKE '%search term%';
# openGauss 正则表达式匹配SELECT * FROM logs WHERE message ~ 'error|warning';

因此,根据数据层级及检索方式分析,迁移时将es的索引迁移到openGauss的一张表里。

环境准备

  • 已部署7.3 及以上(支持向量)版本的ElasticSearch实例

  • 已部署7.0.0-RC1 及以上版本(支持向量)的openGauss实例

  • 已安装3.8 及以上版本的Python环境

  • 已安装涉及的Python库

pip3 install psycopg2pip3 install requests pip3 install pyOpenSSL
#如果安装失败,可以考虑在一个新的虚拟环境中重新安装所需的库,执行以下命令:python3 -m venv venvsource venv/bin/activatepip install requests pyOpenSSL

前置条件

远程连接权限:

openGauss端:

​​​​​​​

#修改openGauss配置文件。将迁移脚本所在机器IP地址加入白名单,修改openGauss监听地址。# 执行以下命令gs_guc set -D {DATADIR} -c " listen_addresses = '\*'"
gs_guc set -D {DATADIR} -h "host all all x.x.x.x/32 sha256"
# 修改完毕后重启openGauss。
gs_ctl restart -D {DATADIR}

elasticsearch端:

​​​​​​​

vim /path/to/your_elasticsearch/config/elasticsearch.yml#修改network.hostnetwork.host: 0.0.0.0

openGauss端创建普通用户(赋权)、迁移的目标数据库:

​​​​​​​

 create user mig_test identified by 'Simple@123';
 grant all privileges to mig_test;
 create database es_to_og with owner mig_test;

迁移操作

1、根据本地部署的elasticsearch与openGauss对脚本进行配置修改,需要修改的内容如下:

​​​​​​​

# Elasticsearch 配置信息es_url = 'http://ip:port'  # Elasticsearch 服务器地址es_index = 'your_es_index'  # Elasticsearch 索引名
# openGauss 配置信息db_host = '127.0.0.1'   # openGauss服务器地址db_port = 5432          # openGauss 端口号db_name = 'your_opengauss_db' # 迁移到openGauss的数据库名称db_user = 'user_name'    # 连接openGauss的普通用户db_password = 'xxxxxx'   # 连接openGauss的用户密码

elasticsearchToOpenGauss.py迁移脚本如下:

​​​​​​​

import requestsimport psycopg2import jsonimport refrom typing import List, Dict, Any, Optional, Union
# Elasticsearch 配置信息es_url = 'http://192.168.0.114:9200'  # Elasticsearch 服务器地址es_index = 'my_dynamic_index'  # Elasticsearch 索引名
# openGauss 配置信息db_host = '192.168.0.219'   # openGauss服务器地址db_port = 15620          # openGauss 端口号db_name = 'es_to_og' # 迁移到openGauss的数据库名称db_user = 'mig_test'    # 连接openGauss的普通用户db_password = 'xxxxxx'   # 连接openGauss的用户密码
RESERVED_KEYWORDS = {    "select", "insert", "update", "delete", "drop", "table", "from", "where", "group",    "by", "having", "order", "limit", "join", "inner", "left", "right", "full", "union",    "all", "distinct", "as", "on", "and", "or", "not", "null", "true", "false", "case",    "when", "then", "else", "end", "exists", "like", "in", "between", "is", "like",    "references", "foreign", "primary", "key", "unique", "check", "default", "constraint",    "index", "unique", "varchar", "text", "int", "bigint", "smallint", "boolean", "timestamp"}
# 从 Elasticsearch 获取数据def fetch_data_from_es():    query = {        "query": {            "match_all": {}        },        "_source": True  # 获取所有字段    }    response = requests.get(f'{es_url}/{es_index}/_search', json=query)    if response.status_code == 200:        return response.json()['hits']['hits']    else:        raise Exception(f"Failed to fetch data from Elasticsearch: {response.status_code}, {response.text}")# 获取索引映射信息def fetch_mapping(es_url, es_index):    response = requests.get(f'{es_url}/{es_index}/_mapping')    if response.status_code == 200:        return response.json()    else:        raise Exception(f"Failed to fetch mapping: {response.status_code}, {response.text}")def get_field_type(es_url: str, es_index: str, field_name: str) -> str:    """ 获取 Elasticsearch 字段的类型 """    mappings = fetch_mapping(es_url, es_index)    print(f"Field name: {field_name}")    print(f"map: {mappings}")    # 获取 properties 字段    properties = mappings.get(es_index, {}).get('mappings', {}).get('properties', {})    # 遍历并查找字段的类型    field_type = 'text'  # 默认类型为 'text'    if field_name in properties:        field_type = properties[field_name].get('type', 'text')    elif 'fields' in properties.get(field_name, {}):        # 如果字段有子字段(比如 keyword),获取 'keyword' 类型        field_type = properties[field_name]['fields'].get('keyword', {}).get('type', 'text')    return field_type
def convert_dict_to_jsonb(value):    # 如果 value 是字典类型,递归调用该函数处理其中的每个元素    if isinstance(value, dict):        return json.dumps({k: convert_dict_to_jsonb(v) for k, v in value.items()})    # 如果 value 是列表类型,递归处理其中的每个元素    elif isinstance(value, list):        return json.dumps([convert_dict_to_jsonb(v) for v in value])    # 如果是其他类型(如字符串、数字),直接返回该值    else:        return value
# 映射 Elasticsearch 数据类型到 openGauss 类型def map_to_opengauss_type(es_type: str, dim: Optional[int] = None) -> str:    """Map Elasticsearch types to openGauss types"""    if isinstance(es_type, (dict, list)):  # 如果 es_type 是字典类型,则需要特殊处理        return 'JSONB'    type_map = {        "long": "BIGINT",  # 大整数        "integer": "INTEGER",  # 整数        "short": "SMALLINT",  # 小整数        "byte": "SMALLINT",  # 小字节        "float": "REAL",  # 浮点数        "double": "DOUBLE PRECISION",  # 双精度浮点数        "boolean": "BOOLEAN",  # 布尔值        "keyword": "VARCHAR",  # 关键字(字符串类型)        "text": "TEXT",  # 长文本        "date": "TIMESTAMP",  # 日期类型        "binary": "BYTEA",  # 二进制数据        "geo_point": "POINT",  # 地理坐标(经纬度)        "geo_shape": "GEOMETRY",  # 复杂地理形状        "nested": "JSONB",  # 嵌套对象        "object": "JSONB",  # 对象        "ip": "INET",  # IP 地址        "scaled_float": "REAL",  # 扩展浮动类型(带缩放的浮动)        "float_vector": f"VECTOR({dim})" if dim else "VECTOR",  # 浮动向量类型        "dense_vector": f"VECTOR({dim})" if dim else "VECTOR",  # 稠密向量类型        "binary_vector": f"BIT({dim})" if dim else "BIT",  # 二进制向量类型        "half_float": "REAL",  # 半精度浮动        "unsigned_long": "BIGINT",  # 无符号长整数        "date_nanos": "TIMESTAMP",  # 高精度日期时间        "alias": "TEXT",  # 别名(通常是字段的别名)    }
    # 如果 es_type 在映射表中,直接返回映射后的类型    if es_type in type_map:        print(f"es_type:{es_type} ----- og_type: {type_map[es_type]}")        return type_map[es_type]    else:        print(f"Warning: Unsupported Elasticsearch type '{es_type}', defaulting to 'TEXT'")        return 'TEXT'  # 默认使用 TEXT 类型# 函数:将非法字符替换为下划线def sanitize_name(field_name: str) -> str:    """处理字段名,确保不会与保留字冲突,且将非字母数字字符替换为下划线"""    # 将所有非字母数字字符替换为下划线    sanitized_name = re.sub(r'[^a-zA-Z0-9_]', '_', field_name)
    # 如果是保留字,则加双引号    if sanitized_name.lower() in RESERVED_KEYWORDS:        return f'"{sanitized_name}"'
    return sanitized_name# 创建 openGauss 表def create_table_in_opengauss(es_url, es_index, table_name):    columns_definition = ['id VARCHAR PRIMARY KEY']  # 增加 id 主键字段    seen_fields = set()  # 用于记录已经处理过的字段名
    # 获取 properties 字段    properties = fetch_mapping(es_url, es_index).get(es_index, {}).get('mappings', {}).get('properties', {})
    # 遍历每个字段    for field, field_info in properties.items():        # 如果该字段已经处理过,跳过        if field in seen_fields:            continue        # 获取字段的类型        es_type = field_info.get('type', 'text')        dim = field_info.get('dims', 0) if isinstance(field_info, dict) else 0        field_type = map_to_opengauss_type(es_type, dim)        sanitized_field_name = sanitize_name(field)        seen_fields.add(field)        columns_definition.append(f"{sanitized_field_name} {field_type}")    # 生成表创建 SQL    columns_str = ", ".join(columns_definition)    create_table_sql = f"DROP TABLE IF EXISTS {sanitize_name(table_name)}; CREATE TABLE {sanitize_name(table_name)} ({columns_str});"    try:        # 建立数据库连接并执行创建表 SQL        connection = psycopg2.connect(            host=db_host,            port=db_port,            dbname=db_name,            user=db_user,            password=db_password        )        cursor = connection.cursor()        cursor.execute(create_table_sql)        connection.commit()        print(f"Table {sanitize_name(table_name)} created successfully.")    except Exception as e:        print(f"Error while creating table {sanitize_name(table_name)}: {e}")    finally:        if connection:            cursor.close()            connection.close()
# 将数据插入到 openGauss 表中def insert_data_to_opengauss(table_name, es_source, es_id):    try:        # 建立数据库连接        connection = psycopg2.connect(            host=db_host,            port=db_port,            dbname=db_name,            user=db_user,            password=db_password        )        cursor = connection.cursor()
        # 动态生成插入 SQL 语句        sanitized_columns = ['id'] + [sanitize_name(col) for col in es_source.keys()]  # 清理列名        values = [es_id]
        # 处理每一列的数据类型,必要时进行转换        for column in es_source:            value = es_source[column]            if isinstance(value, (dict, list)):                # 如果是字典类型,转换为 JSONB                value = convert_dict_to_jsonb(value)            values.append(value)
        columns_str = ', '.join(sanitized_columns)        values_str = ', '.join(['%s'] * len(values))
        insert_sql = f"INSERT INTO {sanitize_name(table_name)} ({columns_str}) VALUES ({values_str})"        cursor.execute(insert_sql, values)
        # 提交事务        connection.commit()
    except Exception as e:        print(f"Error while inserting data into {table_name}: {e}")    finally:        if connection:            cursor.close()            connection.close()
# 主函数def main():    try:        es_data = fetch_data_from_es()        table_name = es_index  # 可以使用索引名作为表名
        create_table_in_opengauss(es_url, es_index, table_name)        for record in es_data:            es_source = record['_source']  # 获取 Elasticsearch 文档中的数据            es_id = record['_id']            insert_data_to_opengauss(table_name, es_source, es_id)        print(f"Successfully inserted data into table {table_name}.")    except Exception as e:        print(f"Migration failed: {e}")if __name__ == "__main__":    main()

2、执行脚本

python3 ./elasticsearchToOpenGauss.py

3、openGauss端查看数据​​​​​​​

#切换到迁移目标数据库openGauss=# \c es_to_og
#查看迁移的表es_to_og=# \d
#查看表结构es_to_og=# \d my_dynamic_index
#查看表数据es_to_og=# select c

图片

-END-


http://www.hkcw.cn/article/VnMVBefpHB.shtml

相关文章

搭建 Select 三级联动架构-东方仙盟插件开发 JavaScript ——仙盟创梦IDE

三级级联开卡必要性 在 “东方仙盟” 相关插件开发中&#xff0c;使用原生 HTML 和 JavaScript 实现三级联动选择&#xff08;如村庄 - 建筑 - 单元的选择&#xff09;有以下好处和意义&#xff0c;学校管理&#xff1a; 对游戏体验的提升 增强交互性&#xff1a;玩家能够通…

SpringBoot+vue+SSE+Nginx实现消息实时推送

一、背景 项目中消息推送&#xff0c;简单的有短轮询、长轮询&#xff0c;还有SSE&#xff08;Server-Sent Events&#xff09;、以及最强大复杂的WebSocket。 至于技术选型&#xff0c;SSE和WebSocket区别&#xff0c;网上有很多&#xff0c;我也不整理了&#xff0c;大佬的链…

软件测试的分类

为什么要软件测试分类呢&#xff1f; 软件测试是软件生命周期中的一个重要的环节&#xff0c;基本伴随着软件整个生命周期&#xff0c;对软件测试分类后&#xff0c;我们可以根据软件生命不同阶段&#xff0c;进行对应的测试&#xff0c;这样就有助于我们条理分明&#xff0c;…

<PLC><socket><西门子>基于西门子S7-1200PLC,实现手机与PLC通讯(通过websocket转接)

前言 本系列是关于PLC相关的博文,包括PLC编程、PLC与上位机通讯、PLC与下位驱动、仪器仪表等通讯、PLC指令解析等相关内容。 PLC品牌包括但不限于西门子、三菱等国外品牌,汇川、信捷等国内品牌。 除了PLC为主要内容外,PLC相关元器件如触摸屏(HMI)、交换机等工控产品,如…

实现一个免费可用的文生图的MCP Server

概述 文生图模型为使用 Cloudflare Worker AI 部署 Flux 模型&#xff0c;是参照视频https://www.bilibili.com/video/BV1UbkcYcE24/?spm_id_from333.337.search-card.all.click&vd_source9ca2da6b1848bc903db417c336f9cb6b的复现Cursor MCP Server实现是参照文章https:/…

Windows安装Miniconda

Windows安装miniconda 下载安装常用命令配置powershellVSCode配置虚拟环境 下载 进入官网 https://www.anaconda.com/download/success 下载windows版本的miniconda Miniconda3-latest-Windows-x86_64.exe 安装 一直点击下一步&#xff0c;可以选择安装路径 配置环境变量…

华为OD机试真题——二叉树中序遍历(2025A卷:200分)Java/python/JavaScript/C++/C语言/GO六种最佳实现

2025 A卷 200分 题型 本文涵盖详细的问题分析、解题思路、代码实现、代码详解、测试用例以及综合分析; 并提供Java、python、JavaScript、C++、C语言、GO六种语言的最佳实现方式! 2025华为OD真题目录+全流程解析/备考攻略/经验分享 华为OD机试真题《二叉树中序遍历》: 目录 …

现代密码学 | 高级加密标准(AES)

接下来我们将讨论目前大多数计算机和硬件基础设施所使用的最重要的加密算法&#xff0c;例如高级加密标准&#xff08;AES&#xff09;、里弗斯特-沙米尔-阿德曼算法&#xff08;RSA&#xff09;、椭圆曲线加密&#xff08;ECC&#xff09;、基于格的加密、&#xff08;环&…

cocos creator资源管理器,资源动态加载和释放

cocos 2.4.11版本 cocos 动态加载的资源需要自己增加引用和减少引用计数 cc.Asset.addRef 和 cc.Asset.decRef 注意&#xff1a; 1.使用当前代码管理资源&#xff0c;要区分项目中的静态资源和动态资源&#xff0c;静态资源就是预制体或者场景中的资源&#xff0c;代码中动态…

认识scratch,scratch是什么,如何使用

scratch是图形编程&#xff0c;将编程简化为积木的堆叠和嵌套&#xff0c;无需手写代码&#xff0c;只需清晰的逻辑即可完成自己的代码设计。通过它可以制作简单的小游戏等。 如图所示&#xff0c;这个就是scratch打开的界面&#xff0c;整个界面分为左中右三个部分&#xff0c…

HarmonyOS实战:腾讯IM之聊天详情页面搭建(二)

前言 鸿蒙版本腾讯 IM 的聊天功能十分复杂&#xff0c;需要开发者手动实现整个聊天对话的业务代码&#xff0c;这对开发者来说是个不小的挑战。本篇文章先从最基础的聊天对话列表开始教你一步一步实现完整的聊天功能&#xff0c;建议点赞收藏&#xff01; 实现效果 先看本文…

IM系统的负载均衡

1.IM场景的负载均衡 2.方案总览 SDK层想要连接一个TCP网关或者WebSocket网关的方案 SDK单地址:在SDK中写死某个网关的IP或者域名,缺点是更换地址需要重新打包SDK SDK多地址:防止某一个地址嗝屁了写上多个地址用足保持高可用 暴露接口给客户端:SDK层访问接口动态获得地址 注…

动态规划之网格图模型(一)

文章目录 动态规划之网格图模型&#xff08;一&#xff09;LeetCode 64. 最小路径和思路Golang 代码 LeetCode 62. 不同路径思路Golang 代码 LeetCode 63. 不同路径 II思路Golang 代码 LeetCode 120. 三角形最小路径和思路Golang 代码 LeetCode 3393. 统计异或值为给定值的路径…

血糖监测仪解决方案推荐芯片-NRF52832/HS6621/OM6626

随着糖尿病患者数量的增加和人们健康意识的提升&#xff0c;血糖监测仪成为了日常健康管理的重要设备。市场对便携、智能且易于使用的血糖监测仪需求持续增长&#xff0c;而无线通信技术&#xff0c;尤其是蓝牙技术&#xff0c;已成为现代血糖监测仪的核心组件&#xff0c;提供…

基于Vite的前端自动化部署方案

&#x1f468; 作者简介&#xff1a;大家好&#xff0c;我是Taro&#xff0c;全栈领域创作者 ✒️ 个人主页&#xff1a;唐璜Taro &#x1f680; 支持我&#xff1a;点赞&#x1f44d;&#x1f4dd; 评论 ⭐️收藏 文章目录 前言一、主流解决方案二、了解SCP概念三、自动化部署…

PlankAssembly 笔记 DeepWiki 正交视图三维重建

manycore-research/PlankAssembly | DeepWiki PlankAssembly项目原理 这个项目是一个基于深度学习的3D重建系统&#xff0c;其核心原理是从三个正交视图的工程图纸中重建出3D形状的结构化程序表示。 核心技术原理 1. 问题定义 PlankAssembly旨在从三个正交视图的工程图纸中…

MQTT协议,EMQX部署,MQTTX安装学习

一、MQTT概述 1.什么是MQTT MQTT是一种基于“发布订阅“”模式的消息传输协议。 消息&#xff1a;设备和设备之间传输的数据&#xff0c;或者服务和服务之间要传输的数据。 协议&#xff1a;传输数据时所遵循的规范。 2.常见的通讯模式 &#xff08;1&#xff09;客户端-服…

多模态大语言模型arxiv论文略读(101)

ML-Mamba: Efficient Multi-Modal Large Language Model Utilizing Mamba-2 ➡️ 论文标题&#xff1a;ML-Mamba: Efficient Multi-Modal Large Language Model Utilizing Mamba-2 ➡️ 论文作者&#xff1a;Wenjun Huang, Jiakai Pan, Jiahao Tang, Yanyu Ding, Yifei Xing, …

论文阅读:ADVWEB : CONTROLLABLE BLACK-BOX ATTACKS ON VLM-POWERED WEB AGENTS

原文&#xff1a;2410.17401 源码&#xff1a;https://ai-secure.github.io/AdvWeb/ 摘要&#xff1a; 本文设计了一种专门针对web agent的黑盒攻击框架&#xff0c;通过训练一个对抗性提示生成模型&#xff0c;在网页中自动生成并注入“隐形”对抗性字符串&#xff0c;引导网…

Wireshark 在 macOS 上使用及问题解决

wireshark概述 Wireshark 是被广泛使用的免费开源网络协议分析软件&#xff08;network protocol analyzer&#xff09;或网络数据包分析工具&#xff0c;它可以让你在微观层面上查看网络上发生的事情。它的主要功能是截取网络数据包&#xff0c;并尽可能详细地展示网络数据包…