1 package org.apache.maven.archiva.scheduled;
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
22 import org.apache.commons.collections.CollectionUtils;
23 import org.apache.maven.archiva.common.ArchivaException;
24 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
25 import org.apache.maven.archiva.configuration.ConfigurationEvent;
26 import org.apache.maven.archiva.configuration.ConfigurationListener;
27 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
28 import org.apache.maven.archiva.database.ArchivaDAO;
29 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
30 import org.apache.maven.archiva.repository.scanner.RepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
35 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
36 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
38 import org.codehaus.plexus.scheduler.CronExpressionValidator;
39 import org.codehaus.plexus.scheduler.Scheduler;
40 import org.codehaus.plexus.taskqueue.Task;
41 import org.codehaus.plexus.taskqueue.TaskQueue;
42 import org.codehaus.plexus.taskqueue.TaskQueueException;
43 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
44 import org.quartz.CronTrigger;
45 import org.quartz.JobDataMap;
46 import org.quartz.JobDetail;
47 import org.quartz.SchedulerException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
58 * Default implementation of a scheduling component for archiva.
60 * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
62 public class DefaultArchivaTaskScheduler
63 implements ArchivaTaskScheduler, Startable, ConfigurationListener
65 private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
70 private Scheduler scheduler;
73 * @plexus.requirement role-hint="database-update"
75 private TaskQueue databaseUpdateQueue;
78 * @plexus.requirement role-hint="repository-scanning"
80 private TaskQueue repositoryScanningQueue;
85 private ArchivaConfiguration archivaConfiguration;
88 * @plexus.requirement role-hint="jdo"
90 private ArchivaDAO dao;
92 public static final String DATABASE_SCAN_GROUP = "database-group";
94 public static final String DATABASE_JOB = "database-job";
96 public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
98 public static final String REPOSITORY_SCAN_GROUP = "repository-group";
100 public static final String REPOSITORY_JOB = "repository-job";
102 public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
104 public static final String CRON_HOURLY = "0 0 * * * ?";
106 private Set<String> jobs = new HashSet<String>();
108 private List<String> queuedRepos = new ArrayList<String>();
110 public void startup()
111 throws ArchivaException
113 archivaConfiguration.addListener( this );
119 catch ( StartingException e )
121 throw new ArchivaException( e.getMessage(), e );
126 throws StartingException
130 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
131 .getManagedRepositories();
133 for ( ManagedRepositoryConfiguration repoConfig : repositories )
135 if ( repoConfig.isScanned() )
137 scheduleRepositoryJobs( repoConfig );
139 if( !isPreviouslyScanned( repoConfig ) )
141 queueInitialRepoScan( repoConfig );
146 scheduleDatabaseJobs();
148 catch ( SchedulerException e )
150 throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
154 @SuppressWarnings("unchecked")
155 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
157 List<RepositoryScanStatistics> results =
158 (List<RepositoryScanStatistics>) dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
160 if ( results != null && !results.isEmpty() )
168 // MRM-848: Pre-configured repository initially appear to be empty
169 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
171 String repoId = repoConfig.getId();
173 RepositoryTask task = new RepositoryTask();
174 task.setRepositoryId( repoId );
175 task.setName( REPOSITORY_JOB + ":" + repoId + ":initial-scan" );
176 task.setQueuePolicy( ArchivaTask.QUEUE_POLICY_WAIT );
178 boolean scheduleTask = false;
180 if ( queuedRepos.contains( repoId ) )
182 log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
193 queuedRepos.add( repoConfig.getId() );
194 this.queueRepositoryTask( task );
196 catch ( TaskQueueException e )
198 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
203 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
204 throws SchedulerException
206 if ( repoConfig.getRefreshCronExpression() == null )
208 log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
212 if ( !repoConfig.isScanned() )
214 log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
218 // get the cron string for these database scanning jobs
219 String cronString = repoConfig.getRefreshCronExpression();
221 CronExpressionValidator cronValidator = new CronExpressionValidator();
222 if ( !cronValidator.validate( cronString ) )
224 log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
225 "] is invalid. Defaulting to hourly." );
226 cronString = CRON_HOURLY;
229 // setup the unprocessed artifact job
230 JobDetail repositoryJob =
231 new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
233 JobDataMap dataMap = new JobDataMap();
234 dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
235 dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
236 dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
237 repositoryJob.setJobDataMap( dataMap );
241 CronTrigger trigger =
242 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
244 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
245 scheduler.scheduleJob( repositoryJob, trigger );
247 catch ( ParseException e )
250 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
251 repoConfig.getId() + "': " + e.getMessage() );
256 private synchronized void scheduleDatabaseJobs()
257 throws SchedulerException
259 String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
261 // setup the unprocessed artifact job
262 JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
264 JobDataMap dataMap = new JobDataMap();
265 dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
266 databaseJob.setJobDataMap( dataMap );
268 CronExpressionValidator cronValidator = new CronExpressionValidator();
269 if ( !cronValidator.validate( cronString ) )
272 "Cron expression [" + cronString + "] for database update is invalid. Defaulting to hourly." );
273 cronString = CRON_HOURLY;
278 CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
280 scheduler.scheduleJob( databaseJob, trigger );
282 catch ( ParseException e )
285 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
291 throws StoppingException
295 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
297 for ( String job : jobs )
299 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
304 catch ( SchedulerException e )
306 throw new StoppingException( "Unable to unschedule tasks", e );
310 public void scheduleDatabaseTasks()
311 throws TaskExecutionException
315 scheduleDatabaseJobs();
317 catch ( SchedulerException e )
319 throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
324 @SuppressWarnings("unchecked")
325 public boolean isProcessingAnyRepositoryTask()
326 throws ArchivaException
328 List<? extends Task> queue = null;
332 queue = repositoryScanningQueue.getQueueSnapshot();
334 catch ( TaskQueueException e )
336 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
339 return !queue.isEmpty();
342 @SuppressWarnings("unchecked")
343 public boolean isProcessingRepositoryTask( String repositoryId )
344 throws ArchivaException
346 List<? extends Task> queue = null;
350 queue = repositoryScanningQueue.getQueueSnapshot();
352 catch ( TaskQueueException e )
354 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
357 return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
360 @SuppressWarnings("unchecked")
361 public boolean isProcessingDatabaseTask()
362 throws ArchivaException
364 List<? extends Task> queue = null;
368 queue = databaseUpdateQueue.getQueueSnapshot();
370 catch ( TaskQueueException e )
372 throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
375 return !queue.isEmpty();
378 public void queueRepositoryTask( RepositoryTask task )
379 throws TaskQueueException
381 repositoryScanningQueue.put( task );
384 public void queueDatabaseTask( DatabaseTask task )
385 throws TaskQueueException
387 databaseUpdateQueue.put( task );
390 public void configurationEvent( ConfigurationEvent event )
392 if ( event.getType() == ConfigurationEvent.SAVED )
396 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
398 scheduleDatabaseJobs();
400 catch ( SchedulerException e )
402 log.error( "Error restarting the database scanning job after property change." );
405 for ( String job : jobs )
409 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
411 catch ( SchedulerException e )
413 log.error( "Error restarting the repository scanning job after property change." );
418 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
420 for ( ManagedRepositoryConfiguration repoConfig : repositories )
422 if ( repoConfig.getRefreshCronExpression() != null )
426 scheduleRepositoryJobs( repoConfig );
428 catch ( SchedulerException e )
430 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );