]> source.dussan.org Git - archiva.git/blob
b93e88cd7a881f2057ced22c0858e55fb3e70b90
[archiva.git] /
1 package org.apache.maven.archiva.scheduled;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.repository.scanner.RepositoryScanStatistics;
23 import org.apache.commons.collections.CollectionUtils;
24 import org.apache.maven.archiva.common.ArchivaException;
25 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
26 import org.apache.maven.archiva.configuration.ConfigurationEvent;
27 import org.apache.maven.archiva.configuration.ConfigurationListener;
28 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
29 import org.apache.maven.archiva.database.ArchivaDAO;
30 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
35 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
36 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
38 import org.codehaus.plexus.scheduler.CronExpressionValidator;
39 import org.codehaus.plexus.scheduler.Scheduler;
40 import org.codehaus.plexus.taskqueue.Task;
41 import org.codehaus.plexus.taskqueue.TaskQueue;
42 import org.codehaus.plexus.taskqueue.TaskQueueException;
43 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
44 import org.quartz.CronTrigger;
45 import org.quartz.JobDataMap;
46 import org.quartz.JobDetail;
47 import org.quartz.SchedulerException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
55 import java.util.Set;
56
57 /**
58  * Default implementation of a scheduling component for archiva.
59  *
60  * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
61  */
62 public class DefaultArchivaTaskScheduler
63     implements ArchivaTaskScheduler, Startable, ConfigurationListener
64 {
65     private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
66     
67     /**
68      * @plexus.requirement
69      */
70     private Scheduler scheduler;
71
72     /**
73      * @plexus.requirement role-hint="database-update"
74      */
75     private TaskQueue databaseUpdateQueue;
76
77     /**
78      * @plexus.requirement role-hint="repository-scanning"
79      */
80     private TaskQueue repositoryScanningQueue;
81
82     /**
83      * @plexus.requirement
84      */
85     private ArchivaConfiguration archivaConfiguration;
86     
87     /**
88      * @plexus.requirement role-hint="jdo"
89      */
90     private ArchivaDAO dao;
91
92     public static final String DATABASE_SCAN_GROUP = "database-group";
93
94     public static final String DATABASE_JOB = "database-job";
95
96     public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
97
98     public static final String REPOSITORY_SCAN_GROUP = "repository-group";
99
100     public static final String REPOSITORY_JOB = "repository-job";
101
102     public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
103
104     public static final String CRON_HOURLY = "0 0 * * * ?";
105
106     private Set<String> jobs = new HashSet<String>();
107     
108     private List<String> queuedRepos = new ArrayList<String>();
109
110     public void startup()
111         throws ArchivaException
112     {
113         archivaConfiguration.addListener( this );
114
115         try
116         {
117             start();
118         }
119         catch ( StartingException e )
120         {
121             throw new ArchivaException( e.getMessage(), e );
122         }
123     }
124     
125     public void start()
126         throws StartingException
127     {
128         try
129         {
130             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
131                 .getManagedRepositories();
132
133             for ( ManagedRepositoryConfiguration repoConfig : repositories )
134             {
135                 if ( repoConfig.isScanned() )
136                 {
137                     scheduleRepositoryJobs( repoConfig );
138                     
139                     if( !isPreviouslyScanned( repoConfig ) )
140                     {
141                         queueInitialRepoScan( repoConfig );
142                     }
143                 }
144             }
145
146             scheduleDatabaseJobs();
147         }
148         catch ( SchedulerException e )
149         {
150             throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
151         }
152     }
153
154     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
155     {
156         List<RepositoryScanStatistics> results =
157             dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
158
159         if ( results != null && !results.isEmpty() )
160         {
161             return true;
162         }
163
164         return false;
165     }
166     
167     // MRM-848: Pre-configured repository initially appear to be empty
168     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
169     {
170         String repoId = repoConfig.getId();
171
172         RepositoryTask task = new RepositoryTask();
173         task.setRepositoryId( repoId );
174         task.setName( REPOSITORY_JOB + ":" + repoId + ":initial-scan" );
175         task.setQueuePolicy( ArchivaTask.QUEUE_POLICY_WAIT );
176
177         boolean scheduleTask = false;
178
179         if ( queuedRepos.contains( repoId ) )
180         {
181             log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
182         }
183         else
184         {
185             scheduleTask = true;
186         }
187
188         if ( scheduleTask )
189         {
190             try
191             {
192                 queuedRepos.add( repoConfig.getId() );
193                 this.queueRepositoryTask( task );
194             }
195             catch ( TaskQueueException e )
196             {
197                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
198             }
199         }
200     }
201     
202     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
203         throws SchedulerException
204     {
205         if ( repoConfig.getRefreshCronExpression() == null )
206         {
207             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
208             return;
209         }
210
211         // get the cron string for these database scanning jobs
212         String cronString = repoConfig.getRefreshCronExpression();
213
214         CronExpressionValidator cronValidator = new CronExpressionValidator();
215         if ( !cronValidator.validate( cronString ) )
216         {
217             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
218                 "] is invalid.  Defaulting to hourly." );
219             cronString = CRON_HOURLY;
220         }
221
222         // setup the unprocessed artifact job
223         JobDetail repositoryJob =
224             new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
225
226         JobDataMap dataMap = new JobDataMap();
227         dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
228         dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
229         dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
230         repositoryJob.setJobDataMap( dataMap );
231
232         try
233         {
234             CronTrigger trigger =
235                 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
236
237             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
238             scheduler.scheduleJob( repositoryJob, trigger );
239         }
240         catch ( ParseException e )
241         {
242             log.error(
243                 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
244                     repoConfig.getId() + "': " + e.getMessage() );
245         }
246
247     }
248
249     private synchronized void scheduleDatabaseJobs()
250         throws SchedulerException
251     {
252         String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
253
254         // setup the unprocessed artifact job
255         JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
256
257         JobDataMap dataMap = new JobDataMap();
258         dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
259         databaseJob.setJobDataMap( dataMap );
260
261         CronExpressionValidator cronValidator = new CronExpressionValidator();
262         if ( !cronValidator.validate( cronString ) )
263         {
264             log.warn(
265                 "Cron expression [" + cronString + "] for database update is invalid.  Defaulting to hourly." );
266             cronString = CRON_HOURLY;
267         }
268
269         try
270         {
271             CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
272
273             scheduler.scheduleJob( databaseJob, trigger );
274         }
275         catch ( ParseException e )
276         {
277             log.error(
278                 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
279         }
280
281     }
282
283     public void stop()
284         throws StoppingException
285     {
286         try
287         {
288             scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
289
290             for ( String job : jobs )
291             {
292                 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
293             }
294             jobs.clear();
295             queuedRepos.clear();
296         }
297         catch ( SchedulerException e )
298         {
299             throw new StoppingException( "Unable to unschedule tasks", e );
300         }
301     }
302
303     public void scheduleDatabaseTasks()
304         throws TaskExecutionException
305     {
306         try
307         {
308             scheduleDatabaseJobs();
309         }
310         catch ( SchedulerException e )
311         {
312             throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
313
314         }
315     }
316
317     public boolean isProcessingAnyRepositoryTask()
318         throws ArchivaException
319     {
320         List<? extends Task> queue = null;
321
322         try
323         {
324             queue = repositoryScanningQueue.getQueueSnapshot();
325         }
326         catch ( TaskQueueException e )
327         {
328             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
329         }
330
331         return !queue.isEmpty();
332     }
333
334     public boolean isProcessingRepositoryTask( String repositoryId )
335         throws ArchivaException
336     {
337         List<? extends Task> queue = null;
338
339         try
340         {
341             queue = repositoryScanningQueue.getQueueSnapshot();
342         }
343         catch ( TaskQueueException e )
344         {
345             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
346         }
347
348         return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
349     }
350
351     public boolean isProcessingDatabaseTask()
352         throws ArchivaException
353     {
354         List<? extends Task> queue = null;
355
356         try
357         {
358             queue = databaseUpdateQueue.getQueueSnapshot();
359         }
360         catch ( TaskQueueException e )
361         {
362             throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
363         }
364
365         return !queue.isEmpty();
366     }
367
368     public void queueRepositoryTask( RepositoryTask task )
369         throws TaskQueueException
370     {
371         repositoryScanningQueue.put( task );
372     }
373
374     public void queueDatabaseTask( DatabaseTask task )
375         throws TaskQueueException
376     {
377         databaseUpdateQueue.put( task );
378     }
379
380     public void configurationEvent( ConfigurationEvent event )
381     {
382         if ( event.getType() == ConfigurationEvent.SAVED )
383         {
384             try
385             {
386                 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
387
388                 scheduleDatabaseJobs();
389             }
390             catch ( SchedulerException e )
391             {
392                 log.error( "Error restarting the database scanning job after property change." );
393             }
394
395             for ( String job : jobs )
396             {
397                 try
398                 {
399                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
400                 }
401                 catch ( SchedulerException e )
402                 {
403                     log.error( "Error restarting the repository scanning job after property change." );
404                 }
405             }
406             jobs.clear();
407
408             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
409
410             for ( ManagedRepositoryConfiguration repoConfig : repositories )
411             {
412                 if ( repoConfig.getRefreshCronExpression() != null )
413                 {
414                     try
415                     {
416                         scheduleRepositoryJobs( repoConfig );
417                     }
418                     catch ( SchedulerException e )
419                     {
420                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
421                     }
422                 }
423             }
424         }
425     }
426 }