]> source.dussan.org Git - archiva.git/blob
cde1d29feed0489e19e1ddbceb2baf3e405360fa
[archiva.git] /
1 package org.apache.maven.archiva.scheduled;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.commons.collections.CollectionUtils;
23 import org.apache.maven.archiva.common.ArchivaException;
24 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
25 import org.apache.maven.archiva.configuration.ConfigurationEvent;
26 import org.apache.maven.archiva.configuration.ConfigurationListener;
27 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
28 import org.apache.maven.archiva.database.ArchivaDAO;
29 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
30 import org.apache.maven.archiva.repository.scanner.RepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskNameSelectionPredicate;
35 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
36 import org.apache.maven.archiva.scheduled.tasks.TaskCreator;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
38 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
39 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
40 import org.codehaus.plexus.scheduler.CronExpressionValidator;
41 import org.codehaus.plexus.scheduler.Scheduler;
42 import org.codehaus.plexus.taskqueue.Task;
43 import org.codehaus.plexus.taskqueue.TaskQueue;
44 import org.codehaus.plexus.taskqueue.TaskQueueException;
45 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
46 import org.quartz.CronTrigger;
47 import org.quartz.JobDataMap;
48 import org.quartz.JobDetail;
49 import org.quartz.SchedulerException;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import java.text.ParseException;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.List;
57 import java.util.Set;
58
59 /**
60  * Default implementation of a scheduling component for archiva.
61  *
62  * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
63  */
64 public class DefaultArchivaTaskScheduler
65     implements ArchivaTaskScheduler, Startable, ConfigurationListener
66 {
67     private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
68     
69     /**
70      * @plexus.requirement
71      */
72     private Scheduler scheduler;
73
74     /**
75      * @plexus.requirement role-hint="database-update"
76      */
77     private TaskQueue databaseUpdateQueue;
78
79     /**
80      * @plexus.requirement role-hint="repository-scanning"
81      */
82     private TaskQueue repositoryScanningQueue;
83
84     /**
85      * @plexus.requirement
86      */
87     private ArchivaConfiguration archivaConfiguration;
88     
89     /**
90      * @plexus.requirement role-hint="jdo"
91      */
92     private ArchivaDAO dao;
93
94     public static final String DATABASE_SCAN_GROUP = "database-group";
95
96     public static final String DATABASE_JOB = "database-job";
97
98     public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
99
100     public static final String REPOSITORY_SCAN_GROUP = "repository-group";
101
102     public static final String REPOSITORY_JOB = "repository-job";
103
104     public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
105
106     public static final String CRON_HOURLY = "0 0 * * * ?";
107
108     private Set<String> jobs = new HashSet<String>();
109     
110     private List<String> queuedRepos = new ArrayList<String>();
111
112     public void startup()
113         throws ArchivaException
114     {
115         archivaConfiguration.addListener( this );
116
117         try
118         {
119             start();
120         }
121         catch ( StartingException e )
122         {
123             throw new ArchivaException( e.getMessage(), e );
124         }
125     }
126     
127     public void start()
128         throws StartingException
129     {
130         try
131         {
132             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
133                 .getManagedRepositories();
134
135             for ( ManagedRepositoryConfiguration repoConfig : repositories )
136             {
137                 if ( repoConfig.isScanned() )
138                 {
139                     scheduleRepositoryJobs( repoConfig );
140                     
141                     if( !isPreviouslyScanned( repoConfig ) )
142                     {
143                         queueInitialRepoScan( repoConfig );
144                     }
145                 }
146             }
147
148             scheduleDatabaseJobs();
149         }
150         catch ( SchedulerException e )
151         {
152             throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
153         }
154     }
155
156     @SuppressWarnings("unchecked")
157     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
158     {
159         List<RepositoryScanStatistics> results =
160             (List<RepositoryScanStatistics>) dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
161
162         if ( results != null && !results.isEmpty() )
163         {
164             return true;
165         }
166
167         return false;
168     }
169     
170     // MRM-848: Pre-configured repository initially appear to be empty
171     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
172     {
173         String repoId = repoConfig.getId();        
174         RepositoryTask task = TaskCreator.createRepositoryTask( repoId, "initial-scan" );
175
176         if ( queuedRepos.contains( repoId ) )
177         {
178             log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
179         }
180         else
181         {
182             try
183             {
184                 queuedRepos.add( repoConfig.getId() );
185                 this.queueRepositoryTask( task );
186             }
187             catch ( TaskQueueException e )
188             {
189                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
190             }
191         }
192     }
193     
194     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
195         throws SchedulerException
196     {
197         if ( repoConfig.getRefreshCronExpression() == null )
198         {
199             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
200             return;
201         }
202         
203         if ( !repoConfig.isScanned() )
204         {
205             log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
206             return;
207         }
208
209         // get the cron string for these database scanning jobs
210         String cronString = repoConfig.getRefreshCronExpression();
211
212         CronExpressionValidator cronValidator = new CronExpressionValidator();
213         if ( !cronValidator.validate( cronString ) )
214         {
215             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
216                 "] is invalid.  Defaulting to hourly." );
217             cronString = CRON_HOURLY;
218         }
219
220         // setup the unprocessed artifact job
221         JobDetail repositoryJob =
222             new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
223
224         JobDataMap dataMap = new JobDataMap();
225         dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
226         dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
227         dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
228         repositoryJob.setJobDataMap( dataMap );
229
230         try
231         {
232             CronTrigger trigger =
233                 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
234
235             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
236             scheduler.scheduleJob( repositoryJob, trigger );
237         }
238         catch ( ParseException e )
239         {
240             log.error(
241                 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
242                     repoConfig.getId() + "': " + e.getMessage() );
243         }
244
245     }
246
247     private synchronized void scheduleDatabaseJobs()
248         throws SchedulerException
249     {
250         String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
251
252         // setup the unprocessed artifact job
253         JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
254
255         JobDataMap dataMap = new JobDataMap();
256         dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
257         databaseJob.setJobDataMap( dataMap );
258
259         CronExpressionValidator cronValidator = new CronExpressionValidator();
260         if ( !cronValidator.validate( cronString ) )
261         {
262             log.warn(
263                 "Cron expression [" + cronString + "] for database update is invalid.  Defaulting to hourly." );
264             cronString = CRON_HOURLY;
265         }
266
267         try
268         {
269             CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
270
271             scheduler.scheduleJob( databaseJob, trigger );
272         }
273         catch ( ParseException e )
274         {
275             log.error(
276                 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
277         }
278
279     }
280
281     public void stop()
282         throws StoppingException
283     {
284         try
285         {
286             scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
287
288             for ( String job : jobs )
289             {
290                 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
291             }
292             jobs.clear();
293             queuedRepos.clear();
294         }
295         catch ( SchedulerException e )
296         {
297             throw new StoppingException( "Unable to unschedule tasks", e );
298         }
299     }
300
301     public void scheduleDatabaseTasks()
302         throws TaskExecutionException
303     {
304         try
305         {
306             scheduleDatabaseJobs();
307         }
308         catch ( SchedulerException e )
309         {
310             throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
311
312         }
313     }
314
315     @SuppressWarnings("unchecked")
316     public boolean isProcessingAnyRepositoryTask()
317         throws ArchivaException
318     {
319         synchronized( repositoryScanningQueue )
320         {
321             List<? extends Task> queue = null;
322     
323             try
324             {
325                 queue = repositoryScanningQueue.getQueueSnapshot();
326             }
327             catch ( TaskQueueException e )
328             {
329                 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
330             }
331     
332             return !queue.isEmpty();
333         }
334     }
335
336     @SuppressWarnings("unchecked")
337     public boolean isProcessingRepositoryTask( String repositoryId )
338         throws ArchivaException
339     {
340         synchronized( repositoryScanningQueue )
341         {
342             List<? extends Task> queue = null;
343     
344             try
345             {
346                 queue = repositoryScanningQueue.getQueueSnapshot();
347             }
348             catch ( TaskQueueException e )
349             {
350                 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
351             }
352     
353             return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
354         }
355     }
356     
357     @SuppressWarnings("unchecked")
358     public boolean isProcessingRepositoryTaskWithName( String taskName )
359         throws ArchivaException
360     {
361         synchronized( repositoryScanningQueue )
362         {
363             List<? extends Task> queue = null;
364     
365             try
366             {
367                 queue = repositoryScanningQueue.getQueueSnapshot();
368             }
369             catch ( TaskQueueException e )
370             {
371                 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
372             }
373     
374             return CollectionUtils.exists( queue, new RepositoryTaskNameSelectionPredicate( taskName ) );
375         }
376     }
377
378     @SuppressWarnings("unchecked")
379     public boolean isProcessingDatabaseTask()
380         throws ArchivaException
381     {
382         List<? extends Task> queue = null;
383
384         try
385         {
386             queue = databaseUpdateQueue.getQueueSnapshot();
387         }
388         catch ( TaskQueueException e )
389         {
390             throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
391         }
392
393         return !queue.isEmpty();
394     }
395
396     public void queueRepositoryTask( RepositoryTask task )
397         throws TaskQueueException
398     {
399         synchronized( repositoryScanningQueue )
400         {
401             if( task.getResourceFile() != null )
402             {
403                 try
404                 {
405                     if( isProcessingRepositoryTaskWithName( task.getName() ) )
406                     {
407                         log.debug( "Repository task '" + task.getName() + "' is already queued. Skipping task.." );
408                         return;
409                     }
410                 }
411                 catch ( ArchivaException e )
412                 {
413                     log.warn( "Error occurred while checking if repository task '" + task.getName() +
414                         "' is already queued." );
415                 }
416             }
417             
418             // add check if the task is already queued if it is a file scan 
419             repositoryScanningQueue.put( task );
420         }
421     }
422
423     public void queueDatabaseTask( DatabaseTask task )
424         throws TaskQueueException
425     {
426         databaseUpdateQueue.put( task );
427     }
428
429     public void configurationEvent( ConfigurationEvent event )
430     {
431         if ( event.getType() == ConfigurationEvent.SAVED )
432         {
433             try
434             {
435                 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
436
437                 scheduleDatabaseJobs();
438             }
439             catch ( SchedulerException e )
440             {
441                 log.error( "Error restarting the database scanning job after property change." );
442             }
443
444             for ( String job : jobs )
445             {
446                 try
447                 {
448                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
449                 }
450                 catch ( SchedulerException e )
451                 {
452                     log.error( "Error restarting the repository scanning job after property change." );
453                 }
454             }
455             jobs.clear();
456
457             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
458
459             for ( ManagedRepositoryConfiguration repoConfig : repositories )
460             {
461                 if ( repoConfig.getRefreshCronExpression() != null )
462                 {
463                     try
464                     {
465                         scheduleRepositoryJobs( repoConfig );
466                     }
467                     catch ( SchedulerException e )
468                     {
469                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
470                     }
471                 }
472             }
473         }
474     }
475 }