本文档演示如何从Milvus将Collection数据全量导出，并适配迁移至DashVector。方案的主要流程包括：首先，升级Milvus版本（目前Milvus只有在最新版本v2.3.x中支持全量导出）；其次，将Milvus Collection的Schema信息和数据信息导出到具体的文件中；最后，以导出的文件作为输入来构建DashVector Collection并导入数据。
下面将详细阐述迁移方案的具体操作细节。
1. Milvus升级2.3.x版本
本文中，我们将借助Milvus的query_iterator来全量导出数据（query接口无法导出完整数据）。由于该接口目前只在v2.3.x版本中支持，所以在导出数据前需要先将Milvus版本升级到该版本。Milvus版本升级的详细操作参考Milvus用户文档。
注意：在进行Milvus Upgrade时，需要注意数据的备份安全问题。
2. Milvus全量数据导出
数据的导出包含Schema以及数据记录：Schema主要用于完备地定义Collection，数据记录对应于每个Partition下的全量数据，这两部分涵盖了需要导出的全部数据。下文展示如何将单个Milvus Collection全量导出。
2.1. Schema导出
DashVector和Milvus在Schema的设计上有一些区别：DashVector向用户透出的接口非常简单，Milvus则更加详尽。从Milvus迁移至DashVector时会涉及到部分Schema参数的删除（例如Collection的index_param参数），只会保留DashVector构建Collection的必要参数。以下为一个Schema转换的简单示例，其中Collection已有的数据参考Milvus示例代码写入。
python示例
from pymilvus import (
    connections,
    utility,
    Collection,
    DataType,
)
import os
import json
from pathlib import Path
from typing import Optional

fmt = "\n=== {:30} ===\n"

