执行增量 Map-减少
在本页面
Map-reduce 操作可以处理复杂的聚合任务。为了执行 map-reduce 操作,MongoDB 提供了mapReduce命令,并在mongo shell 中提供了db.collection.mapReduce() wrapper 方法。
如果 map-reduce 数据集不断增长,则可能需要执行增量 map-reduce 而不是每次都对整个数据集执行 map-reduce 操作。
要执行增量 Map 减少:
-
在当前集合上运行 map-reduce 作业,然后将结果输出到单独的集合。
-
当您有更多数据要处理时,请使用以下命令运行后续的 map-reduce 作业:
-
query
参数,该参数指定仅与新文档匹配的条件。 -
out
参数,该参数指定reduce
动作以将新结果合并到现有输出集合中。
请考虑以下示例,在该示例中,您计划在每天结束时运行的sessions
集合上进行 map-reduce 操作。
Data Setup
sessions
集合包含每天记录用户会话的文档,例如:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );
当前集合的初始 Map-Reduce
运行第一个 map-reduce 操作,如下所示:
- 定义将
userid
Map 到包含字段userid
,total_time
,count
和avg_time
的对象的 Map 函数:
var mapFunction = function() {
var key = this.userid;
var value = {
userid: this.userid,
total_time: this.length,
count: 1,
avg_time: 0
};
emit( key, value );
};
- 使用两个参数
key
和values
定义相应的 reduce 函数,以计算总时间和计数。key
对应于userid
,而values
是一个数组,其元素对应于mapFunction
中 Map 到userid
的各个对象。
var reduceFunction = function(key, values) {
var reducedObject = {
userid: key,
total_time: 0,
count:0,
avg_time:0
};
values.forEach( function(value) {
reducedObject.total_time += value.total_time;
reducedObject.count += value.count;
}
);
return reducedObject;
};
- 使用两个参数
key
和reducedValue
定义 finalize 函数。该函数修改reducedValue
文档以添加另一个字段average
并返回修改后的文档。
var finalizeFunction = function (key, reducedValue) {
if (reducedValue.count > 0)
reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
return reducedValue;
};
- 使用
mapFunction
,reduceFunction
和finalizeFunction
函数对session
集合执行 map-reduce。将结果输出到集合session_stat
。如果session_stat
集合已经存在,则该操作将替换内容:
db.sessions.mapReduce( mapFunction,
reduceFunction,
{
out: "session_stat",
finalize: finalizeFunction
}
)
后续增量 Map-Reduce
以后,随着sessions
集合的增长,您可以运行其他 map-reduce 操作。例如,将新文档添加到sessions
集合中:
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );
在一天结束时,对sessions
集合执行增量 map-reduce,但使用query
字段仅选择新文档。将结果输出到集合session_stat
,但将reduce
的内容与增量 map-reduce 的结果一起输出:
db.sessions.mapReduce( mapFunction,
reduceFunction,
{
query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
out: { reduce: "session_stat" },
finalize: finalizeFunction
}
);