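Milvus is a vector database. We could use a different vector database instead. If you swap in another one, many of the steps stay exactly the same and the changes required are not complicated.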


Cornell University has uploaded the entire arXiv corpus to Kaggle and licensed it under CC0: Public Domain. We can download the dataset directly with the Kaggle API.


```bash
# Create the necessary directories
mkdir -p semantic_similarity/notebooks semantic_similarity/data semantic_similarity/milvus
# CD into the data directory
cd semantic_similarity/data
# Create and activate a conda environment
conda create -n semantic_similarity python=3.9
conda activate semantic_similarity
## Create Virtual Environment using venv if not using conda
# python -m venv semantic_similarity
# source semantic_similarity/bin/activate
# Pip install the necessary libraries
pip install jupyterlab kaggle matplotlib scikit-learn tqdm ipywidgets
pip install "dask[complete]" sentence-transformers
# Note: the pymilvus client version should match the Milvus server (2.1.x in this article)
pip install pandas pyarrow pymilvus protobuf==3.20.0
# Download data using the kaggle API
# (the kaggle CLI needs an API token, e.g. in ~/.kaggle/kaggle.json)
kaggle datasets download -d Cornell-University/arxiv
# Unzip the data into the local directory
unzip arxiv.zip
# Delete the Zip file
rm arxiv.zip
```
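The unzipped snapshot is a JSON Lines file: one paper per line, each line a standalone JSON object, which is why `json.loads` can be mapped over each line in Step 1. As a quick sanity check before reaching for Dask, the minimal sketch below parses only the first few lines. `peek_records` is a hypothetical helper, not part of any library.

```python
import json
from itertools import islice
from typing import Iterable, List


def peek_records(lines: Iterable[str], n: int = 1) -> List[dict]:
    """Parse only the first `n` lines of a JSON Lines source.

    `lines` can be an open file object, in which case only those n lines
    are read from disk, never the whole multi-GB file.
    """
    return [json.loads(line) for line in islice(lines, n)]
```

For example, open `arxiv-metadata-oai-snapshot.json` (in the data directory) and pass the file object as `peek_records(f, n=1)` to see the fields of the first paper.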


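The data we downloaded from Kaggle is a 3.3GB JSON file containing about 2 million papers! To process such a large dataset efficiently, loading the whole thing into memory with pandas is not a good idea. Instead, we use Dask to split the data into multiple partitions and load only the partitions currently being processed into memory.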


Dask is an open-source library for parallel computing that offers a pandas-like API. Install it on your local machine with `pip install "dask[complete]"`, then import the necessary libraries.

```python
import dask.bag as db
import json
from datetime import datetime
import time

data_path = '../data/arxiv-metadata-oai-snapshot.json'
```

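We will use two Dask components to process the large arXiv JSON file efficiently:

  • Dask Bag: lets us load the JSON file in fixed-size blocks and run preprocessing functions on each row of data
  • Dask DataFrame: once preprocessing is done, the Dask Bag is converted into a Dask DataFrame so the data can be accessed through a pandas-like API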

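Step 1: Load the JSON file into a Dask Bag

We load the JSON file into a Dask Bag with a block size of 10MB; the blocksize parameter controls the size of each block. We then use .map() to apply json.loads to every line in the Dask Bag, parsing each JSON string into a Python dictionary.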

```python
# Read the file in blocks of 10MB and parse the JSON.
papers_db = db.read_text(data_path, blocksize="10MB").map(json.loads)
# Print the first row
papers_db.take(1)
```
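Conceptually, `blocksize` cuts the file into byte ranges of roughly that size, and each range is stretched to a line boundary so that no JSON record is split between two blocks. The standard-library sketch below illustrates that idea on an in-memory byte string. `split_on_lines` and `parse_block` are hypothetical names, and this is an illustration of the idea, not Dask's actual implementation.

```python
import json
from typing import List


def split_on_lines(data: bytes, blocksize: int) -> List[bytes]:
    """Cut `data` into chunks of roughly `blocksize` bytes, extending each
    chunk to the next newline so that no JSON record is split in two."""
    if blocksize <= 0:
        raise ValueError("blocksize must be positive")
    blocks = []
    start = 0
    while start < len(data):
        end = start + blocksize
        if end < len(data):
            # Search from the last byte of the nominal block, so a block
            # that already ends on a newline is not extended further
            newline = data.find(b"\n", end - 1)
            end = len(data) if newline == -1 else newline + 1
        blocks.append(data[start:end])
        start = end
    return blocks


def parse_block(block: bytes) -> List[dict]:
    """Parse every non-empty line of one block into a Python dict."""
    return [json.loads(line) for line in block.splitlines() if line.strip()]
```

Each block can then be parsed independently (and in parallel), which is what makes block-wise loading memory-friendly.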




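Step 2: Define the pre-processing helper functions

  • v1_date(): finds the date on which the first version (v1) of the paper was created and stores it as a Unix timestamp in a new "unix_time" field.
  • text_col(): combines the "title" and "abstract" fields with the "[SEP]" token so that the text can be fed to the SPECTER embedding model.
  • filters(): keeps only computer-science papers (categories containing "cs.") whose fields are shorter than the length limits used later in the Milvus schema.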


```python
import calendar


def v1_date(row):
    """
    For each row in the dask bag,
    find the date of the first version of the paper
    and add it to the row as a new column
    Args:
        row: a row of the dask bag
    Returns:
        A row of the dask bag with added "unix_time" column
    """
    versions = row["versions"]
    date = None
    for version in versions:
        if version["version"] == "v1":
            date = datetime.strptime(version["created"], "%a, %d %b %Y %H:%M:%S %Z")
            # "created" is a GMT timestamp: calendar.timegm interprets it as UTC,
            # whereas time.mktime would interpret it as the machine's local time
            date = calendar.timegm(date.timetuple())
    row["unix_time"] = date
    return row


def text_col(row):
    """
    It takes a row of the dask bag, adds a new column called 'text'
    that is the concatenation of the 'title' and 'abstract' columns
    Args:
        row: the row of the dask bag
    Returns:
        A row with the text column added.
    """
    row["text"] = row["title"] + "[SEP]" + row["abstract"]
    return row


def filters(row):
    """
    For each row in the dask bag, only keep the row if it meets the filter criteria
    Args:
        row: the row of the dask bag
    Returns:
        Boolean mask
    """
    return ((len(row["id"]) < 16) and
            (len(row["categories"]) < 200) and
            (len(row["title"]) < 4096) and
            (len(row["abstract"]) < 65535) and
            ("cs." in row["categories"])  # Keep only CS papers
            )
```
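The `created` strings in the arXiv metadata carry a GMT timestamp, while `time.mktime` interprets a naive time tuple as local time, so the same paper would get a different `unix_time` on machines in different time zones. The sketch below isolates a timezone-independent conversion with `calendar.timegm`. `created_to_unix` is a hypothetical helper, and the sample string assumes the arXiv format matched by the `"%a, %d %b %Y %H:%M:%S %Z"` pattern used above.

```python
import calendar
from datetime import datetime


def created_to_unix(created: str) -> int:
    """Convert a 'Mon, 2 Apr 2007 19:18:42 GMT'-style string to Unix seconds."""
    parsed = datetime.strptime(created, "%a, %d %b %Y %H:%M:%S %Z")
    # strptime returns a naive datetime; timegm treats its fields as UTC,
    # so the result does not depend on the machine's time zone
    return calendar.timegm(parsed.timetuple())
```

For example, `created_to_unix("Mon, 2 Apr 2007 19:18:42 GMT")` returns `1175541522` on any machine.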

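Step 3: Run the pre-processing helper functions on the Dask Bag

As shown below, we use the .map() and .filter() functions to run these helpers on every row of the Dask Bag. Because Dask supports method chaining, we can also keep only the required columns and drop the ones we don't need in the same chain.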

```python
# Specify columns to keep in the final table
cols_to_keep = ["id", "categories", "title", "abstract", "unix_time", "text"]
# Apply the pre-processing
papers_db = (
    papers_db.map(v1_date)
    .map(text_col)
    .map(
        lambda row: {
            key: value
            for key, value in row.items()
            if key in cols_to_keep
        }
    )
    .filter(filters)
)
# Print the first row
papers_db.take(1)
```

Step 4: Convert the Dask Bag to a Dask DataFrame

The final data-loading step is converting the Dask Bag into a Dask DataFrame so that we can work with it through a pandas-like API.

```python
# Convert the Dask Bag to a Dask Dataframe
schema = {
    "id": str,
    "title": str,
    "categories": str,
    "abstract": str,
    "unix_time": int,
    "text": str,
}
papers_df = papers_db.to_dataframe(meta=schema)
# Display first 5 rows
papers_df.head()
```
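`meta` describes the expected columns and types so that Dask can build the DataFrame lazily; Dask takes it on trust rather than checking every record up front, so a record that does not match (for example a paper without a v1 entry, whose `unix_time` stays `None`) would only surface later, at compute time. One way to spot-check is to run a few bag records, such as those returned by `papers_db.take(5)`, through a small validator like the hypothetical `check_record` below.

```python
from typing import Dict, List


def check_record(record: dict, schema: Dict[str, type]) -> List[str]:
    """Compare one record with a {column: python_type} schema and
    return a list of human-readable problems (empty if it matches)."""
    problems = []
    problems += [f"missing column: {c}" for c in sorted(set(schema) - set(record))]
    problems += [f"unexpected column: {c}" for c in sorted(set(record) - set(schema))]
    for col, expected in schema.items():
        if col in record and not isinstance(record[col], expected):
            problems.append(
                f"{col}: expected {expected.__name__}, got {type(record[col]).__name__}"
            )
    return problems
```

Usage: `[check_record(r, schema) for r in papers_db.take(5)]` should be a list of empty lists when the sample matches the schema.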




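Installing the Milvus vector database with Docker is simple: first install Docker, then download docker-compose.yml and start the Docker containers as shown below! The milvus.io website offers many other ways to install Milvus Standalone and Milvus Cluster; if you need to install on a Kubernetes cluster or offline, refer to the corresponding documentation.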

```bash
# CD into milvus directory (path relative to the folder that contains semantic_similarity/;
# if you are still in semantic_similarity/data, use: cd ../milvus)
cd semantic_similarity/milvus
# Download the Standalone version of Milvus docker compose
wget https://github.com/milvus-io/milvus/releases/download/v2.1.0/milvus-standalone-docker-compose.yml -O ./docker-compose.yml
# Run the Milvus server docker container on your local
sudo docker-compose up -d
```
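`docker-compose up -d` returns as soon as the containers are started, which can be before Milvus is ready to accept connections. A minimal sketch, assuming the default standalone port 19530 that the connection code uses: poll the port until a TCP connection succeeds. `wait_for_port` is a hypothetical helper; an open port only shows that something is listening, not that every Milvus component has finished starting.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll until a TCP connection to host:port succeeds (True)
    or `timeout` seconds pass without success (False)."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Open and immediately close a probe connection
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, call `wait_for_port("localhost", 19530)` before `connections.connect(...)`.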


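We interact with the Milvus vector database service through the pymilvus library. The emb_dim parameter is the dimensionality of the embeddings the text is converted into; for SPECTER, the embedding dimension is 768.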

```python
# Make sure a Milvus server is already running
from pymilvus import connections, utility
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType

# Connect to Milvus server
connections.connect(alias="default", host="localhost", port="19530")
# Collection name
collection_name = "arxiv"
# Embedding size
emb_dim = 768
# # Check for existing collection and drop if exists
# if utility.has_collection(collection_name):
#     print(utility.list_collections())
#     utility.drop_collection(collection_name)
```

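A collection in Milvus is similar to a table in a traditional database. To create a collection, we first need to define its schema. In this example we use Milvus 2.1 string fields and string indexing to store all the necessary metadata for each paper. The primary key (the idx field, named "id") and the categories, title and abstract fields are of type VARCHAR, unix_time is INT64, and embedding is a FLOAT_VECTOR field holding emb_dim-dimensional embeddings. Milvus supports many other data types as well.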

```python
# Create a schema for the collection
idx = FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=16)
categories = FieldSchema(name="categories", dtype=DataType.VARCHAR, max_length=200)
title = FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=4096)
abstract = FieldSchema(name="abstract", dtype=DataType.VARCHAR, max_length=65535)
unix_time = FieldSchema(name="unix_time", dtype=DataType.INT64)
embedding = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=emb_dim)
# Fields in the collection
fields = [idx, categories, title, abstract, unix_time, embedding]
schema = CollectionSchema(
    fields=fields, description="Semantic Similarity of Scientific Papers"
)
# Create a collection with the schema
collection = Collection(
    name=collection_name, schema=schema, using="default", shards_num=10
)
```
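The filters() helper compares Python character counts with these max_length values. If the server counts max_length in UTF-8 bytes (an assumption worth checking against the documentation for your Milvus version), an abstract with many non-ASCII characters could pass the character check yet be rejected on insert. The hypothetical `fits_varchar_limits` sketch below applies the conservative byte-based check; the limits dict simply mirrors the schema above.

```python
from typing import Dict

# Hypothetical mirror of the max_length values declared in the schema above
VARCHAR_LIMITS: Dict[str, int] = {
    "id": 16,
    "categories": 200,
    "title": 4096,
    "abstract": 65535,
}


def fits_varchar_limits(row: Dict[str, str], limits: Dict[str, int] = VARCHAR_LIMITS) -> bool:
    """Return True if every limited field is at most its max_length,
    counting UTF-8 bytes (a string's byte length is never smaller
    than its character count, so this is the stricter check)."""
    return all(len(row[field].encode("utf-8")) <= limit for field, limit in limits.items())
```

It could be added to the `.filter()` chain alongside filters() if byte-length rejections show up during inserts.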



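To run a semantic similarity search, the text in the Dask DataFrame has to be converted into embedding vectors, so the first job is to generate embeddings for the text. This article uses an SBERT bi-encoder model called SPECTER.

SPECTER: Scientific Paper Embeddings using Citation-informed TransformERs.

In short, SPECTER was trained specifically on scientific papers, so it outperforms SciBERT on tasks such as topic classification, citation prediction and scientific paper recommendation, which is why we chose it.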

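With the Sentence Transformers library, using the pretrained SPECTER model is very simple: a single line of code downloads the pretrained model. We also write a simple helper function, emb_gen, that converts the entire text column of a Dask DataFrame partition into embeddings.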

```python
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

# Scientific Papers SBERT Model
model = SentenceTransformer('allenai-specter')


def emb_gen(partition):
    # Encode the "text" column of one partition into a list of 768-dim vectors
    return model.encode(partition['text'].tolist()).tolist()
```

For each partition, we generate the embeddings with emb_gen and then upload the metadata together with the embeddings to Milvus using collection.insert.

```python
# Initialize
collection = Collection(collection_name)
for partition in tqdm(range(papers_df.npartitions)):
    # Compute the partition once as a pandas DataFrame, instead of
    # re-running the whole Dask pipeline for every column
    subset_df = papers_df.get_partition(partition).compute()
    # Skip partitions that the filter left empty
    if subset_df.empty:
        continue
    # Metadata, in the same field order as the collection schema
    data = [
        subset_df[col].tolist()
        for col in ["id", "categories", "title", "abstract", "unix_time"]
    ]
    # Embeddings
    data.append(emb_gen(subset_df))
    # Insert data
    collection.insert(data)
```
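`collection.insert` receives the data column by column: one list per schema field, in schema order. If a partition is too large for a single insert request (Milvus caps the size of a request; the exact limit depends on the server configuration), one option is to split the rows into smaller column-oriented batches. The sketch below shows that conversion with the hypothetical helper `column_batches`; the batch size is an assumption to tune.

```python
from typing import Dict, Iterable, Iterator, List, Sequence


def column_batches(
    rows: Iterable[Dict], fields: Sequence[str], batch_size: int
) -> Iterator[List[list]]:
    """Group row dicts into column-oriented batches.

    Each yielded batch holds one inner list per field, in `fields` order,
    i.e. the layout that collection.insert() receives above.
    """
    batch: List[list] = [[] for _ in fields]
    count = 0
    for row in rows:
        for column, field in zip(batch, fields):
            column.append(row[field])
        count += 1
        if count == batch_size:
            yield batch
            batch = [[] for _ in fields]
            count = 0
    if count:  # flush the final, partially filled batch
        yield batch
```

With `fields = ["id", "categories", "title", "abstract", "unix_time", "embedding"]` and row dicts that carry all six keys (for example a computed pandas partition, hypothetically named `pdf`, converted with `pdf.to_dict("records")` after adding an `embedding` column), each yielded batch can be passed straight to `collection.insert(batch)`.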




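Once all the data has been inserted, we add an approximate nearest neighbour (ANN) index to the embedding field; here it is an HNSW index using the L2 distance metric.

```python
# Add an ANN index to the collection
index_params = {
    "metric_type": "L2",
    "index_type": "HNSW",
    "params": {"efConstruction": 128, "M": 8},
}
collection.create_index(field_name="embedding", index_params=index_params)
```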
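HNSW is an approximate index: larger `M` and `efConstruction` values generally improve recall at the cost of build time and memory, and its results can occasionally differ from an exhaustive search. To spot-check recall on a small sample, you can compare the Milvus hits with an exact brute-force scan like the sketch below. It returns squared Euclidean distances because the Milvus documentation describes its L2 metric as reporting the squared distance (worth verifying for your version); the ranking is the same either way. `exact_l2_search` and `recall_at_k` are hypothetical helpers.

```python
import heapq
from typing import List, Sequence, Tuple


def exact_l2_search(
    query: Sequence[float], vectors: Sequence[Sequence[float]], k: int
) -> List[Tuple[int, float]]:
    """Scan every vector and return the k (index, squared L2 distance)
    pairs closest to `query`, nearest first."""

    def squared_l2(v: Sequence[float]) -> float:
        return sum((q - x) ** 2 for q, x in zip(query, v))

    scored = ((i, squared_l2(v)) for i, v in enumerate(vectors))
    return heapq.nsmallest(k, scored, key=lambda pair: pair[1])


def recall_at_k(approx_ids: Sequence, exact_ids: Sequence) -> float:
    """Fraction of the exact top-k IDs that the ANN search also returned."""
    return len(set(approx_ids) & set(exact_ids)) / len(exact_ids)
```

The scan is O(n) per query, so it is only practical on a sample of the collection, which is exactly what a recall check needs.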



Before searching, load the collection into memory:

```python
collection = Collection(collection_name)
collection.load()
```


The helper below embeds the query text with the same SPECTER model, runs an ANN search on the embedding field, and prints the query time together with the title and abstract of each hit.

```python
def query_and_display(query_text, collection, num_results=10):
    # Embed the Query Text (as a plain list of floats)
    query_emb = [model.encode(query_text).tolist()]
    # Search Params
    search_params = {"metric_type": "L2", "params": {"ef": 128}}
    # Search
    query_start = datetime.now()
    results = collection.search(
        data=query_emb,
        anns_field="embedding",
        param=search_params,
        limit=num_results,
        expr=None,
        output_fields=["title", "abstract"],
    )
    query_end = datetime.now()
    # Print Results
    print(f"Query Speed: {(query_end - query_start).total_seconds():.2f} s")
    print("Results:")
    for res in results[0]:
        # Collapse the line breaks and indentation inside arXiv titles
        title = " ".join(res.entity.get("title").split())
        print(f"➡️ ID: {res.id}. L2 Distance: {res.distance:.2f}")
        print(f"Title: {title}")
        print(f"Abstract: {res.entity.get('abstract')}")
```
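The search ranks hits by L2 distance on the raw SPECTER vectors. If you would rather rank by cosine similarity, one common option (not what this article does) is to L2-normalize the vectors before inserting and querying, or to use the inner-product ("IP") metric on normalized vectors: for unit vectors, the squared L2 distance equals 2 - 2·cos(a, b), so both orderings coincide. The sketch below checks that identity numerically; the helper names are hypothetical.

```python
import math
from typing import List, Sequence


def normalize(v: Sequence[float]) -> List[float]:
    """Scale a vector to unit length (L2 norm of 1)."""
    norm = math.sqrt(sum(x * x for x in v))
    if norm == 0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in v]


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def squared_l2(a: Sequence[float], b: Sequence[float]) -> float:
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))
```

For any non-zero `a` and `b`, `squared_l2(normalize(a), normalize(b))` matches `2 - 2 * cosine(a, b)` up to floating-point rounding.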


As an example, we search for papers similar to the SimCSE paper, building the query text the same way text_col() does: title and abstract joined by "[SEP]".

```python
# Query for papers that are similar to the SimCSE paper
title = "SimCSE: Simple Contrastive Learning of Sentence Embeddings"
abstract = """This paper presents SimCSE, a simple contrastive learning framework that greatly advances state-of-the-art sentence embeddings. We first describe an unsupervised approach, which takes an input sentence and predicts itself in a contrastive objective, with only standard dropout used as noise. This simple method works surprisingly well, performing on par with previous supervised counterparts. We find that dropout acts as minimal data augmentation, and removing it leads to a representation collapse. Then, we propose a supervised approach, which incorporates annotated pairs from natural language inference datasets into our contrastive learning framework by using "entailment" pairs as positives and "contradiction" pairs as hard negatives. We evaluate SimCSE on standard semantic textual similarity (STS) tasks, and our unsupervised and supervised models using BERT base achieve an average of 76.3% and 81.6% Spearman's correlation respectively, a 4.2% and 2.2% improvement compared to the previous best results. We also show -- both theoretically and empirically -- that the contrastive learning objective regularizes pre-trained embeddings' anisotropic space to be more uniform, and it better aligns positive pairs when supervised signals are available."""
query_text = f"{title}[SEP]{abstract}"
query_and_display(query_text, collection, num_results=10)
```


When we are finished, we release the collection to free the memory it occupies.

```python
collection.release()
```




Author: Marie Stephen Leo

