]> source.dussan.org Git - archiva.git/blob
634e5edd3213f65c79a287a781bce21e91fab27b
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.metadata.repository.MetadataRepository;
23 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
24 import org.apache.archiva.metadata.repository.RepositorySession;
25 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
26 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
27 import org.apache.archiva.scheduler.ArchivaTaskScheduler;
28 import org.apache.maven.archiva.common.ArchivaException;
29 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
30 import org.apache.maven.archiva.configuration.ConfigurationEvent;
31 import org.apache.maven.archiva.configuration.ConfigurationListener;
32 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
33 import org.codehaus.plexus.taskqueue.TaskQueue;
34 import org.codehaus.plexus.taskqueue.TaskQueueException;
35 import org.codehaus.redback.components.scheduler.CronExpressionValidator;
36 import org.codehaus.redback.components.scheduler.Scheduler;
37 import org.quartz.CronTrigger;
38 import org.quartz.JobDataMap;
39 import org.quartz.JobDetail;
40 import org.quartz.SchedulerException;
41 import org.quartz.impl.JobDetailImpl;
42 import org.quartz.impl.triggers.CronTriggerImpl;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.stereotype.Service;
46
47 import javax.annotation.PostConstruct;
48 import javax.annotation.PreDestroy;
49 import javax.inject.Inject;
50 import javax.inject.Named;
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
55 import java.util.Set;
56
57 /**
58  * Default implementation of a scheduling component for archiva.
59  */
60 @Service( "archivaTaskScheduler#repository" )
61 public class RepositoryArchivaTaskScheduler
62     implements ArchivaTaskScheduler<RepositoryTask>, ConfigurationListener
63 {
64     private Logger log = LoggerFactory.getLogger( RepositoryArchivaTaskScheduler.class );
65
66     /**
67      * plexus.requirement
68      */
69     @Inject
70     private Scheduler scheduler;
71
72     @Inject
73     private CronExpressionValidator cronValidator;
74
75     /**
76      * plexus.requirement role-hint="repository-scanning"
77      */
78     @Inject
79     @Named( value = "taskQueue#repository-scanning" )
80     private TaskQueue repositoryScanningQueue;
81
82     /**
83      * plexus.requirement
84      */
85     @Inject
86     private ArchivaConfiguration archivaConfiguration;
87
88     /**
89      * plexus.requirement
90      */
91     @Inject
92     @Named( value = "repositoryStatisticsManager#default" )
93     private RepositoryStatisticsManager repositoryStatisticsManager;
94
95     /**
96      * TODO: could have multiple implementations
97      * <p/>
98      * plexus.requirement
99      */
100     @Inject
101     private RepositorySessionFactory repositorySessionFactory;
102
103     private static final String REPOSITORY_SCAN_GROUP = "rg";
104
105     private static final String REPOSITORY_JOB = "rj";
106
107     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
108
109     static final String TASK_QUEUE = "TASK_QUEUE";
110
111     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
112
113     public static final String CRON_HOURLY = "0 0 * * * ?";
114
115     private Set<String> jobs = new HashSet<String>();
116
117     private List<String> queuedRepos = new ArrayList<String>();
118
119     @PostConstruct
120     public void startup()
121         throws ArchivaException
122     {
123         archivaConfiguration.addListener( this );
124
125         List<ManagedRepositoryConfiguration> repositories =
126             archivaConfiguration.getConfiguration().getManagedRepositories();
127
128         RepositorySession repositorySession = repositorySessionFactory.createSession();
129         try
130         {
131             MetadataRepository metadataRepository = repositorySession.getRepository();
132             for ( ManagedRepositoryConfiguration repoConfig : repositories )
133             {
134                 if ( repoConfig.isScanned() )
135                 {
136                     try
137                     {
138                         scheduleRepositoryJobs( repoConfig );
139                     }
140                     catch ( SchedulerException e )
141                     {
142                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
143                     }
144
145                     try
146                     {
147                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
148                         {
149                             queueInitialRepoScan( repoConfig );
150                         }
151                     }
152                     catch ( MetadataRepositoryException e )
153                     {
154                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: "
155                                       + e.getMessage(), e );
156                     }
157                 }
158             }
159         }
160         finally
161         {
162             repositorySession.close();
163         }
164
165     }
166
167
168     @PreDestroy
169     public void stop()
170         throws SchedulerException
171     {
172         for ( String job : jobs )
173         {
174             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
175         }
176         jobs.clear();
177         queuedRepos.clear();
178
179     }
180
181     @SuppressWarnings( "unchecked" )
182     public boolean isProcessingRepositoryTask( String repositoryId )
183     {
184         synchronized ( repositoryScanningQueue )
185         {
186             List<RepositoryTask> queue = null;
187
188             try
189             {
190                 queue = repositoryScanningQueue.getQueueSnapshot();
191             }
192             catch ( TaskQueueException e )
193             {
194                 // not possible with plexus-taskqueue implementation, ignore
195             }
196
197             for ( RepositoryTask queuedTask : queue )
198             {
199                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
200                 {
201                     return true;
202                 }
203             }
204             return false;
205         }
206     }
207
208     @SuppressWarnings( "unchecked" )
209     private boolean isProcessingRepositoryTask( RepositoryTask task )
210     {
211         synchronized ( repositoryScanningQueue )
212         {
213             List<RepositoryTask> queue = null;
214
215             try
216             {
217                 queue = repositoryScanningQueue.getQueueSnapshot();
218             }
219             catch ( TaskQueueException e )
220             {
221                 // not possible with plexus-taskqueue implementation, ignore
222             }
223
224             for ( RepositoryTask queuedTask : queue )
225             {
226                 if ( task.equals( queuedTask ) )
227                 {
228                     return true;
229                 }
230             }
231             return false;
232         }
233     }
234
235     public void queueTask( RepositoryTask task )
236         throws TaskQueueException
237     {
238         synchronized ( repositoryScanningQueue )
239         {
240             if ( isProcessingRepositoryTask( task ) )
241             {
242                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
243             }
244             else
245             {
246                 // add check if the task is already queued if it is a file scan
247                 repositoryScanningQueue.put( task );
248             }
249         }
250     }
251
252     public boolean unQueueTask( RepositoryTask task )
253         throws TaskQueueException
254     {
255         synchronized ( repositoryScanningQueue )
256         {
257             if ( isProcessingRepositoryTask( task ) )
258             {
259                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
260                 return false;
261             }
262             else
263             {
264                 return repositoryScanningQueue.remove( task );
265             }
266         }
267     }
268     public void configurationEvent( ConfigurationEvent event )
269     {
270         if ( event.getType() == ConfigurationEvent.SAVED )
271         {
272             for ( String job : jobs )
273             {
274                 try
275                 {
276                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
277                 }
278                 catch ( SchedulerException e )
279                 {
280                     log.error( "Error restarting the repository scanning job after property change." );
281                 }
282             }
283             jobs.clear();
284
285             List<ManagedRepositoryConfiguration> repositories =
286                 archivaConfiguration.getConfiguration().getManagedRepositories();
287
288             for ( ManagedRepositoryConfiguration repoConfig : repositories )
289             {
290                 if ( repoConfig.getRefreshCronExpression() != null )
291                 {
292                     try
293                     {
294                         scheduleRepositoryJobs( repoConfig );
295                     }
296                     catch ( SchedulerException e )
297                     {
298                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
299                     }
300                 }
301             }
302         }
303     }
304
305     @SuppressWarnings( "unchecked" )
306     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
307                                          MetadataRepository metadataRepository )
308         throws MetadataRepositoryException
309     {
310         return repositoryStatisticsManager.getLastStatistics( metadataRepository, repoConfig.getId() ) != null;
311     }
312
313     // MRM-848: Pre-configured repository initially appear to be empty
314     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
315     {
316         String repoId = repoConfig.getId();
317         RepositoryTask task = new RepositoryTask();
318         task.setRepositoryId( repoId );
319
320         if ( !queuedRepos.contains( repoId ) )
321         {
322             log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
323
324             try
325             {
326                 queuedRepos.add( repoConfig.getId() );
327                 this.queueTask( task );
328             }
329             catch ( TaskQueueException e )
330             {
331                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
332             }
333         }
334     }
335
336     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
337         throws SchedulerException
338     {
339         if ( repoConfig.getRefreshCronExpression() == null )
340         {
341             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
342             return;
343         }
344
345         if ( !repoConfig.isScanned() )
346         {
347             log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
348             return;
349         }
350
351         // get the cron string for these database scanning jobs
352         String cronString = repoConfig.getRefreshCronExpression();
353
354         if ( !cronValidator.validate( cronString ) )
355         {
356             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId()
357                           + "] is invalid.  Defaulting to hourly." );
358             cronString = CRON_HOURLY;
359         }
360
361         // setup the unprocessed artifact job
362         JobDetailImpl repositoryJob =
363             new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
364
365         repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue );
366         repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() );
367
368         try
369         {
370             CronTriggerImpl trigger =
371                 new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
372
373             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
374             scheduler.scheduleJob( repositoryJob, trigger );
375         }
376         catch ( ParseException e )
377         {
378             log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '"
379                            + repoConfig.getId() + "': " + e.getMessage() );
380         }
381
382     }
383 }