1 package org.apache.maven.archiva.scheduled;
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
22 import org.apache.commons.collections.CollectionUtils;
23 import org.apache.maven.archiva.common.ArchivaException;
24 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
25 import org.apache.maven.archiva.configuration.ConfigurationEvent;
26 import org.apache.maven.archiva.configuration.ConfigurationListener;
27 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
28 import org.apache.maven.archiva.database.ArchivaDAO;
29 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
30 import org.apache.maven.archiva.repository.scanner.RepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
35 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
36 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
38 import org.codehaus.plexus.scheduler.CronExpressionValidator;
39 import org.codehaus.plexus.scheduler.Scheduler;
40 import org.codehaus.plexus.taskqueue.Task;
41 import org.codehaus.plexus.taskqueue.TaskQueue;
42 import org.codehaus.plexus.taskqueue.TaskQueueException;
43 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
44 import org.quartz.CronTrigger;
45 import org.quartz.JobDataMap;
46 import org.quartz.JobDetail;
47 import org.quartz.SchedulerException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
58 * Default implementation of a scheduling component for archiva.
60 * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
62 public class DefaultArchivaTaskScheduler
63 implements ArchivaTaskScheduler, Startable, ConfigurationListener
65 private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
70 private Scheduler scheduler;
73 * @plexus.requirement role-hint="database-update"
75 private TaskQueue databaseUpdateQueue;
78 * @plexus.requirement role-hint="repository-scanning"
80 private TaskQueue repositoryScanningQueue;
85 private ArchivaConfiguration archivaConfiguration;
88 * @plexus.requirement role-hint="jdo"
90 private ArchivaDAO dao;
// Scheduler group name used for both the database job and its cron trigger.
92 public static final String DATABASE_SCAN_GROUP = "database-group";
// Name under which the single database job is scheduled and unscheduled.
94 public static final String DATABASE_JOB = "database-job";
// Name of the cron trigger attached to the database job.
96 public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
// Scheduler group name used for every per-repository job and trigger.
98 public static final String REPOSITORY_SCAN_GROUP = "repository-group";
// Prefix of per-repository job names; the full name is REPOSITORY_JOB + ":" + repository id.
100 public static final String REPOSITORY_JOB = "repository-job";
// Prefix of per-repository trigger names; the full name is REPOSITORY_JOB_TRIGGER + ":" + repository id.
102 public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
// Fallback cron expression substituted when a configured expression fails validation ("Defaulting to hourly").
104 public static final String CRON_HOURLY = "0 0 * * * ?";
// Names of the repository jobs registered by scheduleRepositoryJobs; iterated later to unschedule them.
// NOTE(review): java.util.Set is not among the imports visible in this listing (embedded line 55 is skipped) - confirm it is imported.
106 private Set<String> jobs = new HashSet<String>();
// Ids of repositories for which queueInitialRepoScan has already queued an initial scan.
108 private List<String> queuedRepos = new ArrayList<String>();
// Registers this scheduler as a listener on the Archiva configuration, then performs the start-up step
// guarded by the catch below. A StartingException is rethrown as an ArchivaException that carries the
// same message and keeps the original exception as its cause.
// NOTE(review): the embedded numbering jumps from 113 to 119, so the guarded statement is not visible in
// this listing; presumably it invokes the StartingException-throwing method that follows - confirm.
110 public void startup()
111 throws ArchivaException
113 archivaConfiguration.addListener( this );
119 catch ( StartingException e )
121 throw new ArchivaException( e.getMessage(), e );
// NOTE(review): the signature line of this method (embedded line 125) is missing from the listing; the
// StartingException clause together with the Startable interface suggests it is start() - confirm.
// Walks the managed repositories from the current configuration: a repository marked as scanned gets its
// cron job scheduled, and one that has no recorded scan statistics also gets an initial scan queued.
// The database job is scheduled as well. A SchedulerException is wrapped in a StartingException.
// NOTE(review): brace-only lines are not visible here, so the exact nesting of the two checks is inferred
// from the gaps in the embedded numbering - confirm against the full file.
126 throws StartingException
130 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
131 .getManagedRepositories();
133 for ( ManagedRepositoryConfiguration repoConfig : repositories )
135 if ( repoConfig.isScanned() )
137 scheduleRepositoryJobs( repoConfig );
139 if( !isPreviouslyScanned( repoConfig ) )
141 queueInitialRepoScan( repoConfig );
146 scheduleDatabaseJobs();
148 catch ( SchedulerException e )
150 throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
// Asks the DAO for the most recent scan statistics of the given repository and tests whether any exist.
// NOTE(review): the return statements (embedded lines 160-165) are missing from this listing; presumably a
// non-empty result means "previously scanned" - confirm.
154 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
156 List<RepositoryScanStatistics> results =
157 dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
// both a null result and an empty list fail this check
159 if ( results != null && !results.isEmpty() )
167 // MRM-848: Pre-configured repositories initially appear to be empty
// Builds a RepositoryTask named REPOSITORY_JOB + ":" + id + ":initial-scan" with the WAIT queue policy,
// records the repository id in queuedRepos and hands the task to queueRepositoryTask.
// A repository id already present in queuedRepos is reported through log.error.
// A TaskQueueException raised while queueing is logged (message only) and not propagated.
// NOTE(review): embedded lines 183-191 are missing from this listing, so how scheduleTask is updated and
// checked between the duplicate test and the queueing call cannot be confirmed here.
168 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
170 String repoId = repoConfig.getId();
172 RepositoryTask task = new RepositoryTask();
173 task.setRepositoryId( repoId );
174 task.setName( REPOSITORY_JOB + ":" + repoId + ":initial-scan" );
175 task.setQueuePolicy( ArchivaTask.QUEUE_POLICY_WAIT );
177 boolean scheduleTask = false;
179 if ( queuedRepos.contains( repoId ) )
181 log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
192 queuedRepos.add( repoConfig.getId() );
193 this.queueRepositoryTask( task );
195 catch ( TaskQueueException e )
197 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
// Schedules a RepositoryTaskJob under a cron trigger for the given repository.
// A warning is logged when the repository has no cron expression or is not marked as scanned. When the
// configured expression fails CronExpressionValidator, CRON_HOURLY is used instead. The job's data map
// carries the repository scanning queue, the WAIT queue policy and the repository id.
// Declares SchedulerException, which scheduler.scheduleJob may raise.
// NOTE(review): short lines are missing from this listing (after each "Skipping job" warning, before the
// trigger creation, and the opener of the final logging call), so early exits, the try block and the log
// level used for the ParseException message are not visible here - confirm against the full file.
202 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
203 throws SchedulerException
205 if ( repoConfig.getRefreshCronExpression() == null )
207 log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
211 if ( !repoConfig.isScanned() )
213 log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
217 // get the cron string for this repository's scanning job
218 String cronString = repoConfig.getRefreshCronExpression();
220 CronExpressionValidator cronValidator = new CronExpressionValidator();
221 if ( !cronValidator.validate( cronString ) )
223 log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
224 "] is invalid. Defaulting to hourly." );
225 cronString = CRON_HOURLY;
228 // set up the repository scanning job
229 JobDetail repositoryJob =
230 new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
232 JobDataMap dataMap = new JobDataMap();
233 dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
234 dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
235 dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
236 repositoryJob.setJobDataMap( dataMap );
240 CronTrigger trigger =
241 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
// the job name is recorded before scheduling so that it can be unscheduled later by name
243 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
244 scheduler.scheduleJob( repositoryJob, trigger );
246 catch ( ParseException e )
249 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
250 repoConfig.getId() + "': " + e.getMessage() );
// Schedules the DatabaseTaskJob under a cron trigger taken from the database-scanning configuration.
// The job's data map carries the database update queue. When the configured expression fails
// CronExpressionValidator, CRON_HOURLY is used instead.
// Declares SchedulerException, which scheduler.scheduleJob may raise.
// NOTE(review): the openers of the two logging calls (embedded lines 270 and 283) and the try line are
// missing from this listing, so the log levels used cannot be confirmed here.
255 private synchronized void scheduleDatabaseJobs()
256 throws SchedulerException
258 String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
260 // set up the database update job
261 JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
263 JobDataMap dataMap = new JobDataMap();
264 dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
265 databaseJob.setJobDataMap( dataMap );
267 CronExpressionValidator cronValidator = new CronExpressionValidator();
268 if ( !cronValidator.validate( cronString ) )
271 "Cron expression [" + cronString + "] for database update is invalid. Defaulting to hourly." );
272 cronString = CRON_HOURLY;
277 CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
279 scheduler.scheduleJob( databaseJob, trigger );
281 catch ( ParseException e )
284 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
// NOTE(review): the signature line of this method (embedded line 289) is missing from the listing; the
// StoppingException clause together with the Startable interface suggests it is stop() - confirm.
// Unschedules the database job and every repository job whose name is held in `jobs`.
// A SchedulerException is wrapped in a StoppingException with the original exception as its cause.
// NOTE(review): embedded lines 299-302 are missing, so any clean-up performed after the loop is not
// visible here - confirm against the full file.
290 throws StoppingException
294 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
296 for ( String job : jobs )
298 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
303 catch ( SchedulerException e )
305 throw new StoppingException( "Unable to unschedule tasks", e );
309 public void scheduleDatabaseTasks()
310 throws TaskExecutionException
314 scheduleDatabaseJobs();
316 catch ( SchedulerException e )
318 throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
323 public boolean isProcessingAnyRepositoryTask()
324 throws ArchivaException
326 List<? extends Task> queue = null;
330 queue = repositoryScanningQueue.getQueueSnapshot();
332 catch ( TaskQueueException e )
334 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
337 return !queue.isEmpty();
340 public boolean isProcessingRepositoryTask( String repositoryId )
341 throws ArchivaException
343 List<? extends Task> queue = null;
347 queue = repositoryScanningQueue.getQueueSnapshot();
349 catch ( TaskQueueException e )
351 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
354 return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
357 public boolean isProcessingDatabaseTask()
358 throws ArchivaException
360 List<? extends Task> queue = null;
364 queue = databaseUpdateQueue.getQueueSnapshot();
366 catch ( TaskQueueException e )
368 throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
371 return !queue.isEmpty();
374 public void queueRepositoryTask( RepositoryTask task )
375 throws TaskQueueException
377 repositoryScanningQueue.put( task );
380 public void queueDatabaseTask( DatabaseTask task )
381 throws TaskQueueException
383 databaseUpdateQueue.put( task );
// Configuration listener callback. For an event of type ConfigurationEvent.SAVED the visible lines
// unschedule and reschedule the database job, unschedule every repository job named in `jobs`, and
// call scheduleRepositoryJobs for each managed repository whose cron expression is not null.
// Scheduler failures are reported through log.error and are not rethrown.
// NOTE(review): the caught SchedulerException is never passed to the logger, so its message and stack
// trace are lost - consider logging it.
// NOTE(review): embedded lines 411-413 are missing and the listing ends at embedded line 426, so any
// statement between the unschedule loop and the reschedule loop, and the tail of the method, are not
// visible here - confirm against the full file.
386 public void configurationEvent( ConfigurationEvent event )
388 if ( event.getType() == ConfigurationEvent.SAVED )
392 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
394 scheduleDatabaseJobs();
396 catch ( SchedulerException e )
398 log.error( "Error restarting the database scanning job after property change." );
401 for ( String job : jobs )
405 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
407 catch ( SchedulerException e )
409 log.error( "Error restarting the repository scanning job after property change." );
414 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
416 for ( ManagedRepositoryConfiguration repoConfig : repositories )
418 if ( repoConfig.getRefreshCronExpression() != null )
422 scheduleRepositoryJobs( repoConfig );
424 catch ( SchedulerException e )
426 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );