1 package org.apache.maven.archiva.scheduled;
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
22 import org.apache.commons.collections.CollectionUtils;
23 import org.apache.maven.archiva.common.ArchivaException;
24 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
25 import org.apache.maven.archiva.configuration.ConfigurationEvent;
26 import org.apache.maven.archiva.configuration.ConfigurationListener;
27 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
28 import org.apache.maven.archiva.database.ArchivaDAO;
29 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
30 import org.apache.maven.archiva.repository.scanner.RepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskNameSelectionPredicate;
35 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
36 import org.apache.maven.archiva.scheduled.tasks.TaskCreator;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
38 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
39 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
40 import org.codehaus.plexus.scheduler.CronExpressionValidator;
41 import org.codehaus.plexus.scheduler.Scheduler;
42 import org.codehaus.plexus.taskqueue.Task;
43 import org.codehaus.plexus.taskqueue.TaskQueue;
44 import org.codehaus.plexus.taskqueue.TaskQueueException;
45 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
46 import org.quartz.CronTrigger;
47 import org.quartz.JobDataMap;
48 import org.quartz.JobDetail;
49 import org.quartz.SchedulerException;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 import java.text.ParseException;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.List;
60 * Default implementation of a scheduling component for archiva.
62 * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
64 public class DefaultArchivaTaskScheduler
65 implements ArchivaTaskScheduler, Startable, ConfigurationListener
67 private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
72 private Scheduler scheduler;
75 * @plexus.requirement role-hint="database-update"
77 private TaskQueue databaseUpdateQueue;
80 * @plexus.requirement role-hint="repository-scanning"
82 private TaskQueue repositoryScanningQueue;
87 private ArchivaConfiguration archivaConfiguration;
90 * @plexus.requirement role-hint="jdo"
92 private ArchivaDAO dao;
94 public static final String DATABASE_SCAN_GROUP = "database-group";
96 public static final String DATABASE_JOB = "database-job";
98 public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
100 public static final String REPOSITORY_SCAN_GROUP = "repository-group";
102 public static final String REPOSITORY_JOB = "repository-job";
104 public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
106 public static final String CRON_HOURLY = "0 0 * * * ?";
108 private Set<String> jobs = new HashSet<String>();
110 private List<String> queuedRepos = new ArrayList<String>();
112 public void startup()
113 throws ArchivaException
115 archivaConfiguration.addListener( this );
121 catch ( StartingException e )
123 throw new ArchivaException( e.getMessage(), e );
128 throws StartingException
132 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
133 .getManagedRepositories();
135 for ( ManagedRepositoryConfiguration repoConfig : repositories )
137 if ( repoConfig.isScanned() )
139 scheduleRepositoryJobs( repoConfig );
141 if( !isPreviouslyScanned( repoConfig ) )
143 queueInitialRepoScan( repoConfig );
148 scheduleDatabaseJobs();
150 catch ( SchedulerException e )
152 throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
156 @SuppressWarnings("unchecked")
157 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
159 List<RepositoryScanStatistics> results =
160 (List<RepositoryScanStatistics>) dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
162 if ( results != null && !results.isEmpty() )
170 // MRM-848: Pre-configured repository initially appear to be empty
171 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
173 String repoId = repoConfig.getId();
174 RepositoryTask task = TaskCreator.createRepositoryTask( repoId, "initial-scan" );
176 if ( queuedRepos.contains( repoId ) )
178 log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
184 queuedRepos.add( repoConfig.getId() );
185 this.queueRepositoryTask( task );
187 catch ( TaskQueueException e )
189 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
194 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
195 throws SchedulerException
197 if ( repoConfig.getRefreshCronExpression() == null )
199 log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
203 if ( !repoConfig.isScanned() )
205 log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
209 // get the cron string for these database scanning jobs
210 String cronString = repoConfig.getRefreshCronExpression();
212 CronExpressionValidator cronValidator = new CronExpressionValidator();
213 if ( !cronValidator.validate( cronString ) )
215 log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
216 "] is invalid. Defaulting to hourly." );
217 cronString = CRON_HOURLY;
220 // setup the unprocessed artifact job
221 JobDetail repositoryJob =
222 new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
224 JobDataMap dataMap = new JobDataMap();
225 dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
226 dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
227 dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
228 repositoryJob.setJobDataMap( dataMap );
232 CronTrigger trigger =
233 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
235 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
236 scheduler.scheduleJob( repositoryJob, trigger );
238 catch ( ParseException e )
241 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
242 repoConfig.getId() + "': " + e.getMessage() );
247 private synchronized void scheduleDatabaseJobs()
248 throws SchedulerException
250 String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
252 // setup the unprocessed artifact job
253 JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
255 JobDataMap dataMap = new JobDataMap();
256 dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
257 databaseJob.setJobDataMap( dataMap );
259 CronExpressionValidator cronValidator = new CronExpressionValidator();
260 if ( !cronValidator.validate( cronString ) )
263 "Cron expression [" + cronString + "] for database update is invalid. Defaulting to hourly." );
264 cronString = CRON_HOURLY;
269 CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
271 scheduler.scheduleJob( databaseJob, trigger );
273 catch ( ParseException e )
276 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
282 throws StoppingException
286 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
288 for ( String job : jobs )
290 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
295 catch ( SchedulerException e )
297 throw new StoppingException( "Unable to unschedule tasks", e );
301 public void scheduleDatabaseTasks()
302 throws TaskExecutionException
306 scheduleDatabaseJobs();
308 catch ( SchedulerException e )
310 throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
315 @SuppressWarnings("unchecked")
316 public boolean isProcessingAnyRepositoryTask()
317 throws ArchivaException
319 synchronized( repositoryScanningQueue )
321 List<? extends Task> queue = null;
325 queue = repositoryScanningQueue.getQueueSnapshot();
327 catch ( TaskQueueException e )
329 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
332 return !queue.isEmpty();
336 @SuppressWarnings("unchecked")
337 public boolean isProcessingRepositoryTask( String repositoryId )
338 throws ArchivaException
340 synchronized( repositoryScanningQueue )
342 List<? extends Task> queue = null;
346 queue = repositoryScanningQueue.getQueueSnapshot();
348 catch ( TaskQueueException e )
350 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
353 return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
357 @SuppressWarnings("unchecked")
358 public boolean isProcessingRepositoryTaskWithName( String taskName )
359 throws ArchivaException
361 synchronized( repositoryScanningQueue )
363 List<? extends Task> queue = null;
367 queue = repositoryScanningQueue.getQueueSnapshot();
369 catch ( TaskQueueException e )
371 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
374 return CollectionUtils.exists( queue, new RepositoryTaskNameSelectionPredicate( taskName ) );
378 @SuppressWarnings("unchecked")
379 public boolean isProcessingDatabaseTask()
380 throws ArchivaException
382 List<? extends Task> queue = null;
386 queue = databaseUpdateQueue.getQueueSnapshot();
388 catch ( TaskQueueException e )
390 throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
393 return !queue.isEmpty();
396 public void queueRepositoryTask( RepositoryTask task )
397 throws TaskQueueException
399 synchronized( repositoryScanningQueue )
401 if( task.getResourceFile() != null )
405 if( isProcessingRepositoryTaskWithName( task.getName() ) )
407 log.debug( "Repository task '" + task.getName() + "' is already queued. Skipping task.." );
411 catch ( ArchivaException e )
413 log.warn( "Error occurred while checking if repository task '" + task.getName() +
414 "' is already queued." );
418 // add check if the task is already queued if it is a file scan
419 repositoryScanningQueue.put( task );
423 public void queueDatabaseTask( DatabaseTask task )
424 throws TaskQueueException
426 databaseUpdateQueue.put( task );
429 public void configurationEvent( ConfigurationEvent event )
431 if ( event.getType() == ConfigurationEvent.SAVED )
435 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
437 scheduleDatabaseJobs();
439 catch ( SchedulerException e )
441 log.error( "Error restarting the database scanning job after property change." );
444 for ( String job : jobs )
448 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
450 catch ( SchedulerException e )
452 log.error( "Error restarting the repository scanning job after property change." );
457 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
459 for ( ManagedRepositoryConfiguration repoConfig : repositories )
461 if ( repoConfig.getRefreshCronExpression() != null )
465 scheduleRepositoryJobs( repoConfig );
467 catch ( SchedulerException e )
469 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );