Как подружить Bagri и MongoDB
После достаточно неплохого отклика, решил написать статью о том как можно наращивать функционал Bagri путем написания расширений (extensions) используя встроенный API системы.
На данный момент Bagri публикует два API для подключения к внешним системам: DataFormat API и DataStore API.
Первый предназначен для парсинга документов в новом формате и преобразования их во внутренний формат системы, а также для обратного построения документов в новом формате из внутреннего представления системы.
Второй API служит для загрузки/сохранения/удаления документов из внешних систем хранения. Зачастую, для подключения к новому источнику документов необходимо реализовать оба интерфейса.
Я покажу, как реализовать DataStore connector к MongoDB и использовать его в качестве системы хранения документов. В данном случае реализации DataFormat API не требуется, поскольку Mongo предоставляет документы в JSON формате, который изначально поддерживается системой.
Сразу хочу сделать пару замечаний:
- Практическая польза от такого коннектора? Очевидно, Mongo можно просто использовать в качестве централизованного хранилища документов. Так же он может быть полезен в сценариях, описанных в данной статье, когда данные уже хранятся в Mongo, но ее возможностей стало не хватать для развития функционала системы;
- Я не являюсь знатоком MongoDB, если есть более оптимальные способы работы с ней, я буду рад их услышать;
Итак, начнем.
DataStore API предполагает реализацию интерфейса com.bagri.xdm.cache.api.DocumentStore:
public interface DocumentStore {
/**
* Lifecycle method. Invoked when the store initialized.
*
* @param context the environment context
*/
void init(Map context);
/**
* Lifecycle method. Invoked when parent schema is closing
*/
void close();
/**
* Load document from persistent store
*
* @param key the document key
* @return XDM Document instance if corresponding document found, null otherwise
*/
Document loadDocument(DocumentKey key);
/**
* Load bunch of documents from persistent store
*
* @param keys the collection of document keys to load
* @return the map of loaded documents with their keys
*/
Map loadAllDocuments(Collection keys);
/**
* Load document keys. Can do it in synch or asynch way.
*
* @return iterator over found document keys
*/
Iterable loadAllDocumentKeys();
/**
* Stores document to persistent store.
*
* @param key the document key
* @param value the XDM document instance
*/
void storeDocument(DocumentKey key, Document value);
/**
* Stores bunch of documents to persistent store
*
* @param entries the map of document keys and corresponding document instances
*/
void storeAllDocuments(Map entries);
/**
* Deletes document from persistent store
*
* @param key the document key
*/
void deleteDocument(DocumentKey key);
/**
* Deletes bunch o documents from persistent store
*
* @param keys the keys identifying documents to be deleted
*/
void deleteAllDocuments(Collection keys);
}
Проще всего будет наследовать наш класс, реализующий интерфейс DocumentStore, от абстрактного класса DocumentStoreBase, предоставляющего набор вспомогательных методов для доступа к контексту системы. Начнем с обработки параметров подключения к внешней системе и его инициализации.
Для подключения к Mongo нам понадобятся адрес сервера mongod, имя базы данных и имена коллекций, из которых мы хотим загружать документы. Определим имена для этих параметров: mongo.db.uri, mongo.db.database, mongo.db.collections. Тогда код инициализации подключения к серверу mongo может выглядеть таким образом:
public class MongoDBStore extends DocumentStoreBase implements DocumentStore {
private MongoClient client;
private MongoDatabase db;
private Map> clMap = new HashMap<>();
@Override
public void init(Map context) {
super.setContext(context);
String uri = (String) context.get("mongo.db.uri");
MongoClientURI mcUri = new MongoClientURI(uri);
client = new MongoClient(mcUri);
String dbName = (String) context.get("mongo.db.database");
db = client.getDatabase(dbName);
String clNames = (String) context.get("mongo.db.collections");
boolean all = "*".equals(clNames);
List clns = Arrays.asList(clNames.split(","));
for (String clName: db.listCollectionNames()) {
if (all || clns.contains(clName)) {
MongoCollection cln = db.getCollection(clName);
clMap.put(clName, cln);
}
}
}
@Override
public void close() {
client.close();
}
}
Метод init принимает параметры подключения к серверу mongod из контекста, устанавливает соединение и кэширует объект MongoCollection для каждой коллекции, объявленной для загрузки. Теперь нам нужно реализовать метод для загрузки всех ключей документов.
private String getMappingKey(String id, String cln) {
return id + "::" + cln;
}
private String[] getMappingParts(String keyMap) {
return keyMap.split("::");
}
@Override
public Iterable loadAllDocumentKeys() {
SchemaRepository repo = getRepository();
if (repo == null) {
return null;
}
String id;
DocumentKey key;
Map keyMap = new HashMap<>();
for (MongoCollection cln: clMap.values()) {
String clName = cln.getNamespace().getCollectionName();
// load _ids only
MongoCursor cursor = cln.find().projection(include("_id”)).iterator();
while (cursor.hasNext()) {
org.bson.Document doc = cursor.next();
id = doc.get("_id”).toString();
// TODO: handle possible duplicates via revisions
key = repo.getFactory().newDocumentKey(id, 0, 1);
keyMap.put(key, getMappingKey(id, clName));
}
}
PopulationManagement popManager = repo.getPopulationManagement();
popManager.setKeyMappings(keyMap);
return keyMap.keySet();
}
И методы загрузки документов, соответствующих загруженным ключам:
@Override
public Map loadAllDocuments(Collection keys) {
Map entries = new HashMap<>(keys.size());
for (DocumentKey key: keys) {
Document doc = loadDocument(key);
if (doc != null) {
entries.put(key, doc);
}
}
return entries;
}
@Override
public Document loadDocument(DocumentKey key) {
SchemaRepository repo = getRepository();
Document doc = null;
PopulationManagement popManager = repo.getPopulationManagement();
String id = popManager.getKeyMapping(key);
if (id == null) {
return null;
}
String[] mParts = getMappingParts(id);
Document newDoc = null;
int[] clns = null;
com.bagri.xdm.system.Collection xcl = repo.getSchema().getCollection(mParts[1]);
if (xcl != null) {
clns = new int[] {xcl.getId()};
}
MongoCollection cln = clMap.get(mParts[1]);
Object oid;
Date creDate;
try {
oid = new ObjectId(mParts[0]);
creDate = ((ObjectId) oid).getDate();
} catch (IllegalArgumentException ex) {
oid = mParts[0];
creDate = new Date();
}
org.bson.Document mongoDoc = cln.find(eq("_id", oid)).first();
String content = mongoDoc.toJson(new JsonWriterSettings(true));
try {
DocumentManagementl docMgr = (DocumentManagement) repo.getDocumentManagement();
newDoc = docMgr.createDocument(key, mParts[0], content, "JSON”, creDate, "owner", 1, clns, true);
} catch (XDMException ex) {
// TODO: log error, but do not stop the whole loading
}
return doc;
}
Для начала этого достаточно, методы storeDocument/storeAllDocuments и deleteDocument/deleteAllDocuments предлагаю реализовать читателю самостоятельно. Также прошу учесть, что приведенный выше код предназначен только в целях демонстрации процесса реализации коннектора и не обрабатывает различные исключительные ситуации и возможные дополнительные параметры конфигурации. Полный код коннектора можно посмотреть и собрать из репозитория bagri-extensions.
Теперь нам нужно зарегистрировать DataStore connector и объявить схему, которая будет его использовать. Для этого нам необходимо добавить конфигурацию коннектора в файл
1
2016-08-01T16:17:20.542+03:00
admin
MongoDB data store
true
com.bagri.samples.MongoDBStore
mongodb://localhost:27017
test
*
Мы протестируем работу коннектора на примере коллекции restaurants, рассматриваемой во многих примерах работы с MongoDB. Загрузите коллекцию тестовых документов в Mongo, как показано здесь: docs.mongodb.com/getting-started/shell/import-data. Теперь зарегистрируем схему для работы с MongoDB и настроим ее на загрузку данных из данной коллекции. В том же файле config.xml добавим новую схему в раздел schemas:
Добавляем следующие параметры:
true
mongo
restaurants
1
2016-08-01T21:30:58.096+04:00
admin
Schema for MongoDB
1024
false
0
true
10
true
true
mongo
restaurants
JSON
10300
10400
1
1000000
1
../data/mongo
0
1
0
16
1
true
false
0
localhost
0
157
true
0
25
0
true
32
file:///../data/mongo/
2
1
0
1
1
2
1
0
1
2
1
1
2016-08-01T01:01:26.965+03:00
admin
Mongo restaurants collection
true
Кроме того, нужно создать новый файл с профилем сервера. В каталоге
xdm.cluster.node.schemas=Mongo
Удостоверьтесь, что сервер MongoDB запущен и ждет подключений по адресу, указанному в настройках коннектора. Теперь можно стартовать сервер Bagri. В каталоге
показывающие, что в коннектор инициализировал схему Mongo и загрузил в нее 25359 документов из внешнего сервера MongoDB.
Теперь я покажу, как можно манипулировать документами JSON с помощью запросов XQuery.
Для интерактивного выполнения запросов XQuery нам понадобится клиент, позволяющий это делать. С дистрибутивом Bagri поставляется plugin к VisualVM, предоставляющий данную функциональность. Инструкцию по его установке смотрите здесь.
Запустите административный сервер Bagri
, а закладка QueryManagement с запросами XQuery. Выполните следующий простой запрос для выборки ресторана по его идентификатору:
declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json', 'indent': fn:true()}
for $uri in fn:uri-collection("restaurants")
let $map := fn:json-doc($uri)
where m:get($map, 'restaurant_id') = '40362098'
return (fn:serialize($map, $props), '\&\#xa;')
* Обратите внимание, что символ перевода строки в самой последней строке экранирован \, так как Хабр превращает его в действительный перевод строки, так что при выполнении запроса символ \ нужно убрать.
Или другой, для выборки ресторанов по типу кухни:
declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json'}
for $uri in fn:uri-collection("restaurants")
let $map := fn:json-doc($uri)
where m:get($map, 'cuisine') = 'Bakery'
return (fn:serialize($map, $props), '\&\#xa;')
XQuery легко позволяет делать любые выборки, доступные в Mongo (кроме запросов по гео-индексам, прямо из коробки они еще не поддерживаются).
Теперь я покажу запросы, которые не поддерживаются в MongoDB: JOIN. Для этого можно привести данные в коллекции restaurants к более нормализованному виду, например отделить отзывы о ресторанах от данных по самому ресторану и сохранить их в разных коллекциях.
Выполните данный запрос и сохраните результаты в файл, потом сделайте импорт полученных данных в MongoDB, в коллекцию rest-short.
declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json'}
for $uri in fn:uri-collection("restaurants")
let $rest := fn:json-doc($uri)
let $rest := m:remove($rest, '_id')
let $rest := m:remove($rest, 'grades')
return (fn:serialize($rest, $props), '\&\#xa;')
Следующий запрос выводит данные по отзывам. Так же сохраните их в отдельный файл и затем импортируйте в MongoDB в коллекцию grades.
declare namespace a="http://www.w3.org/2005/xpath-functions/array";
declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json'}
for $uri in fn:uri-collection("restaurants")
let $rest := fn:json-doc($uri)
let $grades := m:get($rest, 'grades')
return
for $i in (1 to a:size($grades))
let $grade := a:get($grades, $i)
let $date := m:get($grade, 'date')
return ('{"restaurant_id": "', m:get($rest, 'restaurant_id'),
'", "date": ', fn:serialize($date, $props),
', "grade": "', m:get($grade, 'grade'),
'", "score": "', m:get($grade, 'score'), '"}', '\&\#xa;')
Теперь поправьте настройки схемы, чтобы заявить новые коллекции для загрузки:
………...
rest-short, grades
……...
1
2016-08-01T01:01:26.965+03:00
admin
Restaurant headers collection
true
1
2016-08-01T01:01:26.965+03:00
admin
Restaurant grades collection
true
Рестартуйте сервер Bagri для загрузки новых коллекций с данными. Теперь можно проверить, как работают join«ы. Выполните следующий запрос для формирования полной структуры restaurants из двух коллекций:
declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json'}
for $ruri in fn:uri-collection("rest-short")
let $rest := fn:json-doc($ruri)
let $rid := m:get($rest, 'restaurant_id')
let $addr := m:get($rest, 'address')
let $txt := ('{"restaurant_id": "', $rid,
'", "cuisine": "', m:get($rest, 'cuisine'),
'", "name": "', m:get($rest, 'name'),
'", "borough": "', m:get($rest, 'borough'),
'", "address": ', fn:serialize($addr, $props),
', "grades": [')
return ($txt, fn:string-join(
for $guri in fn:uri-collection("grades")
let $grade := fn:json-doc($guri)
let $gid := m:get($grade, 'restaurant_id')
where $gid = $rid
return fn:serialize(m:remove(m:remove($grade, '_id'), 'restaurant_id'), $props), ', '), ']}\&\#xa;')
Итак, мы с вами рассмотрели как можно реализовать DataStore connector к MongoDB и использовать его в качестве системы хранения документов. Надеюсь эта статья сможет стать для вас отправной точкой для написания других расширений Багри или просто побудит вас более подробно ознакомиться с этим интересным продуктом. Проекту всегда требуются Java разработчки заинтересованные в развитии Bagri, более подробно на код проекта можно посмотреть на Гитхабе.