/**
* MailArchiver is an application that provides services for storing and managing e-mail messages through a Web Services SOAP interface.
* Copyright (C) 2012 Marcio Andre Scholl Levien and Fernando Alberto Reuter Wendt and Jose Ronaldo Nogueira Fonseca Junior
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see https://www.gnu.org/licenses/.
*/
/******************************************************************************\
*
* This product was developed by
*
* SERVIÇO FEDERAL DE PROCESSAMENTO DE DADOS (SERPRO),
*
* a government company established under Brazilian law (5.615/70),
* at Department of Development of Porto Alegre.
*
\******************************************************************************/
package serpro.mailarchiver.util;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.br.BrazilianAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
/**
 * Per-user full-text index of archived e-mail messages, backed by Lucene 3.5.
 *
 * <p>Instances are cached one-per-user (see {@link #getInstance}). The underlying
 * {@link IndexWriter} is opened lazily on first use and closed automatically after
 * {@link #MAX_INACTIVITY_INTERVAL} ms without requests, to save file handles and
 * heap. All public operations are {@code synchronized} on the instance.
 */
@Configurable(preConstruction=true)
public class LuceneIndex {

    private static final Logger log = Logger.getLocalLogger();

    /**
     * Result sort orders offered to callers; each value wraps a Lucene
     * {@link SortField} over one of the indexed message fields.
     */
    public enum Order {
        DateAsc        ("date",        SortField.STRING, false),
        DateDesc       ("date",        SortField.STRING, true),
        SubjectAsc     ("subject",     SortField.STRING, false),
        SubjectDesc    ("subject",     SortField.STRING, true),
        FromAsc        ("from",        SortField.STRING, false),
        FromDesc       ("from",        SortField.STRING, true),
        FromMboxAsc    ("from_mbox",   SortField.STRING, false),
        FromMboxDesc   ("from_mbox",   SortField.STRING, true),
        SenderAsc      ("sender",      SortField.STRING, false),
        SenderDesc     ("sender",      SortField.STRING, true),
        SenderMboxAsc  ("sender_mbox", SortField.STRING, false),
        SenderMboxDesc ("sender_mbox", SortField.STRING, true),
        ToAsc          ("to",          SortField.STRING, false),
        ToDesc         ("to",          SortField.STRING, true),
        ToMboxAsc      ("to_mbox",     SortField.STRING, false),
        ToMboxDesc     ("to_mbox",     SortField.STRING, true),
        CcAsc          ("cc",          SortField.STRING, false),
        CcDesc         ("cc",          SortField.STRING, true),
        CcMboxAsc      ("cc_mbox",     SortField.STRING, false),
        CcMboxDesc     ("cc_mbox",     SortField.STRING, true),
        BccAsc         ("bcc",         SortField.STRING, false),
        BccDesc        ("bcc",         SortField.STRING, true),
        BccMboxAsc     ("bcc_mbox",    SortField.STRING, false),
        BccMboxDesc    ("bcc_mbox",    SortField.STRING, true);

        private final SortField sortField;

        Order(String field, int type, boolean reverse) {
            sortField = new SortField(field, type, reverse);
        }
    }

    // Lazily-opened Lucene resources; all null while the index is closed.
    private Directory directory;
    private Analyzer analyzer;
    private TieredMergePolicy mergePolicy;
    private IndexWriterConfig writerConfig;
    private IndexWriter writer;

    /** Idle period (ms, 5 minutes) after which the index is closed. */
    private static final long MAX_INACTIVITY_INTERVAL = 300000;

    private final Timer closeTimer;
    private TimerTask closeTask;
    private long lastRequestTime;

    /**
     * Adds one message document to the index and commits immediately.
     *
     * @param doc Lucene document describing the message
     * @throws IOException on index failure, or wrapping an {@link OutOfMemoryError}
     *         (the writer is closed first, as the IndexWriter contract requires)
     */
    public synchronized void addMessage(Document doc) throws IOException {
        ensureOpen();
        try {
            writer.addDocument(doc);
            writer.commit();
        }
        catch(OutOfMemoryError ex) {
            // Lucene mandates closing the writer after an OOME inside it.
            close();
            log.error(ex);
            throw new IOException(ex);
        }
    }

    /**
     * Deletes every document whose {@code message_id} field equals the given id,
     * committing immediately.
     *
     * @param messageId value of the {@code message_id} field to purge
     * @throws IOException on index failure, or wrapping an {@link OutOfMemoryError}
     */
    public synchronized void deleteMessage(String messageId) throws IOException {
        ensureOpen();
        Term term = new Term("message_id", messageId);
        try {
            writer.deleteDocuments(term);
            writer.commit();
        }
        catch(OutOfMemoryError ex) {
            close();
            log.error(ex);
            throw new IOException(ex);
        }
    }

