执行增量 Map-减少

在本页面

Map-reduce 操作可以处理复杂的聚合任务。为了执行 map-reduce 操作,MongoDB 提供了mapReduce命令,并在mongo shell 中提供了db.collection.mapReduce() wrapper 方法。

如果 map-reduce 数据集不断增长,则可能需要执行增量 map-reduce 而不是每次都对整个数据集执行 map-reduce 操作。

要执行增量 Map 减少:

  • 在当前集合上运行 map-reduce 作业,然后将结果输出到单独的集合。

  • 当您有更多数据要处理时,请使用以下命令运行后续的 map-reduce 作业:

  • query参数,该参数指定仅与新文档匹配的条件。

  • out参数,该参数指定reduce动作以将新结果合并到现有输出集合中。

请考虑以下示例,在该示例中,您计划在每天结束时运行的sessions集合上进行 map-reduce 操作。

Data Setup

sessions集合包含每天记录用户会话的文档,例如:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 } );

db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 } );

当前集合的初始 Map-Reduce

运行第一个 map-reduce 操作,如下所示:

  • 定义将useridMap 到包含字段useridtotal_timecountavg_time的对象的 Map 函数:
var mapFunction = function() {
                      var key = this.userid;
                      var value = {
                                    userid: this.userid,
                                    total_time: this.length,
                                    count: 1,
                                    avg_time: 0
                                   };

                      emit( key, value );
                  };
  • 使用两个参数keyvalues定义相应的 reduce 函数,以计算总时间和计数。 key对应于userid,而values是一个数组,其元素对应于mapFunction中 Map 到userid的各个对象。
var reduceFunction = function(key, values) {

                        var reducedObject = {
                                              userid: key,
                                              total_time: 0,
                                              count:0,
                                              avg_time:0
                                            };

                        values.forEach( function(value) {
                                              reducedObject.total_time += value.total_time;
                                              reducedObject.count += value.count;
                                        }
                                      );
                        return reducedObject;
                     };
  • 使用两个参数keyreducedValue定义 finalize 函数。该函数修改reducedValue文档以添加另一个字段average并返回修改后的文档。
var finalizeFunction = function (key, reducedValue) {

                          if (reducedValue.count > 0)
                              reducedValue.avg_time = reducedValue.total_time / reducedValue.count;

                          return reducedValue;
                       };
  • 使用mapFunctionreduceFunctionfinalizeFunction函数对session集合执行 map-reduce。将结果输出到集合session_stat。如果session_stat集合已经存在,则该操作将替换内容:
db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         out: "session_stat",
                         finalize: finalizeFunction
                       }
                     )

后续增量 Map-Reduce

以后,随着sessions集合的增长,您可以运行其他 map-reduce 操作。例如,将新文档添加到sessions集合中:

db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

在一天结束时,对sessions集合执行增量 map-reduce,但使用query字段仅选择新文档。将结果输出到集合session_stat,但将reduce的内容与增量 map-reduce 的结果一起输出:

db.sessions.mapReduce( mapFunction,
                       reduceFunction,
                       {
                         query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
                         out: { reduce: "session_stat" },
                         finalize: finalizeFunction
                       }
                     );