]> source.dussan.org Git - archiva.git/blob
1a468d8b24c5cd2ae443232f0d886dd3a1e66141
[archiva.git] /
1 package org.apache.maven.archiva.scheduled;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.commons.collections.CollectionUtils;
23 import org.apache.maven.archiva.common.ArchivaException;
24 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
25 import org.apache.maven.archiva.configuration.ConfigurationEvent;
26 import org.apache.maven.archiva.configuration.ConfigurationListener;
27 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
28 import org.apache.maven.archiva.database.ArchivaDAO;
29 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
30 import org.apache.maven.archiva.repository.scanner.RepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
35 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
36 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
38 import org.codehaus.plexus.scheduler.CronExpressionValidator;
39 import org.codehaus.plexus.scheduler.Scheduler;
40 import org.codehaus.plexus.taskqueue.Task;
41 import org.codehaus.plexus.taskqueue.TaskQueue;
42 import org.codehaus.plexus.taskqueue.TaskQueueException;
43 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
44 import org.quartz.CronTrigger;
45 import org.quartz.JobDataMap;
46 import org.quartz.JobDetail;
47 import org.quartz.SchedulerException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
55 import java.util.Set;
56
57 /**
58  * Default implementation of a scheduling component for archiva.
59  *
60  * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
61  */
62 public class DefaultArchivaTaskScheduler
63     implements ArchivaTaskScheduler, Startable, ConfigurationListener
64 {
65     private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
66     
67     /**
68      * @plexus.requirement
69      */
70     private Scheduler scheduler;
71
72     /**
73      * @plexus.requirement role-hint="database-update"
74      */
75     private TaskQueue databaseUpdateQueue;
76
77     /**
78      * @plexus.requirement role-hint="repository-scanning"
79      */
80     private TaskQueue repositoryScanningQueue;
81
82     /**
83      * @plexus.requirement
84      */
85     private ArchivaConfiguration archivaConfiguration;
86     
87     /**
88      * @plexus.requirement role-hint="jdo"
89      */
90     private ArchivaDAO dao;
91
92     public static final String DATABASE_SCAN_GROUP = "database-group";
93
94     public static final String DATABASE_JOB = "database-job";
95
96     public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
97
98     public static final String REPOSITORY_SCAN_GROUP = "repository-group";
99
100     public static final String REPOSITORY_JOB = "repository-job";
101
102     public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
103
104     public static final String CRON_HOURLY = "0 0 * * * ?";
105
106     private Set<String> jobs = new HashSet<String>();
107     
108     private List<String> queuedRepos = new ArrayList<String>();
109
110     public void startup()
111         throws ArchivaException
112     {
113         archivaConfiguration.addListener( this );
114
115         try
116         {
117             start();
118         }
119         catch ( StartingException e )
120         {
121             throw new ArchivaException( e.getMessage(), e );
122         }
123     }
124     
125     public void start()
126         throws StartingException
127     {
128         try
129         {
130             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
131                 .getManagedRepositories();
132
133             for ( ManagedRepositoryConfiguration repoConfig : repositories )
134             {
135                 if ( repoConfig.isScanned() )
136                 {
137                     scheduleRepositoryJobs( repoConfig );
138                     
139                     if( !isPreviouslyScanned( repoConfig ) )
140                     {
141                         queueInitialRepoScan( repoConfig );
142                     }
143                 }
144             }
145
146             scheduleDatabaseJobs();
147         }
148         catch ( SchedulerException e )
149         {
150             throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
151         }
152     }
153
154     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
155     {
156         List<RepositoryScanStatistics> results =
157             dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
158
159         if ( results != null && !results.isEmpty() )
160         {
161             return true;
162         }
163
164         return false;
165     }
166     
167     // MRM-848: Pre-configured repository initially appear to be empty
168     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
169     {
170         String repoId = repoConfig.getId();
171
172         RepositoryTask task = new RepositoryTask();
173         task.setRepositoryId( repoId );
174         task.setName( REPOSITORY_JOB + ":" + repoId + ":initial-scan" );
175         task.setQueuePolicy( ArchivaTask.QUEUE_POLICY_WAIT );
176
177         boolean scheduleTask = false;
178
179         if ( queuedRepos.contains( repoId ) )
180         {
181             log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
182         }
183         else
184         {
185             scheduleTask = true;
186         }
187
188         if ( scheduleTask )
189         {
190             try
191             {
192                 queuedRepos.add( repoConfig.getId() );
193                 this.queueRepositoryTask( task );
194             }
195             catch ( TaskQueueException e )
196             {
197                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
198             }
199         }
200     }
201     
202     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
203         throws SchedulerException
204     {
205         if ( repoConfig.getRefreshCronExpression() == null )
206         {
207             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
208             return;
209         }
210         
211         if ( !repoConfig.isScanned() )
212         {
213             log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
214             return;
215         }
216
217         // get the cron string for these database scanning jobs
218         String cronString = repoConfig.getRefreshCronExpression();
219
220         CronExpressionValidator cronValidator = new CronExpressionValidator();
221         if ( !cronValidator.validate( cronString ) )
222         {
223             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
224                 "] is invalid.  Defaulting to hourly." );
225             cronString = CRON_HOURLY;
226         }
227
228         // setup the unprocessed artifact job
229         JobDetail repositoryJob =
230             new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
231
232         JobDataMap dataMap = new JobDataMap();
233         dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
234         dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
235         dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
236         repositoryJob.setJobDataMap( dataMap );
237
238         try
239         {
240             CronTrigger trigger =
241                 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
242
243             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
244             scheduler.scheduleJob( repositoryJob, trigger );
245         }
246         catch ( ParseException e )
247         {
248             log.error(
249                 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
250                     repoConfig.getId() + "': " + e.getMessage() );
251         }
252
253     }
254
255     private synchronized void scheduleDatabaseJobs()
256         throws SchedulerException
257     {
258         String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
259
260         // setup the unprocessed artifact job
261         JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
262
263         JobDataMap dataMap = new JobDataMap();
264         dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
265         databaseJob.setJobDataMap( dataMap );
266
267         CronExpressionValidator cronValidator = new CronExpressionValidator();
268         if ( !cronValidator.validate( cronString ) )
269         {
270             log.warn(
271                 "Cron expression [" + cronString + "] for database update is invalid.  Defaulting to hourly." );
272             cronString = CRON_HOURLY;
273         }
274
275         try
276         {
277             CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
278
279             scheduler.scheduleJob( databaseJob, trigger );
280         }
281         catch ( ParseException e )
282         {
283             log.error(
284                 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
285         }
286
287     }
288
289     public void stop()
290         throws StoppingException
291     {
292         try
293         {
294             scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
295
296             for ( String job : jobs )
297             {
298                 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
299             }
300             jobs.clear();
301             queuedRepos.clear();
302         }
303         catch ( SchedulerException e )
304         {
305             throw new StoppingException( "Unable to unschedule tasks", e );
306         }
307     }
308
309     public void scheduleDatabaseTasks()
310         throws TaskExecutionException
311     {
312         try
313         {
314             scheduleDatabaseJobs();
315         }
316         catch ( SchedulerException e )
317         {
318             throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
319
320         }
321     }
322
323     public boolean isProcessingAnyRepositoryTask()
324         throws ArchivaException
325     {
326         List<? extends Task> queue = null;
327
328         try
329         {
330             queue = repositoryScanningQueue.getQueueSnapshot();
331         }
332         catch ( TaskQueueException e )
333         {
334             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
335         }
336
337         return !queue.isEmpty();
338     }
339
340     public boolean isProcessingRepositoryTask( String repositoryId )
341         throws ArchivaException
342     {
343         List<? extends Task> queue = null;
344
345         try
346         {
347             queue = repositoryScanningQueue.getQueueSnapshot();
348         }
349         catch ( TaskQueueException e )
350         {
351             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
352         }
353
354         return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
355     }
356
357     public boolean isProcessingDatabaseTask()
358         throws ArchivaException
359     {
360         List<? extends Task> queue = null;
361
362         try
363         {
364             queue = databaseUpdateQueue.getQueueSnapshot();
365         }
366         catch ( TaskQueueException e )
367         {
368             throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
369         }
370
371         return !queue.isEmpty();
372     }
373
374     public void queueRepositoryTask( RepositoryTask task )
375         throws TaskQueueException
376     {
377         repositoryScanningQueue.put( task );
378     }
379
380     public void queueDatabaseTask( DatabaseTask task )
381         throws TaskQueueException
382     {
383         databaseUpdateQueue.put( task );
384     }
385
386     public void configurationEvent( ConfigurationEvent event )
387     {
388         if ( event.getType() == ConfigurationEvent.SAVED )
389         {
390             try
391             {
392                 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
393
394                 scheduleDatabaseJobs();
395             }
396             catch ( SchedulerException e )
397             {
398                 log.error( "Error restarting the database scanning job after property change." );
399             }
400
401             for ( String job : jobs )
402             {
403                 try
404                 {
405                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
406                 }
407                 catch ( SchedulerException e )
408                 {
409                     log.error( "Error restarting the repository scanning job after property change." );
410                 }
411             }
412             jobs.clear();
413
414             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
415
416             for ( ManagedRepositoryConfiguration repoConfig : repositories )
417             {
418                 if ( repoConfig.getRefreshCronExpression() != null )
419                 {
420                     try
421                     {
422                         scheduleRepositoryJobs( repoConfig );
423                     }
424                     catch ( SchedulerException e )
425                     {
426                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
427                     }
428                 }
429             }
430         }
431     }
432 }