    /**
     * Searches the index and returns the {@code message_id} of every hit.
     *
     * @param queryExpression Lucene query-parser expression; default field is {@code body}
     * @param sortOrder optional sort criteria applied in order (duplicates ignored);
     *        when absent, hits come back in relevance order
     * @return matching message ids, or an empty array when the expression cannot be parsed
     * @throws IOException on index access failure
     */
    public synchronized String[] search(String queryExpression, Order... sortOrder) throws IOException {
        ensureOpen();
        IndexReader reader = null;
        IndexSearcher searcher = null;
        try {
            // Near-real-time reader: sees this writer's uncommitted changes too.
            reader = IndexReader.open(writer, true);
            searcher = new IndexSearcher(reader);
            QueryParser parser = new QueryParser(Version.LUCENE_35, "body", analyzer);
            //ComplexPhraseQueryParser parser = new ComplexPhraseQueryParser(Version.LUCENE_35, "body", analyzer);
            parser.setAllowLeadingWildcard(true);
            try {
                Query query = parser.parse(queryExpression);
                // Typed list: the original raw List made toArray(...) return
                // Object[], which cannot be passed where SortField[] is expected.
                List<SortField> sortFields = new ArrayList<SortField>();
                for(Order order : sortOrder) {
                    if( ! sortFields.contains(order.sortField)) {
                        sortFields.add(order.sortField);
                    }
                }
                TopDocs results;
                if( ! sortFields.isEmpty()) {
                    Sort sort = new Sort(sortFields.toArray(new SortField[sortFields.size()]));
                    results = searcher.search(query, Integer.MAX_VALUE, sort);
                }
                else {
                    results = searcher.search(query, Integer.MAX_VALUE);
                }
                ScoreDoc[] hits = results.scoreDocs;
                String[] oids = new String[hits.length];
                for(int i = 0; i < hits.length; i++) {
                    oids[i] = searcher.doc(hits[i].doc).get("message_id");
                }
                return oids;
            }
            catch(ParseException ex) {
                // Malformed user query: log and report no hits instead of failing.
                log.error(ex);
                return new String[]{};
            }
        }
        finally {
            close(searcher);
            close(reader);
        }
    }

    //--------------------------------------------------------------------------

    @Autowired
    private UserAppConfig userAppConfig;

    /** Filesystem location of this user's index directory. */
    private final Path absolutePath;

    public final Path getAbsolutePath() {
        return absolutePath;
    }

    /** One cached index per user id; all access synchronized on the map itself. */
    private static final Map<String, LuceneIndex> instances = new HashMap<String, LuceneIndex>();

    /**
     * Returns the index for the given user, creating and caching it on first use.
     */
    public static LuceneIndex getInstance(String userId) {
        synchronized (instances) {
            LuceneIndex instance = instances.get(userId);
            if (instance == null) {
                instance = new LuceneIndex(userId);
                instances.put(userId, instance);
            }
            return instance;
        }
    }

    protected LuceneIndex(String userId) {
        // userAppConfig is injected before the constructor body runs,
        // thanks to @Configurable(preConstruction=true).
        absolutePath = userAppConfig.SERVER.getArchiveDir()
                .resolve("index")
                .resolve(userId);
        // Daemon timer so a pending close task never blocks JVM shutdown.
        closeTimer = new Timer("lucene-close-timer-" + userId, true);
    }

    /**
     * Opens the writer and its supporting objects if currently closed, and
     * records the request time consulted by the inactivity close task.
     *
     * @throws IOException if the directory cannot be opened or stays locked
     */
    private synchronized void ensureOpen() throws IOException {
        lastRequestTime = System.currentTimeMillis();
        if(writer == null) {
            close();
            // NOTE(review): consider routing through the Logger instead of
            // stdout — left as println to avoid assuming the Logger API.
            System.out.println("Opening Lucene index");
            directory = FSDirectory.open(absolutePath.toFile());
            if(IndexWriter.isLocked(directory)) {
                // Stale write lock (e.g. after a crash): forcibly release it.
                log.warn("Lucene directory is locked");
                IndexWriter.unlock(directory);
            }
            if(IndexWriter.isLocked(directory)) {
                log.error("Lucene directory still locked");
                throw new IOException("Lucene directory locked");
            }
            analyzer = new BrazilianAnalyzer(Version.LUCENE_35);
            writerConfig = new IndexWriterConfig(Version.LUCENE_35, analyzer);
            writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND);
            mergePolicy = new TieredMergePolicy();
            mergePolicy.setSegmentsPerTier(4);
            mergePolicy.setMaxMergeAtOnce(4);
            mergePolicy.setNoCFSRatio(1); // always write compound-file segments
            writerConfig.setMergePolicy(mergePolicy);
            writer = new IndexWriter(directory, writerConfig);
            closeTimer.schedule(createCloseTask(), MAX_INACTIVITY_INTERVAL);
        }
    }

    /**
     * Creates (and remembers in {@link #closeTask}) a one-shot task that closes
     * the index once it has been idle for {@link #MAX_INACTIVITY_INTERVAL} ms,
     * or reschedules itself for the remainder of the window otherwise.
     */
    private TimerTask createCloseTask() {
        closeTask = new TimerTask() {
            @Override
            public void run() {
                synchronized(LuceneIndex.this) {
                    long currentTime = System.currentTimeMillis();
                    long inactivityInterval = currentTime - lastRequestTime;
                    if(inactivityInterval >= MAX_INACTIVITY_INTERVAL) {
                        System.out.println("Closing idle Lucene index to save resources");
                        close();
                    }
                    else {
                        // A request arrived since this task was scheduled: check
                        // again when the current inactivity window would expire.
                        closeTimer.schedule(createCloseTask(), MAX_INACTIVITY_INTERVAL - inactivityInterval);
                    }
                }
            }
        };
        return closeTask;
    }

    /**
     * Closes all Lucene resources and cancels any pending inactivity task.
     * Safe to call when the index is already closed.
     */
    public synchronized void close() {
        if(closeTask != null) {
            closeTask.cancel();
            closeTask = null;
        }
        closeTimer.purge();
        close(writer);
        writer = null;
        close(directory);
        directory = null;
        close(mergePolicy);
        mergePolicy = null;
        close(analyzer);
        analyzer = null;
    }

    /** Best-effort close of one resource: failures are logged, never propagated. */
    private void close(Closeable resource) {
        if(resource != null) {
            try {
                resource.close();
            }
            catch(Exception ex) {
                log.error(ex);
            }
        }
    }

    /** Last-resort cleanup; normal shutdown should call {@link #close()} directly. */
    @Override
    protected void finalize() throws Throwable {
        try {
            close();
        }
        finally {
            super.finalize();
        }
    }
}