<< ..

How to use Pymongo

发布时间:

This document is not completed and will be updated anytime.

使用pymongo写mongodb的查询脚本可享诸多优点,pymongo可以同时操作多个库, 方便的操作文档的数据结构。

引入pymongo

>>> import pymongo

MongoClient

使用mongoClient与服务器建立连接。

>>> from pymongo import MongoClient
>>> client = MongoClient('localhost', 27017)
# 或者也可以酱紫
>>> client = MongoClient('mongodb://localhost:27017/')

连接到DB

>>> db = client.test_database
# 为了避免一些数据库命名带有符号,推荐使用这种方法来连接:
>>> db = client['test-users']

Collection

紧接着上面,连接到数据库后,选中其中的一个collection:

>>> collection = db['users-collection']

即便是users-collection这个collection不存在,pymongo也会连接成功。当第一条记录insert进去之后,这个表会自动创建。

Documents操作

>>> import datetime
>>> post = {"author":"Mike",
>>> 		  "text":"First Blog Post",
>>> 		  "tags":["mongodb","python","pymongo"],
>>> 		  "date": datetime.datetime.utcnow()
>>> 	    }

上述代码中,日期处理使用的是datetime,pymongo会将datetime的数据类型处理为mongo的时间数据类型。

插入操作:

>>> posts = db.posts
>>> post_id = posts.insert_one(post).inserted_id
>>> post_id
ObjectId('...')

获取一条记录find_one():

>>> posts.find_one()
{u'date': datetime.datetime(...), u'text': u'My first blog post!', u'_id': ObjectId('...'), u'author': u'Mike...

Querying By ObjectId

如果查询中,需要用到ObjectId,直接手工拼一个Object的string是不行的,需要用到object包,转换成真正的Object数据类型。用法简单:

	from bson.objectid import ObjectId
	def get(post_id):
		document = client.db.collection.find_one({"_id":"ObjectId(post_id)"})

批量插入操作

数据迁移必备 —— insert_many(), 增量式批量插入。

>>> new_posts = [{"author": "Mike",
...               "text": "Another post!",
...               "tags": ["bulk", "insert"],
...               "date": datetime.datetime(2009, 11, 12, 11, 14)},
...              {"author": "Eliot",
...               "title": "MongoDB is fun",
...               "text": "and pretty easy too!",
...               "date": datetime.datetime(2009, 11, 10, 10, 45)}]
>>> result = posts.insert_many(new_posts)
>>> result.inserted_ids
[ObjectId('...'), ObjectId('...')]

遍历查询

pymongo的find()结合for...in...语法:

>>> for post in posts.find({"author": "Mike"}):
...   post
...
{u'date': datetime.datetime(...), u'text': u'My first blog post!', u'_id': ObjectId('...'), u'author': u'Mike', u'tags': [u'mongodb', u'python', u'pymongo']}
{u'date': datetime.datetime(2009, 11, 12, 11, 14), u'text': u'Another post!', u'_id': ObjectId('...'), u'author': u'Mike', u'tags': [u'bulk', u'insert']}

结合pipe语法遍历:

>>> d = datetime.datetime(2009, 11, 12, 12)
>>> for post in posts.find({"date": {"$lt": d}}).sort("author"):
...   print post
...
{u'date': datetime.datetime(2009, 11, 10, 10, 45), u'text': u'and pretty easy too!', u'_id': ObjectId('...'), u'author': u'Eliot', u'title': u'MongoDB is fun'}
{u'date': datetime.datetime(2009, 11, 12, 11, 14), u'text': u'Another post!', u'_id': ObjectId('...'), u'author': u'Mike', u'tags': [u'bulk', u'insert']}

Counting

PV、UV什么的,难免要count()一下:

>>> posts.count()
3
>>> posts.find({"truthOfLife": 42}).count()
1

Aggregation

有了聚合,基本可以宣布就是在写mongo查询语句了。 我们一般写个pipe = [][]里包着查询条件。

As python dictionaries don’t maintain order you should use SON or collections.OrderedDict where explicit ordering is required eg “$sort”:

>>> from bson.son import SON
>>> pipeline = [
...     {"$unwind": "$tags"},
...     {"$group": {"_id": "$tags", "count": {"$sum": 1}}},
...     {"$sort": SON([("count", -1), ("_id", -1)])}
... ]
>>> list(db.things.aggregate(pipeline))
[{u'count': 3, u'_id': u'cat'}, {u'count': 2, u'_id': u'dog'}, {u'count': 1, u'_id': u'mouse'}]

综合示例

以下是一个非常典型的查询脚本,了解上述基本用法后,很轻易的就能上手pymongo了。

$ vim countH5PagePv.py

# _*_ coding: utf8 _*_  声明编码,否则遇到汉字会报错

# 一堆包调用
from pymongo import MongoClient
from bson.objectid import ObjectId
import datetime

# 链接到数据库和表
# connect to db
dbClient = MongoClient('mongodb://localhost:27017')
db = dbClient['yangcong25']

# open collections
users = db['users']

# 定义了一个日期区间
startDate = datetime.datetime(2015, 10, 1)
endDate   = datetime.datetime(2015, 10, 2)

# 一个典型的aggregation函数调用,最终返回的是list()
def pvOfPage(startDate, endDate):
    pipeLine = [
        {"$match": {
            "createdBy": {
                "$gte": startDate,
                "$lte": endDate
            },
           "event": "enterSite"
        }}
    ]
    return list(users.aggregate(pipeLine))

pvCountFromH5 = pvOfH5Page(startDate, endDate)

# 用len()计算返回list的长度作为PV值
print("PV of Page : ")
print(len(pvCountFromH5))