DATADOC-255 - Add to MongoOperations and executeCommand with an additional integer options argument
DATADOC-256 - Update to use MongoDB driver version 2.6.5 DATADOC-7 - Support for map-reduce operations in MongoTemplate
This commit is contained in:
@@ -13,7 +13,7 @@
|
||||
<name>Spring Data MongoDB Support</name>
|
||||
|
||||
<properties>
|
||||
<mongo.version>2.5.3</mongo.version>
|
||||
<mongo.version>2.6.5</mongo.version>
|
||||
<querydsl.version>2.2.0</querydsl.version>
|
||||
</properties>
|
||||
|
||||
|
||||
@@ -28,6 +28,8 @@ import org.springframework.data.mongodb.core.convert.MongoConverter;
|
||||
import org.springframework.data.mongodb.core.geo.GeoResult;
|
||||
import org.springframework.data.mongodb.core.geo.GeoResults;
|
||||
import org.springframework.data.mongodb.core.index.IndexDefinition;
|
||||
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
|
||||
import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.Update;
|
||||
@@ -67,6 +69,15 @@ public interface MongoOperations {
|
||||
*/
|
||||
CommandResult executeCommand(DBObject command);
|
||||
|
||||
/**
|
||||
* Execute a MongoDB command. Any errors that result from executing this command will be converted into Spring's DAO
|
||||
* exception hierarchy.
|
||||
*
|
||||
* @param command a MongoDB command
|
||||
* @param options query options to use
|
||||
*/
|
||||
CommandResult executeCommand(DBObject command, int options);
|
||||
|
||||
/**
|
||||
* Execute a MongoDB query and iterate over the query results on a per-document basis with a DocumentCallbackHandler.
|
||||
*
|
||||
@@ -254,6 +265,53 @@ public interface MongoOperations {
|
||||
* @return the converted collection
|
||||
*/
|
||||
<T> List<T> findAll(Class<T> entityClass, String collectionName);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation. The map-reduce operation will be formed with an output type of INLINE
|
||||
* @param mapFunction The JavaScript map function
|
||||
* @param reduceFunction The JavaScript reduce function
|
||||
* @param mapReduceOptions Options that specify detailed map-reduce behavior
|
||||
* @param entityClass The parameterized type of the returned list
|
||||
* @return The results of the map reduce operation
|
||||
*/
|
||||
<T> MapReduceResults<T> mapReduce(String mapFunction, String reduceFunction, Class<T> entityClass );
|
||||
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation that takes additional map-reduce options.
|
||||
* @param mapFunction The JavaScript map function
|
||||
* @param reduceFunction The JavaScript reduce function
|
||||
* @param mapReduceOptions Options that specify detailed map-reduce behavior
|
||||
* @param entityClass The parameterized type of the returned list
|
||||
* @return The results of the map reduce operation
|
||||
*/
|
||||
<T> MapReduceResults<T> mapReduce(String mapFunction, String reduceFunction, MapReduceOptions mapReduceOptions, Class<T> entityClass );
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation that takes a query. The map-reduce operation will be formed with an output type of INLINE
|
||||
* @param query The query to use to select the data for the map phase
|
||||
* @param mapFunction The JavaScript map function
|
||||
* @param reduceFunction The JavaScript reduce function
|
||||
* @param mapReduceOptions Options that specify detailed map-reduce behavior
|
||||
* @param entityClass The parameterized type of the returned list
|
||||
* @return The results of the map reduce operation
|
||||
*/
|
||||
<T> MapReduceResults<T> mapReduce(Query query, String mapFunction, String reduceFunction, Class<T> entityClass );
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation that takes a query and additional map-reduce options
|
||||
* @param query The query to use to select the data for the map phase
|
||||
* @param mapFunction The JavaScript map function
|
||||
* @param reduceFunction The JavaScript reduce function
|
||||
* @param mapReduceOptions Options that specify detailed map-reduce behavior
|
||||
* @param entityClass The parameterized type of the returned list
|
||||
* @return The results of the map reduce operation
|
||||
*/
|
||||
<T> MapReduceResults<T> mapReduce(Query query, String mapFunction, String reduceFunction, MapReduceOptions mapReduceOptions, Class<T> entityClass );
|
||||
|
||||
/**
|
||||
* Returns {@link GeoResult} for all entities matching the given {@link NearQuery}. Will consider entity mapping
|
||||
|
||||
@@ -34,6 +34,8 @@ import com.mongodb.DB;
|
||||
import com.mongodb.DBCollection;
|
||||
import com.mongodb.DBCursor;
|
||||
import com.mongodb.DBObject;
|
||||
import com.mongodb.MapReduceCommand;
|
||||
import com.mongodb.MapReduceOutput;
|
||||
import com.mongodb.Mongo;
|
||||
import com.mongodb.MongoException;
|
||||
import com.mongodb.WriteConcern;
|
||||
@@ -77,6 +79,8 @@ import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
|
||||
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
|
||||
import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.Update;
|
||||
@@ -254,6 +258,23 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware {
|
||||
}
|
||||
});
|
||||
|
||||
logCommandExecutionError(command, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
public CommandResult executeCommand(final DBObject command, final int options) {
|
||||
|
||||
CommandResult result = execute(new DbCallback<CommandResult>() {
|
||||
public CommandResult doInDB(DB db) throws MongoException, DataAccessException {
|
||||
return db.command(command, options);
|
||||
}
|
||||
});
|
||||
|
||||
logCommandExecutionError(command, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
protected void logCommandExecutionError(final DBObject command, CommandResult result) {
|
||||
String error = result.getErrorMessage();
|
||||
if (error != null) {
|
||||
// TODO: DATADOC-204 allow configuration of logging level / throw
|
||||
@@ -262,7 +283,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware {
|
||||
// command.toString() + " failed: " + error);
|
||||
LOGGER.warn("Command execution of " + command.toString() + " failed: " + error);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public void executeQuery(Query query, String collectionName, DocumentCallbackHandler dch) {
|
||||
@@ -273,9 +293,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware {
|
||||
DBObject queryObject = query.getQueryObject();
|
||||
DBObject fieldsObject = query.getFieldsObject();
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("find using query: " + queryObject + " fields: " + fieldsObject + " in collection: " + collectionName);
|
||||
LOGGER.debug("find using query: " + queryObject + " fields: " + fieldsObject + " in collection: "
|
||||
+ collectionName);
|
||||
}
|
||||
this.executeQueryInternal(new FindCallback(queryObject, fieldsObject), preparer, dch, collectionName);
|
||||
this.executeQueryInternal(new FindCallback(queryObject, fieldsObject), preparer, dch, collectionName);
|
||||
}
|
||||
|
||||
public <T> T execute(DbCallback<T> action) {
|
||||
@@ -806,6 +827,98 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware {
|
||||
entityClass), collectionName);
|
||||
}
|
||||
|
||||
public <T> MapReduceResults<T> mapReduce(String mapFunction, String reduceFunction, Class<T> entityClass) {
|
||||
return mapReduce(null, mapFunction, reduceFunction, new MapReduceOptions().outputTypeInline(), entityClass);
|
||||
}
|
||||
|
||||
public <T> MapReduceResults<T> mapReduce(String mapFunction, String reduceFunction,
|
||||
MapReduceOptions mapReduceOptions, Class<T> entityClass) {
|
||||
return mapReduce(null, mapFunction, reduceFunction, mapReduceOptions, entityClass);
|
||||
}
|
||||
|
||||
public <T> MapReduceResults<T> mapReduce(Query query, String mapFunction, String reduceFunction, Class<T> entityClass) {
|
||||
return mapReduce(query, mapFunction, reduceFunction, new MapReduceOptions().outputTypeInline(), entityClass);
|
||||
}
|
||||
|
||||
public <T> MapReduceResults<T> mapReduce(Query query, String mapFunction, String reduceFunction,
|
||||
MapReduceOptions mapReduceOptions, Class<T> entityClass) {
|
||||
DBCollection inputCollection = getCollection(this.determineCollectionName(entityClass));
|
||||
MapReduceCommand command = new MapReduceCommand(inputCollection, mapFunction, reduceFunction,
|
||||
mapReduceOptions.getOutputCollection(), mapReduceOptions.getOutputType(), null);
|
||||
|
||||
DBObject commandObject = copyQuery(query, copyMapReduceOptions(mapReduceOptions, command));
|
||||
|
||||
CommandResult commandResult = null;
|
||||
try {
|
||||
if (command.getOutputType() == MapReduceCommand.OutputType.INLINE) {
|
||||
commandResult = executeCommand(commandObject, getDb().getOptions());
|
||||
} else {
|
||||
commandResult = executeCommand(commandObject);
|
||||
}
|
||||
commandResult.throwOnError();
|
||||
} catch (RuntimeException ex) {
|
||||
this.potentiallyConvertRuntimeException(ex);
|
||||
}
|
||||
|
||||
MapReduceOutput mapReduceOutput = new MapReduceOutput(inputCollection, commandObject, commandResult);
|
||||
List<T> mappedResults = new ArrayList<T>();
|
||||
DbObjectCallback<T> callback = new ReadDbObjectCallback<T>(mongoConverter, entityClass);
|
||||
for (DBObject dbObject : mapReduceOutput.results()) {
|
||||
mappedResults.add(callback.doWith(dbObject));
|
||||
}
|
||||
|
||||
MapReduceResults<T> mapReduceResult = new MapReduceResults<T>(mappedResults, commandResult);
|
||||
return mapReduceResult;
|
||||
|
||||
}
|
||||
|
||||
private DBObject copyQuery(Query query, DBObject copyMapReduceOptions) {
|
||||
if (query != null) {
|
||||
if (query.getSkip() != 0 || query.getFieldsObject() != null) {
|
||||
throw new InvalidDataAccessApiUsageException(
|
||||
"Can not use skip or field specification with map reduce operations");
|
||||
}
|
||||
if (query.getQueryObject() != null) {
|
||||
copyMapReduceOptions.put("query", query.getQueryObject());
|
||||
}
|
||||
if (query.getLimit() > 0) {
|
||||
copyMapReduceOptions.put("limit", query.getLimit());
|
||||
}
|
||||
if (query.getSortObject() != null) {
|
||||
copyMapReduceOptions.put("sort", query.getSortObject());
|
||||
}
|
||||
}
|
||||
return copyMapReduceOptions;
|
||||
}
|
||||
|
||||
private DBObject copyMapReduceOptions(MapReduceOptions mapReduceOptions, MapReduceCommand command) {
|
||||
if (mapReduceOptions.getJavaScriptMode() != null) {
|
||||
command.addExtraOption("jsMode", true);
|
||||
}
|
||||
if (!mapReduceOptions.getExtraOptions().isEmpty()) {
|
||||
for (Map.Entry<String, Object> entry : mapReduceOptions.getExtraOptions().entrySet()) {
|
||||
command.addExtraOption(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
if (mapReduceOptions.getFinalizeFunction() != null) {
|
||||
command.setFinalize(mapReduceOptions.getFinalizeFunction());
|
||||
}
|
||||
if (mapReduceOptions.getOutputDatabase() != null) {
|
||||
command.setOutputDB(mapReduceOptions.getOutputDatabase());
|
||||
}
|
||||
if (!mapReduceOptions.getScopeVariables().isEmpty()) {
|
||||
command.setScope(mapReduceOptions.getScopeVariables());
|
||||
}
|
||||
|
||||
DBObject commandObject = command.toDBObject();
|
||||
DBObject outObject = (DBObject) commandObject.get("out");
|
||||
|
||||
if (mapReduceOptions.getOutputSharded() != null) {
|
||||
outObject.put("sharded", mapReduceOptions.getOutputSharded());
|
||||
}
|
||||
return commandObject;
|
||||
}
|
||||
|
||||
public Set<String> getCollectionNames() {
|
||||
return execute(new DbCallback<Set<String>>() {
|
||||
public Set<String> doInDB(DB db) throws MongoException, DataAccessException {
|
||||
@@ -1092,9 +1205,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware {
|
||||
throw potentiallyConvertRuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void executeQueryInternal(CollectionCallback<DBCursor> collectionCallback,
|
||||
CursorPreparer preparer, DocumentCallbackHandler callbackHandler, String collectionName) {
|
||||
|
||||
private void executeQueryInternal(CollectionCallback<DBCursor> collectionCallback, CursorPreparer preparer,
|
||||
DocumentCallbackHandler callbackHandler, String collectionName) {
|
||||
|
||||
try {
|
||||
DBCursor cursor = collectionCallback.doInCollection(getAndPrepareCollection(getDb(), collectionName));
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright 2010-2011 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
public class MapReduceCounts {
|
||||
|
||||
private int inputCount;
|
||||
|
||||
private int emitCount;
|
||||
|
||||
private int outputCount;
|
||||
|
||||
public MapReduceCounts(int inputCount, int emitCount, int outputCount) {
|
||||
super();
|
||||
this.inputCount = inputCount;
|
||||
this.emitCount = emitCount;
|
||||
this.outputCount = outputCount;
|
||||
}
|
||||
|
||||
public int getInputCount() {
|
||||
return inputCount;
|
||||
}
|
||||
|
||||
public int getEmitCount() {
|
||||
return emitCount;
|
||||
}
|
||||
|
||||
public int getOutputCount() {
|
||||
return outputCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MapReduceCounts [inputCount=" + inputCount + ", emitCount=" + emitCount + ", outputCount=" + outputCount
|
||||
+ "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + emitCount;
|
||||
result = prime * result + inputCount;
|
||||
result = prime * result + outputCount;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
MapReduceCounts other = (MapReduceCounts) obj;
|
||||
if (emitCount != other.emitCount)
|
||||
return false;
|
||||
if (inputCount != other.inputCount)
|
||||
return false;
|
||||
if (outputCount != other.outputCount)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.mongodb.BasicDBObject;
|
||||
@@ -33,15 +34,24 @@ public class MapReduceOptions {
|
||||
|
||||
private String finalizeFunction;
|
||||
|
||||
private Map<String, Object> scopeVariables;
|
||||
private Map<String, Object> scopeVariables = new HashMap<String, Object>();
|
||||
|
||||
private Boolean jsMode;
|
||||
|
||||
private Boolean verbose = true;
|
||||
|
||||
private DBObject extraOptions = new BasicDBObject();
|
||||
private Map<String, Object> extraOptions = new HashMap<String, Object>();
|
||||
|
||||
|
||||
/**
|
||||
* Static factory method to create a Criteria using the provided key
|
||||
*
|
||||
* @param key
|
||||
* @return
|
||||
*/
|
||||
public static MapReduceOptions options() {
|
||||
return new MapReduceOptions();
|
||||
}
|
||||
|
||||
/**
|
||||
* Limit the number of objects to return from the collection that is fed into the map reduce operation Often used in
|
||||
@@ -194,7 +204,40 @@ public class MapReduceOptions {
|
||||
extraOptions.put(key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public Map<String, Object> getExtraOptions() {
|
||||
return extraOptions;
|
||||
}
|
||||
|
||||
public String getFinalizeFunction() {
|
||||
return this.finalizeFunction;
|
||||
}
|
||||
|
||||
public Boolean getJavaScriptMode() {
|
||||
return this.jsMode;
|
||||
}
|
||||
|
||||
public String getOutputCollection() {
|
||||
return this.outputCollection;
|
||||
}
|
||||
|
||||
public String getOutputDatabase() {
|
||||
return this.outputDatabase;
|
||||
}
|
||||
|
||||
public Boolean getOutputSharded() {
|
||||
return this.outputSharded;
|
||||
}
|
||||
|
||||
public MapReduceCommand.OutputType getOutputType() {
|
||||
return this.outputType;
|
||||
}
|
||||
|
||||
public Map<String, Object> getScopeVariables() {
|
||||
return this.scopeVariables;
|
||||
}
|
||||
|
||||
|
||||
public DBObject getOptionsObject() {
|
||||
BasicDBObject cmd = new BasicDBObject();
|
||||
|
||||
@@ -227,13 +270,13 @@ public class MapReduceOptions {
|
||||
out.put("inline", 1);
|
||||
break;
|
||||
case REPLACE:
|
||||
out.put("replace", outputType);
|
||||
out.put("replace", outputCollection);
|
||||
break;
|
||||
case MERGE:
|
||||
out.put("merge", outputType);
|
||||
out.put("merge", outputCollection);
|
||||
break;
|
||||
case REDUCE:
|
||||
out.put("reduce", outputType);
|
||||
out.put("reduce", outputCollection);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright 2011 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.mongodb.DBObject;
|
||||
|
||||
public class MapReduceResults<T> implements Iterable<T> {
|
||||
|
||||
private final List<T> mappedResults;
|
||||
|
||||
private DBObject rawResults;
|
||||
|
||||
private MapReduceTiming mapReduceTiming;
|
||||
|
||||
private MapReduceCounts mapReduceCounts;
|
||||
|
||||
private String outputCollection;
|
||||
|
||||
public MapReduceResults(List<T> mappedResults, DBObject rawResults) {
|
||||
Assert.notNull(mappedResults);
|
||||
Assert.notNull(rawResults);
|
||||
this.mappedResults = mappedResults;
|
||||
this.rawResults = rawResults;
|
||||
parseTiming(rawResults);
|
||||
parseCounts(rawResults);
|
||||
if (rawResults.get("result") != null) {
|
||||
this.outputCollection = (String) rawResults.get("result");
|
||||
}
|
||||
}
|
||||
|
||||
public Iterator<T> iterator() {
|
||||
return mappedResults.iterator();
|
||||
}
|
||||
|
||||
public MapReduceTiming getTiming() {
|
||||
return mapReduceTiming;
|
||||
}
|
||||
|
||||
public MapReduceCounts getCounts() {
|
||||
return mapReduceCounts;
|
||||
}
|
||||
|
||||
public String getOutputCollection() {
|
||||
return outputCollection;
|
||||
}
|
||||
|
||||
public DBObject getRawResults() {
|
||||
return rawResults;
|
||||
}
|
||||
|
||||
protected void parseTiming(DBObject rawResults) {
|
||||
DBObject timing = (DBObject) rawResults.get("timing");
|
||||
if (timing != null) {
|
||||
if (timing.get("mapTime") != null && timing.get("emitLoop") != null && timing.get("total") != null) {
|
||||
mapReduceTiming = new MapReduceTiming( (Long)timing.get("mapTime"),
|
||||
(Integer)timing.get("emitLoop"),
|
||||
(Integer)timing.get("total"));
|
||||
}
|
||||
} else {
|
||||
mapReduceTiming = new MapReduceTiming(-1,-1,-1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void parseCounts(DBObject rawResults) {
|
||||
DBObject counts = (DBObject) rawResults.get("counts");
|
||||
if (counts != null) {
|
||||
if (counts.get("input") != null && counts.get("emit") != null && counts.get("output") != null) {
|
||||
mapReduceCounts = new MapReduceCounts( (Integer)counts.get("input"), (Integer)counts.get("emit"), (Integer)counts.get("output"));
|
||||
}
|
||||
} else {
|
||||
mapReduceCounts = new MapReduceCounts(-1,-1,-1);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright 2010-2011 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
public class MapReduceTiming {
|
||||
|
||||
private long mapTime;
|
||||
|
||||
private long emitLoopTime;
|
||||
|
||||
private long totalTime;
|
||||
|
||||
public MapReduceTiming(long mapTime, long emitLoopTime, long totalTime) {
|
||||
this.mapTime = mapTime;
|
||||
this.emitLoopTime = emitLoopTime;
|
||||
this.totalTime = totalTime;
|
||||
}
|
||||
|
||||
public long getMapTime() {
|
||||
return mapTime;
|
||||
}
|
||||
|
||||
public long getEmitLoopTime() {
|
||||
return emitLoopTime;
|
||||
}
|
||||
|
||||
public long getTotalTime() {
|
||||
return totalTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MapReduceTiming [mapTime=" + mapTime + ", emitLoopTime=" + emitLoopTime + ", totalTime=" + totalTime + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + (int) (emitLoopTime ^ (emitLoopTime >>> 32));
|
||||
result = prime * result + (int) (mapTime ^ (mapTime >>> 32));
|
||||
result = prime * result + (int) (totalTime ^ (totalTime >>> 32));
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
MapReduceTiming other = (MapReduceTiming) obj;
|
||||
if (emitLoopTime != other.emitLoopTime)
|
||||
return false;
|
||||
if (mapTime != other.mapTime)
|
||||
return false;
|
||||
if (totalTime != other.totalTime)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -29,7 +29,6 @@ public class MapReduceOptionsTests {
|
||||
@Test
|
||||
public void testFinalize() {
|
||||
MapReduceOptions o = new MapReduceOptions().finalizeFunction("code");
|
||||
assertEquals("{ \"finalize\" : \"code\"}", o.getOptionsObject().toString());
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,190 @@
|
||||
/*
|
||||
* Copyright 2011 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.springframework.data.mongodb.core.query.Criteria.where;
|
||||
import static org.springframework.data.mongodb.core.mapreduce.MapReduceOptions.options;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.mongodb.MongoDbFactory;
|
||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
|
||||
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import com.mongodb.BasicDBObject;
|
||||
import com.mongodb.DBCollection;
|
||||
import com.mongodb.Mongo;
|
||||
|
||||
/**
|
||||
* Integration test for {@link MongoTemplate}'s Map-Reduce operations
|
||||
*
|
||||
* @author Mark Pollack
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration("classpath:infrastructure.xml")
|
||||
public class MapReduceTests {
|
||||
|
||||
private String mapFunction = "function(){ for ( var i=0; i<this.x.length; i++ ){ emit( this.x[i] , 1 ); } }";
|
||||
private String reduceFunction = "function(key,values){ var sum=0; for( var i=0; i<values.length; i++ ) sum += values[i]; return sum;}";
|
||||
|
||||
@Autowired
|
||||
MongoTemplate template;
|
||||
@Autowired
|
||||
MongoDbFactory factory;
|
||||
MongoTemplate mongoTemplate;
|
||||
|
||||
@Autowired
|
||||
@SuppressWarnings("unchecked")
|
||||
public void setMongo(Mongo mongo) throws Exception {
|
||||
|
||||
MongoMappingContext mappingContext = new MongoMappingContext();
|
||||
mappingContext.setInitialEntitySet(new HashSet<Class<?>>(Arrays.asList(ValueObject.class)));
|
||||
mappingContext.afterPropertiesSet();
|
||||
|
||||
MappingMongoConverter mappingConverter = new MappingMongoConverter(factory, mappingContext);
|
||||
mappingConverter.afterPropertiesSet();
|
||||
this.mongoTemplate = new MongoTemplate(factory, mappingConverter);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
cleanDb();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
cleanDb();
|
||||
}
|
||||
|
||||
protected void cleanDb() {
|
||||
template.dropCollection(template.getCollectionName(ValueObject.class));
|
||||
template.dropCollection("jmr1_out");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapReduce() {
|
||||
performMapReduce(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapReduceInline() {
|
||||
performMapReduce(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapReduceWithQuery() {
|
||||
performMapReduce(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapReduceInlineWithScope() {
|
||||
createMapReduceData();
|
||||
|
||||
Map<String, Object> scopeVariables = new HashMap<String, Object>();
|
||||
scopeVariables.put("exclude", "a");
|
||||
|
||||
String mapWithExcludeFunction = "function(){ for ( var i=0; i<this.x.length; i++ ){ if(this.x[i] != exclude) emit( this.x[i] , 1 ); } }";
|
||||
|
||||
MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(mapWithExcludeFunction, reduceFunction,
|
||||
new MapReduceOptions().scopeVariables(scopeVariables).outputTypeInline(), ValueObject.class);
|
||||
Map<String, Float> m = copyToMap(results);
|
||||
assertEquals(3, m.size());
|
||||
assertEquals(2, m.get("b").intValue());
|
||||
assertEquals(2, m.get("c").intValue());
|
||||
assertEquals(1, m.get("d").intValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMapReduceExcludeQuery() {
|
||||
createMapReduceData();
|
||||
|
||||
Query query = new Query(where("x").ne(new String[] { "a", "b" }));
|
||||
MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(query, mapFunction, reduceFunction, ValueObject.class);
|
||||
|
||||
Map<String, Float> m = copyToMap(results);
|
||||
assertEquals(3, m.size());
|
||||
assertEquals(1, m.get("b").intValue());
|
||||
assertEquals(2, m.get("c").intValue());
|
||||
assertEquals(1, m.get("d").intValue());
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void performMapReduce(boolean inline, boolean withQuery) {
|
||||
createMapReduceData();
|
||||
MapReduceResults<ValueObject> results;
|
||||
if (inline) {
|
||||
if (withQuery) {
|
||||
results = mongoTemplate.mapReduce(new Query(), mapFunction, reduceFunction, ValueObject.class);
|
||||
} else {
|
||||
results = mongoTemplate.mapReduce(mapFunction, reduceFunction, ValueObject.class);
|
||||
}
|
||||
} else {
|
||||
if (withQuery) {
|
||||
results = mongoTemplate.mapReduce(new Query(), mapFunction, reduceFunction, options().outputCollection("jmr1_out"), ValueObject.class);
|
||||
} else {
|
||||
results = mongoTemplate.mapReduce(mapFunction, reduceFunction, new MapReduceOptions().outputCollection("jmr1_out"), ValueObject.class);
|
||||
}
|
||||
}
|
||||
Map<String, Float> m = copyToMap(results);
|
||||
assertMapReduceResults(m);
|
||||
}
|
||||
|
||||
private void createMapReduceData() {
|
||||
DBCollection c = mongoTemplate.getDb().getCollection(template.getCollectionName(ValueObject.class));
|
||||
c.save(new BasicDBObject("x", new String[] { "a", "b" }));
|
||||
c.save(new BasicDBObject("x", new String[] { "b", "c" }));
|
||||
c.save(new BasicDBObject("x", new String[] { "c", "d" }));
|
||||
}
|
||||
|
||||
private Map<String, Float> copyToMap(MapReduceResults<ValueObject> results) {
|
||||
List<ValueObject> valueObjects = new ArrayList<ValueObject>();
|
||||
for (ValueObject valueObject : results) {
|
||||
valueObjects.add(valueObject);
|
||||
}
|
||||
|
||||
Map<String, Float> m = new HashMap<String, Float>();
|
||||
for (ValueObject vo : valueObjects) {
|
||||
m.put(vo.getId(), vo.getValue());
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
private void assertMapReduceResults(Map<String, Float> m) {
|
||||
assertEquals(4, m.size());
|
||||
assertEquals(1, m.get("a").intValue());
|
||||
assertEquals(2, m.get("b").intValue());
|
||||
assertEquals(2, m.get("c").intValue());
|
||||
assertEquals(1, m.get("d").intValue());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
public class ValueObject {
|
||||
|
||||
private String id;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
private float value;
|
||||
|
||||
public float getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public void setValue(float value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ValueObject [id=" + id + ", value=" + value + "]";
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user