# 整合官方和常用示例
# https://api.mongodb.com/python/current/py-modindex.html
from pymongo import MongoClient
from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne
from bson.objectid import ObjectId
from bson.son import SON
from bson import json_util, CodecOptions
import datetime
from pprint import pprint
import pymongo
from bson.code import Code
import urllib.parse
import ssl
from pymongo import errors
from pymongo import WriteConcern
import pytz
import gridfs
import multiprocessing
client = MongoClient(host="", port=27017)
all_databases = client.list_database_names()
# using dictionary style access
db = client["AdminConfigDB"]
all_collections = db.collection_names()
# using nomal style
collection = db.arc_AdminConf
# pprint(collection.find_one({}))
# for collection in collection.find({"flush":False}).sort("productId"):
# pprint(collection)
# 以product_id升序创建索引
# create_index = collection.create_index([('product_id', pymongo.ASCENDING)], unique=True)
# 打印集合索引信息
# pprint(sorted(list(collection.index_information())))
db2 = client.TestData
collection2 = db2.things
# result = collection2.insert_many([{"x": 1, "tags": ["dog", "cat"]},
# {"x": 2, "tags": ["cat"]},
# {"x": 2, "tags": ["mouse", "cat", "dog"]},
# {"x": 3, "tags": []}])
# pprint(result.inserted_ids)
# Aggregation Framework示例
# pipeline = [
# {"$unwind": "$tags"},
# {"$group": {"_id": "$tags", "count": {"$sum": 1}}},
# {"$sort": SON([("count", -1), ("_id", -1)])}]
# pprint(list(collection2.aggregate(pipeline)))
# Map/Reduce示例
# mapper = Code(
# """
# function () {
# this.tags.forEach(function(z) {
# emit(z, 1);
# });
# }
# """
# )
# reducer = Code(
# """
# function (key, values) {
# var total = 0;
# for (var i = 0; i < values.length; i++) {
# total += values[i];
# }
# return total;
# }
# """
# )
# result = collection2.map_reduce(mapper, reducer, "map_reduce_result")
# for doc in result.find():
# pprint(doc)
# results = collection2.map_reduce(
# mapper, reducer, "myresults", query={"x": {"$lt": 2}})
# for doc in results.find():
# pprint(doc)
# 认证示例
# username = urllib.parse.quote_plus('user')
# password = urllib.parse.quote_plus('pass/word')
# client = MongoClient('mongodb://%s:%s@' % (username, password))
# version3.7支持SCRAM-SHA-256
# client = MongoClient('example.com',
# username='user',
# password='password',
# authSource='the_database',
# authMechanism='SCRAM-SHA-256')
# mongodb uri连接方式
# uri = "mongodb://user:password@example.com/?authSource=the_database&authMechanism=SCRAM-SHA-256"
# client = MongoClient(uri)
# mongodb-x509认证
# client = MongoClient('example.com',
# username="<X.509 derived username>",
# authMechanism="MONGODB-X509",
# ssl=True,
# ssl_certfile='/path/to/client.pem',
# ssl_cert_reqs=ssl.CERT_REQUIRED,
# ssl_ca_certs='/path/to/ca.pem')
# 复制一个数据库
# client.admin.command('copydb', fromdb='src_db_name', todb='dst_db_name', fromhost='src_host_ip')
# 批量插入
# _id对于大多数高写入量的应用程序而言,对于插入的文档本身没有_id字段时,在插入时自动创建代价较高。inserted_ids表示按提供_id的顺序插入文档
# collection2.insert_many([{'x': i} for i in range(10000)]).inserted_ids
# print(collection2.count_documents({}))
# 批量删除
# collection2.delete_many({'x':{"$gte": 3}})
# bulk write,混合批量写入
# 添加write_concern 写关注
# collection2 = db2.get_collection('things', write_concern=WriteConcern(w=2, wtimeout=10))
# try:
# result = collection2.bulk_write([
# DeleteMany({}), # Remove all documents from the previous example.
# InsertOne({'_id': 1}),
# InsertOne({'_id': 2}),
# InsertOne({'_id': 3}),
# UpdateOne({'_id': 1}, {'$set': {'foo': 'bar'}}),
# UpdateOne({'_id': 4}, {'$inc': {'j': 1}}, upsert=True),
# ReplaceOne({'j': 1}, {'j': 2})])
# except errors.BulkWriteError as bwe:
# pprint(bwe.details)
# pprint(result.bulk_api_result)
# 日期时间和时区(mongodb默认假定时间以UTC)
# result = db2.objects.insert_one({"last_modified": datetime.datetime.utcnow()})
# tz_aware选项,该选项启用“感知” datetime.datetime对象.即知道其所在时区的日期时间
# result = db2.demo.insert_one( {'date': datetime.datetime(2019, 11, 28, 14, 0, 0)})
# db2.demo.find_one()['date']
# datetime.datetime(2019, 11, 28, 14, 0)
# options = CodecOptions(tz_aware=True)
# db2.get_collection('demo', codec_options=options).find_one()['date']
# 使用时区保存日期时间
# 存储datetime.datetime指定时区的对象,即tzinfo属性不是None时,PyMongo会将这些日期时间自动转换为UTC
# pacific = pytz.timezone('Asia/Shanghai')
# aware_datetime = pacific.localize( datetime.datetime(2019, 11, 28, 14, 0, 0))
# result = db2.demo.insert_one({"date_tz": aware_datetime})
# datetime.datetime(2019, 11, 28, 14, 0)
# 地理空间索引示例
# https://api.mongodb.com/python/current/examples/geo.html
# GridFS示例
# 每个GridFS实例都是使用特定Database实例创建的,并将在特定实例上运行
# db = MongoClient().gridfs_example
# fs = gridfs.GridFS(db)
# 将数据写入gridfs,put()在GridFS中创建一个新文件,并返回文件文档"_id"密钥的值
# data = fs.put(b"hello world")
# get()方法取回文件内容,get()返回类似文件对象,调用read()方法获取文件内容
# content = fs.get(data).read()
# 除了将str作为GridFS文件放置外,还可以放置任何类似文件的对象(带有read() 方法的对象)。GridFS将自动处理按块大小的段读取文件。还可以将其他属性作为关键字参数添加到文件中
# b = fs.put(fs.get(a), filename="foo", bar="baz")
# out = fs.get(b)
# out.read()
# out.filename
# out.bar
# out.upload_date
# 可拖尾游标,客户端用尽游标中所有结果后自动关闭游标,但对于上限集合(copped集合)可以使用可拖尾的游标
# https://api.mongodb.com/python/current/examples/tailable.html
# 自定义类型
* TypeEncoder:将其子类化以定义将自定义Python类型编码为已知BSON类型的编解码器。用户必须实现 python_type属性/属性和transform_python方法。
* TypeDecoder:将其子类化以定义将特定BSON类型解码为自定义Python类型的编解码器。用户必须实现bson_type属性/属性和transform_bson方法。
* TypeCodec:此方法的子类以定义可以对自定义类型进行编码和解码的编解码器。用户必须实现 python_type和bson_type属性/属性以及 transform_python和transform_bson方法。
自定义类型的类型编解码器仅需要定义如何将 Decimal实例转换为 Decimal128实例,反之亦然
from bson.decimal128 import Decimal128
from bson.codec_options import TypeCodec
class DecimalCodec(TypeCodec):
python_type = Decimal # the Python type acted upon by this type codec
bson_type = Decimal128 # the BSON type acted upon by this type codec
def transform_python(self, value):
# Function that transforms a custom type value into a type that BSON can encode
return Decimal128(value)
def transform_bson(self, value):
# Function that transforms a vanilla BSON type value into our custom type
return value.to_decimal()
decimal_codec = DecimalCodec()
# 开始对自定义类型对象进行编码和解码之前,我们必须首先将相应的编解码器告知PyMongo。这是通过创建一个TypeRegistry实例来完成
# 以使用任意数量的类型编解码器实例化类型注册表。一旦实例化,注册表是不可变的,将编解码器添加到注册表的唯一方法是创建一个新的注册表
from bson.codec_options import TypeRegistry
type_registry = TypeRegistry([decimal_codec])
# 使用CodecOptions实例定义一个实例,type_registry并使用它来获取一个Collection理解Decimal数据类型的对象
# mongodb跨数据库查询、跨表(集合)、跨服务器查询都可根据以下方式修改
# 查询data下product集合以条件gaId为1不重复的paId
# 使用此paId作为查询pa下pa_info集合以条件pa_id等于paId且v为1的文档
# data = client.data
# product = data.product
# pa = client.pa
# pa_info = pa.pa_info
# pipeline = [
# {"$match": {"gaId": 1}},
# {"$sort": {"paId": -1}},
# {"$group": {"_id": "$paId"}},
# {"$project": {"paId": 1.0}},
# ]
# cursor = pa_info.aggregate(pipeline, allowDiskUse=False)
# try:
# for doc in cursor:
# doc_value = doc['_id']
# pa_result = pa_info.find({"pa_id": doc_value, "v":1})
# for pa_doc in pa_result:
# # 查询到的结果写入到其他集合
# result_insert = collection2.insert_many([pa_doc])
# # pass
# finally:
# client.close()
# 父进程和每个子进程必须创建自己的MongoClient实例
# Each process creates its own instance of MongoClient.
# def func():
# db = pymongo.MongoClient().mydb
# # Do something with db.
# proc = multiprocessing.Process(target=func)
# proc.start()