print(fmt.format("start connecting to Milvus"))
host = os.environ.get("MILVUS_HOST", "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect("default", host=host, port="19530")

# Milvus metric type -> DashVector metric name.
metrics_map = {
    "COSINE": "cosine",
    "L2": "euclidean",
    "IP": "dotproduct",
}

# Milvus scalar field type -> type name written to the exported schema.
dtype_map = {
    DataType.BOOL: "bool",
    DataType.INT8: "int",
    DataType.INT16: "int",
    DataType.INT32: "int",
    DataType.INT64: "int",
    DataType.FLOAT: "float",
    DataType.DOUBLE: "float",
    DataType.STRING: "str",
    DataType.VARCHAR: "str",
}


def load_collection(collection_name: str) -> Optional[Collection]:
    """Load a Milvus collection into memory.

    Args:
        collection_name: Name of the Milvus collection to load.

    Returns:
        The loaded ``Collection``, or ``None`` when no collection with that
        name exists.
    """
    has = utility.has_collection(collection_name)
    # Report the collection that was actually requested rather than the
    # name of the demo collection.
    print(f"Does collection {collection_name} exist in Milvus: {has}")
    if not has:
        return None

    collection = Collection(collection_name)
    collection.load()
    return collection


def export_collection_schema(collection, file: str):
    """Write the DashVector-relevant part of a Milvus schema to a JSON file.

    The output contains ``metrics``, ``dtype``, ``dimension`` and
    ``fields_schema``. The primary-key field is skipped, and only the field
    covered by the collection's first index is treated as the vector field.

    Args:
        collection: Loaded Milvus collection.
        file: Path of the JSON file to write.

    Raises:
        ValueError: If the collection has no index, or uses a metric type,
            vector type or scalar type that has no mapping here.
    """
    if not collection.indexes:
        raise ValueError("milvus collection has no index; cannot determine the vector field")

    schema = collection.schema.to_dict()
    index = collection.indexes[0].to_dict()
    export_schema = {}

    milvus_metric_type = index["index_param"]["metric_type"]
    try:
        export_schema["metrics"] = metrics_map[milvus_metric_type]
    except KeyError as err:
        raise ValueError(f"milvus metrics_type{milvus_metric_type} not supported") from err
    export_schema["fields_schema"] = {}

    for field in schema["fields"]:
        # The primary key is carried by each exported record, not the schema.
        if field.get("is_primary"):
            continue

        if field["name"] == index["field"]:
            # Vector field: only float vectors are handled.
            if field["type"] != DataType.FLOAT_VECTOR:
                raise ValueError(f"milvus dtype{field['type']} not supported yet")
            export_schema["dtype"] = "float"
            export_schema["dimension"] = field["params"]["dim"]
        else:
            # Scalar (non-vector) field.
            try:
                export_schema["fields_schema"][field["name"]] = dtype_map[field["type"]]
            except KeyError as err:
                raise ValueError(f"milvus dtype{field['type']} not supported yet") from err

    # A distinct handle name avoids shadowing the ``file`` path argument.
    with open(file, "w", encoding="utf-8") as schema_fp:
        json.dump(export_schema, schema_fp, indent=4)


if __name__ == "__main__":
    collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    collection = load_collection(collection_name)
    if collection is None:
        raise SystemExit(f"collection {collection_name} does not exist in Milvus")

    dump_path = Path(collection_name + ".dump")
    dump_path.mkdir(parents=True, exist_ok=True)

    schema_file = str(dump_path / "schema.json")
    export_collection_schema(collection, schema_file)
JSON示例
{
    "metrics": "euclidean",
    "fields_schema": {
        "random": "float",
        "var": "str"
    },
    "dtype": "float",
    "dimension": 8
}
2.2. Data导出
DashVector和Milvus在设计上都有Partition的概念，所以向量以及其他数据进行导出时，需要注意按照Partition粒度进行导出。此外，DashVector的主键类型为str，而Milvus设计其为自定义类型，所以在导出时需要考虑主键类型的转换。以下为一个基于query_iterator接口导出的简单代码示例：
from pymilvus import (
    connections,
    utility,
    Collection,
    DataType,
)
import os
import json
import numpy as np
from pathlib import Path
from typing import Optional

fmt = "\n=== {:30} ===\n"

print(fmt.format("start connecting to Milvus"))
host = os.environ.get("MILVUS_HOST", "localhost")
print(fmt.format(f"Milvus host: {host}"))
connections.connect("default", host=host, port="19530")

# Defaults; the entry point below overwrites both from the collection itself.
pk = "pk"
vector_field_name = "vector"


def load_collection(collection_name: str) -> Optional[Collection]:
    """Load a Milvus collection into memory.

    Args:
        collection_name: Name of the Milvus collection to load.

    Returns:
        The loaded ``Collection``, or ``None`` when no collection with that
        name exists.
    """
    has = utility.has_collection(collection_name)
    # Report the collection that was actually requested rather than the
    # name of the demo collection.
    print(f"Does collection {collection_name} exist in Milvus: {has}")
    if not has:
        return None

    collection = Collection(collection_name)
    collection.load()
    return collection


def export_partition_data(collection, partition_name, file: str, batch_size: int = 10):
    """Dump every record of one Milvus partition as JSON lines.

    Each output line is an object with three keys: ``pk`` (the primary key
    converted to ``str``), ``vector`` (a list of floats) and ``fields`` (all
    remaining fields, unchanged).

    Reads the module-level ``pk`` and ``vector_field_name`` to recognise the
    primary-key and vector fields.

    Args:
        collection: Loaded Milvus collection.
        partition_name: Name of the Milvus partition to export.
        file: Path of the output file; it is overwritten.
        batch_size: Number of records fetched per iterator step.
    """
    # Ask for every field declared in the schema instead of hard-coding the
    # field names of the demo collection.
    output_fields = [field.name for field in collection.schema.fields]
    query_iter = collection.query_iterator(
        batch_size=batch_size,
        output_fields=output_fields,
        partition_names=[partition_name],
    )

    try:
        with open(file, "w", encoding="utf-8") as export_file:
            while True:
                docs = query_iter.next()
                if not docs:
                    # An empty batch marks the end of the partition.
                    break

                for doc in docs:
                    new_doc = {}
                    new_doc_fields = {}
                    for key, value in doc.items():
                        if key == pk:
                            # The import script uses this value as a string id.
                            new_doc["pk"] = str(value)
                        elif key == vector_field_name:
                            new_doc["vector"] = [float(x) for x in value]
                        else:
                            new_doc_fields[key] = value
                    new_doc["fields"] = new_doc_fields
                    json.dump(new_doc, export_file)
                    export_file.write("\n")
    finally:
        # Release the iterator even if fetching or writing fails.
        query_iter.close()


if __name__ == "__main__":
    collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    collection = load_collection(collection_name)
    if collection is None:
        raise SystemExit(f"collection {collection_name} does not exist in Milvus")

    pk = collection.schema.primary_field.name
    vector_field_name = collection.indexes[0].field_name

    dump_path = Path(collection_name + ".dump")
    dump_path.mkdir(parents=True, exist_ok=True)

    for partition in collection.partitions:
        partition_name = partition.name
        # Milvus calls its default partition "_default"; it is written to
        # default.txt so the import script sees the name "default".
        if partition_name == "_default":
            export_path = str(dump_path / "default.txt")
        else:
            export_path = str(dump_path / (partition_name + ".txt"))
        export_partition_data(collection, partition_name, export_path)
3. 将数据导入DashVector
3.1. 创建Cluster
参考DashVector官方用户手册构建Cluster。
3.2. 创建Collection
根据2.1章节中导出的Schema信息，以及参考DashVector官方用户手册来创建Collection。下面的示例代码会根据2.1章节中导出的schema.json来创建一个DashVector的Collection。
from dashvector import Client, DashVectorException

from pydantic import BaseModel
from typing import Dict, Type
import json

# Type names stored in schema.json -> Python types passed to DashVector.
dtype_convert = {
    "int": int,
    "float": float,
    "bool": bool,
    "str": str,
}


class Schema(BaseModel):
    """Collection settings read from the exported schema.json."""

    metrics: str
    dtype: Type
    dimension: int
    fields_schema: Dict[str, Type]

    @classmethod
    def from_dict(cls, json_data):
        """Build a ``Schema`` from the parsed content of schema.json.

        Args:
            json_data: Dict with the keys ``metrics``, ``dtype``,
                ``dimension`` and ``fields_schema``.

        Raises:
            KeyError: If a key is missing or a type name is not present in
                ``dtype_convert``.
        """
        fields_schema = {
            name: dtype_convert[type_name]
            for name, type_name in json_data["fields_schema"].items()
        }
        return cls(
            metrics=json_data["metrics"],
            dtype=dtype_convert[json_data["dtype"]],
            dimension=json_data["dimension"],
            fields_schema=fields_schema,
        )


def read_schema(schema_path) -> Schema:
    """Read schema.json from ``schema_path`` and return it as a ``Schema``."""
    with open(schema_path, encoding="utf-8") as schema_fp:
        json_data = json.load(schema_fp)

    return Schema.from_dict(json_data)


if __name__ == "__main__":
    # A plain string placeholder: interpolating an undefined name in an
    # f-string would raise NameError.
    milvus_collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    milvus_dump_path = f"{milvus_collection_name}.dump"
    milvus_dump_scheme_path = milvus_dump_path + "/schema.json"
    schema = read_schema(milvus_dump_scheme_path)

    # Only ``Client`` is imported, so it is called directly; the module name
    # ``dashvector`` is not bound in this script.
    client = Client(
        api_key="YOUR_API_KEY",
        endpoint="YOUR_CLUSTER_ENDPOINT",
    )

    # create collection
    rsp = client.create(
        name="YOUR_DASHVECTOR_COLLECTION_NAME",
        dimension=schema.dimension,
        metric=schema.metrics,
        dtype=schema.dtype,
        fields_schema=schema.fields_schema,
    )
    if not rsp:
        raise DashVectorException(rsp.code, reason=rsp.message)
3.3. 导入Data
根据2.2章节中导出的数据，以及参考DashVector官方用户手册来批量插入Doc。下面的示例代码会依次解析各个Partition导出的数据，然后依次创建DashVector下的Partition并导入数据。
from dashvector import Client, DashVectorException, Doc

from pydantic import BaseModel
from typing import Dict, Type
import json
import glob
from pathlib import Path


def insert_data(collection, partition_name, partition_file):
    """Load one exported partition file into a DashVector partition.

    Creates the partition unless it is named ``default``, then inserts one
    ``Doc`` per non-blank JSON line of ``partition_file``.

    Args:
        collection: DashVector collection to insert into.
        partition_name: Target partition name.
        partition_file: Path of the JSON-lines file; each line must have the
            keys ``pk``, ``vector`` and ``fields``.

    Raises:
        DashVectorException: If creating the partition or inserting a record
            returns a failed response.
    """
    if partition_name != "default":
        rsp = collection.create_partition(partition_name)
        if not rsp:
            raise DashVectorException(rsp.code, reason=rsp.message)

    with open(partition_file, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            json_data = json.loads(line)
            doc = Doc(
                id=json_data["pk"],
                vector=json_data["vector"],
                fields=json_data["fields"],
            )
            # Name the target partition explicitly; without it every record
            # goes to the default partition.
            rsp = collection.insert([doc], partition=partition_name)
            if not rsp:
                raise DashVectorException(rsp.code, reason=rsp.message)


if __name__ == "__main__":
    # A plain string placeholder: interpolating an undefined name in an
    # f-string would raise NameError.
    milvus_collection_name = "YOUR_MILVUS_COLLECTION_NAME"
    milvus_dump_path = f"{milvus_collection_name}.dump"

    # Only ``Client`` is imported, so it is called directly; the module name
    # ``dashvector`` is not bound in this script.
    client = Client(
        api_key="YOUR_API_KEY",
        endpoint="YOUR_CLUSTER_ENDPOINT",
    )

    # Fetch the collection that was created in the previous step.
    collection = client.get("YOUR_DASHVECTOR_COLLECTION_NAME")
    if not collection:
        # NOTE(review): assumes a failed get() returns a falsy object that
        # carries code/message — confirm against the SDK.
        raise DashVectorException(collection.code, reason=collection.message)

    partition_files = glob.glob(milvus_dump_path + "/*.txt", recursive=False)
    for partition_file in partition_files:
        # The file stem is the partition name chosen by the export script.
        partition_name = Path(partition_file).stem
        insert_data(collection, partition_name, partition_file)