/**
 * MailArchiver is an application that provides services for storing and managing
 * e-mail messages through a Web Services SOAP interface.
 * Copyright (C) 2012 Marcio Andre Scholl Levien and Fernando Alberto Reuter Wendt
 * and Jose Ronaldo Nogueira Fonseca Junior
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/******************************************************************************\
 *
 * This product was developed by
 *
 * SERVIÇO FEDERAL DE PROCESSAMENTO DE DADOS (SERPRO),
 *
 * a government company established under Brazilian law (5.615/70),
 * at Department of Development of Porto Alegre.
* \******************************************************************************/ package serpro.mailarchiver.util; import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.br.BrazilianAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.IndexWriterConfig.OpenMode; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.queryParser.ParseException; import org.apache.lucene.queryParser.QueryParser; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Configurable; @Configurable(preConstruction=true) public class LuceneIndex { private static final Logger log = Logger.getLocalLogger(); public enum Order { DateAsc ("date", SortField.STRING, false), DateDesc ("date", SortField.STRING, true), SubjectAsc ("subject", SortField.STRING, false), SubjectDesc ("subject", SortField.STRING, true), FromAsc ("from", SortField.STRING, false), FromDesc ("from", SortField.STRING, true), FromMboxAsc ("from_mbox", SortField.STRING, false), FromMboxDesc ("from_mbox", SortField.STRING, true), SenderAsc ("sender", SortField.STRING, false), SenderDesc ("sender", SortField.STRING, true), 
SenderMboxAsc ("sender_mbox", SortField.STRING, false), SenderMboxDesc ("sender_mbox", SortField.STRING, true), ToAsc ("to", SortField.STRING, false), ToDesc ("to", SortField.STRING, true), ToMboxAsc ("to_mbox", SortField.STRING, false), ToMboxDesc ("to_mbox", SortField.STRING, true), CcAsc ("cc", SortField.STRING, false), CcDesc ("cc", SortField.STRING, true), CcMboxAsc ("cc_mbox", SortField.STRING, false), CcMboxDesc ("cc_mbox", SortField.STRING, true), BccAsc ("bcc", SortField.STRING, false), BccDesc ("bcc", SortField.STRING, true), BccMboxAsc ("bcc_mbox", SortField.STRING, false), BccMboxDesc ("bcc_mbox", SortField.STRING, true); private final SortField sortField; Order(String field, int type, boolean reverse) { sortField = new SortField(field, type, reverse); } } private Directory directory; private Analyzer analyzer; private TieredMergePolicy mergePolicy; private IndexWriterConfig writerConfig; private IndexWriter writer; private static final long MAX_INACTIVITY_INTERVAL = 300000; private final Timer closeTimer; private TimerTask closeTask; private long lastRequestTime; public synchronized void addMessage(Document doc) throws IOException { insureOpenness(); try { writer.addDocument(doc); writer.commit(); } catch(OutOfMemoryError ex) { close(); log.error(ex); throw new IOException(ex); } } public synchronized void deleteMessage(String messageId) throws IOException { insureOpenness(); Term term = new Term("message_id", messageId); try { writer.deleteDocuments(term); writer.commit(); } catch(OutOfMemoryError ex) { close(); log.error(ex); throw new IOException(ex); } } public synchronized String[] search(String queryExpression, Order... 
sortOrder) throws IOException { insureOpenness(); IndexReader reader = null; IndexSearcher searcher = null; try { reader = IndexReader.open(writer, true); searcher = new IndexSearcher(reader); QueryParser parser = new QueryParser(Version.LUCENE_35, "body", analyzer); //ComplexPhraseQueryParser parser = new ComplexPhraseQueryParser(Version.LUCENE_35, "body", analyzer); parser.setAllowLeadingWildcard(true); try { Query query = parser.parse(queryExpression); List sortFields = new ArrayList(); for(Order order : sortOrder) { if( ! sortFields.contains(order.sortField)) { sortFields.add(order.sortField); } } TopDocs results; if(sortFields.size() > 0) { Sort sort = new Sort(); sort.setSort(sortFields.toArray(new SortField[sortFields.size()])); results = searcher.search(query, Integer.MAX_VALUE, sort); } else { results = searcher.search(query, Integer.MAX_VALUE); } ScoreDoc[] hits = results.scoreDocs; String[] oids = new String[hits.length]; for(int i = 0; i < hits.length; i++) { Document doc = searcher.doc(hits[i].doc); oids[i] = doc.get("message_id"); } return oids; } catch(ParseException ex) { log.error(ex); return new String[]{}; } } finally { close(searcher); close(reader); } } //-------------------------------------------------------------------------- @Autowired private UserAppConfig userAppConfig; private final Path absolutePath; public final Path getAbsolutePath() { return absolutePath; } private static final Map instances = new HashMap(); public static LuceneIndex getInstance(String userId) { synchronized (instances) { LuceneIndex instance = instances.get(userId); if (instance == null) { instance = new LuceneIndex(userId); instances.put(userId, instance); } return instance; } } protected LuceneIndex(String userId) { absolutePath = userAppConfig.SERVER.getArchiveDir() .resolve("index") .resolve(userId); closeTimer = new Timer("lucene-close-timer-" + userId, true); } private synchronized void insureOpenness() throws IOException { lastRequestTime = 
System.currentTimeMillis(); if(writer == null) { close(); System.out.println("Opening Lucene index"); directory = FSDirectory.open(absolutePath.toFile()); if(IndexWriter.isLocked(directory)) { log.warn("Lucene directory is locked"); IndexWriter.unlock(directory); } if(IndexWriter.isLocked(directory)) { log.error("Lucene directory still locked"); throw new IOException("Lucene directory locked"); } analyzer = new BrazilianAnalyzer(Version.LUCENE_35); writerConfig = new IndexWriterConfig(Version.LUCENE_35, analyzer); writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND); mergePolicy = new TieredMergePolicy(); mergePolicy.setSegmentsPerTier(4); mergePolicy.setMaxMergeAtOnce(4); mergePolicy.setNoCFSRatio(1); writerConfig.setMergePolicy(mergePolicy); writer = new IndexWriter(directory, writerConfig); closeTimer.schedule(createCloseTask(), MAX_INACTIVITY_INTERVAL); } } private TimerTask createCloseTask() { closeTask = new TimerTask() { @Override public void run() { synchronized(LuceneIndex.this) { long currentTime = System.currentTimeMillis(); long inactivityInterval = currentTime - lastRequestTime; if(inactivityInterval >= MAX_INACTIVITY_INTERVAL) { System.out.println("Closing idle Lucene index to save resources"); close(); } else { closeTimer.schedule(createCloseTask(), MAX_INACTIVITY_INTERVAL - inactivityInterval); } } } }; return closeTask; } public synchronized void close() { if(closeTask != null) { closeTask.cancel(); closeTask = null; } closeTimer.purge(); close(writer); writer = null; close(directory); directory = null; close(mergePolicy); mergePolicy = null; close(analyzer); analyzer = null; } private void close(Closeable resource) { if(resource != null) { try { resource.close(); } catch(Exception ex) { log.error(ex); } } } @Override protected void finalize() throws Throwable { try { close(); } finally { super.finalize(); } } }