]> source.dussan.org Git - archiva.git/blob
599de9736505a6e71887b5ce08d4556dfdc91b91
[archiva.git] /
1 package org.apache.maven.archiva.scheduled;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.commons.collections.CollectionUtils;
23 import org.apache.maven.archiva.common.ArchivaException;
24 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
25 import org.apache.maven.archiva.configuration.ConfigurationEvent;
26 import org.apache.maven.archiva.configuration.ConfigurationListener;
27 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
28 import org.apache.maven.archiva.database.ArchivaDAO;
29 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
30 import org.apache.maven.archiva.repository.scanner.RepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
35 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
36 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
38 import org.codehaus.plexus.scheduler.CronExpressionValidator;
39 import org.codehaus.plexus.scheduler.Scheduler;
40 import org.codehaus.plexus.taskqueue.Task;
41 import org.codehaus.plexus.taskqueue.TaskQueue;
42 import org.codehaus.plexus.taskqueue.TaskQueueException;
43 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
44 import org.quartz.CronTrigger;
45 import org.quartz.JobDataMap;
46 import org.quartz.JobDetail;
47 import org.quartz.SchedulerException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
55 import java.util.Set;
56
57 /**
58  * Default implementation of a scheduling component for archiva.
59  *
60  * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
61  */
62 public class DefaultArchivaTaskScheduler
63     implements ArchivaTaskScheduler, Startable, ConfigurationListener
64 {
65     private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
66     
67     /**
68      * @plexus.requirement
69      */
70     private Scheduler scheduler;
71
72     /**
73      * @plexus.requirement role-hint="database-update"
74      */
75     private TaskQueue databaseUpdateQueue;
76
77     /**
78      * @plexus.requirement role-hint="repository-scanning"
79      */
80     private TaskQueue repositoryScanningQueue;
81
82     /**
83      * @plexus.requirement
84      */
85     private ArchivaConfiguration archivaConfiguration;
86     
87     /**
88      * @plexus.requirement role-hint="jdo"
89      */
90     private ArchivaDAO dao;
91
92     public static final String DATABASE_SCAN_GROUP = "database-group";
93
94     public static final String DATABASE_JOB = "database-job";
95
96     public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
97
98     public static final String REPOSITORY_SCAN_GROUP = "repository-group";
99
100     public static final String REPOSITORY_JOB = "repository-job";
101
102     public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
103
104     public static final String CRON_HOURLY = "0 0 * * * ?";
105
106     private Set<String> jobs = new HashSet<String>();
107     
108     private List<String> queuedRepos = new ArrayList<String>();
109
110     public void startup()
111         throws ArchivaException
112     {
113         archivaConfiguration.addListener( this );
114
115         try
116         {
117             start();
118         }
119         catch ( StartingException e )
120         {
121             throw new ArchivaException( e.getMessage(), e );
122         }
123     }
124     
125     public void start()
126         throws StartingException
127     {
128         try
129         {
130             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
131                 .getManagedRepositories();
132
133             for ( ManagedRepositoryConfiguration repoConfig : repositories )
134             {
135                 if ( repoConfig.isScanned() )
136                 {
137                     scheduleRepositoryJobs( repoConfig );
138                     
139                     if( !isPreviouslyScanned( repoConfig ) )
140                     {
141                         queueInitialRepoScan( repoConfig );
142                     }
143                 }
144             }
145
146             scheduleDatabaseJobs();
147         }
148         catch ( SchedulerException e )
149         {
150             throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
151         }
152     }
153
154     @SuppressWarnings("unchecked")
155     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
156     {
157         List<RepositoryScanStatistics> results =
158             (List<RepositoryScanStatistics>) dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
159
160         if ( results != null && !results.isEmpty() )
161         {
162             return true;
163         }
164
165         return false;
166     }
167     
168     // MRM-848: Pre-configured repository initially appear to be empty
169     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
170     {
171         String repoId = repoConfig.getId();
172
173         RepositoryTask task = new RepositoryTask();
174         task.setRepositoryId( repoId );
175         task.setName( REPOSITORY_JOB + ":" + repoId + ":initial-scan" );
176         task.setQueuePolicy( ArchivaTask.QUEUE_POLICY_WAIT );
177
178         boolean scheduleTask = false;
179
180         if ( queuedRepos.contains( repoId ) )
181         {
182             log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
183         }
184         else
185         {
186             scheduleTask = true;
187         }
188
189         if ( scheduleTask )
190         {
191             try
192             {
193                 queuedRepos.add( repoConfig.getId() );
194                 this.queueRepositoryTask( task );
195             }
196             catch ( TaskQueueException e )
197             {
198                 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
199             }
200         }
201     }
202     
203     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
204         throws SchedulerException
205     {
206         if ( repoConfig.getRefreshCronExpression() == null )
207         {
208             log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
209             return;
210         }
211         
212         if ( !repoConfig.isScanned() )
213         {
214             log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
215             return;
216         }
217
218         // get the cron string for these database scanning jobs
219         String cronString = repoConfig.getRefreshCronExpression();
220
221         CronExpressionValidator cronValidator = new CronExpressionValidator();
222         if ( !cronValidator.validate( cronString ) )
223         {
224             log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
225                 "] is invalid.  Defaulting to hourly." );
226             cronString = CRON_HOURLY;
227         }
228
229         // setup the unprocessed artifact job
230         JobDetail repositoryJob =
231             new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
232
233         JobDataMap dataMap = new JobDataMap();
234         dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
235         dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
236         dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
237         repositoryJob.setJobDataMap( dataMap );
238
239         try
240         {
241             CronTrigger trigger =
242                 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
243
244             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
245             scheduler.scheduleJob( repositoryJob, trigger );
246         }
247         catch ( ParseException e )
248         {
249             log.error(
250                 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
251                     repoConfig.getId() + "': " + e.getMessage() );
252         }
253
254     }
255
256     private synchronized void scheduleDatabaseJobs()
257         throws SchedulerException
258     {
259         String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
260
261         // setup the unprocessed artifact job
262         JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
263
264         JobDataMap dataMap = new JobDataMap();
265         dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
266         databaseJob.setJobDataMap( dataMap );
267
268         CronExpressionValidator cronValidator = new CronExpressionValidator();
269         if ( !cronValidator.validate( cronString ) )
270         {
271             log.warn(
272                 "Cron expression [" + cronString + "] for database update is invalid.  Defaulting to hourly." );
273             cronString = CRON_HOURLY;
274         }
275
276         try
277         {
278             CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
279
280             scheduler.scheduleJob( databaseJob, trigger );
281         }
282         catch ( ParseException e )
283         {
284             log.error(
285                 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
286         }
287
288     }
289
290     public void stop()
291         throws StoppingException
292     {
293         try
294         {
295             scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
296
297             for ( String job : jobs )
298             {
299                 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
300             }
301             jobs.clear();
302             queuedRepos.clear();
303         }
304         catch ( SchedulerException e )
305         {
306             throw new StoppingException( "Unable to unschedule tasks", e );
307         }
308     }
309
310     public void scheduleDatabaseTasks()
311         throws TaskExecutionException
312     {
313         try
314         {
315             scheduleDatabaseJobs();
316         }
317         catch ( SchedulerException e )
318         {
319             throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
320
321         }
322     }
323
324     @SuppressWarnings("unchecked")
325     public boolean isProcessingAnyRepositoryTask()
326         throws ArchivaException
327     {
328         List<? extends Task> queue = null;
329
330         try
331         {
332             queue = repositoryScanningQueue.getQueueSnapshot();
333         }
334         catch ( TaskQueueException e )
335         {
336             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
337         }
338
339         return !queue.isEmpty();
340     }
341
342     @SuppressWarnings("unchecked")
343     public boolean isProcessingRepositoryTask( String repositoryId )
344         throws ArchivaException
345     {
346         List<? extends Task> queue = null;
347
348         try
349         {
350             queue = repositoryScanningQueue.getQueueSnapshot();
351         }
352         catch ( TaskQueueException e )
353         {
354             throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
355         }
356
357         return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
358     }
359
360     @SuppressWarnings("unchecked")
361     public boolean isProcessingDatabaseTask()
362         throws ArchivaException
363     {
364         List<? extends Task> queue = null;
365
366         try
367         {
368             queue = databaseUpdateQueue.getQueueSnapshot();
369         }
370         catch ( TaskQueueException e )
371         {
372             throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
373         }
374
375         return !queue.isEmpty();
376     }
377
378     public void queueRepositoryTask( RepositoryTask task )
379         throws TaskQueueException
380     {
381         repositoryScanningQueue.put( task );
382     }
383
384     public void queueDatabaseTask( DatabaseTask task )
385         throws TaskQueueException
386     {
387         databaseUpdateQueue.put( task );
388     }
389
390     public void configurationEvent( ConfigurationEvent event )
391     {
392         if ( event.getType() == ConfigurationEvent.SAVED )
393         {
394             try
395             {
396                 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
397
398                 scheduleDatabaseJobs();
399             }
400             catch ( SchedulerException e )
401             {
402                 log.error( "Error restarting the database scanning job after property change." );
403             }
404
405             for ( String job : jobs )
406             {
407                 try
408                 {
409                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
410                 }
411                 catch ( SchedulerException e )
412                 {
413                     log.error( "Error restarting the repository scanning job after property change." );
414                 }
415             }
416             jobs.clear();
417
418             List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
419
420             for ( ManagedRepositoryConfiguration repoConfig : repositories )
421             {
422                 if ( repoConfig.getRefreshCronExpression() != null )
423                 {
424                     try
425                     {
426                         scheduleRepositoryJobs( repoConfig );
427                     }
428                     catch ( SchedulerException e )
429                     {
430                         log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
431                     }
432                 }
433             }
434         }
435     }
436 }