1 package org.apache.maven.archiva.scheduled;
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
22 import org.apache.archiva.repository.scanner.RepositoryScanStatistics;
23 import org.apache.commons.collections.CollectionUtils;
24 import org.apache.maven.archiva.common.ArchivaException;
25 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
26 import org.apache.maven.archiva.configuration.ConfigurationEvent;
27 import org.apache.maven.archiva.configuration.ConfigurationListener;
28 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
29 import org.apache.maven.archiva.database.ArchivaDAO;
30 import org.apache.maven.archiva.database.constraints.MostRecentRepositoryScanStatistics;
31 import org.apache.maven.archiva.scheduled.tasks.ArchivaTask;
32 import org.apache.maven.archiva.scheduled.tasks.DatabaseTask;
33 import org.apache.maven.archiva.scheduled.tasks.RepositoryTask;
34 import org.apache.maven.archiva.scheduled.tasks.RepositoryTaskSelectionPredicate;
35 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
36 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
37 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
38 import org.codehaus.plexus.scheduler.CronExpressionValidator;
39 import org.codehaus.plexus.scheduler.Scheduler;
40 import org.codehaus.plexus.taskqueue.Task;
41 import org.codehaus.plexus.taskqueue.TaskQueue;
42 import org.codehaus.plexus.taskqueue.TaskQueueException;
43 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
44 import org.quartz.CronTrigger;
45 import org.quartz.JobDataMap;
46 import org.quartz.JobDetail;
47 import org.quartz.SchedulerException;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
58 * Default implementation of a scheduling component for archiva.
60 * @plexus.component role="org.apache.maven.archiva.scheduled.ArchivaTaskScheduler" role-hint="default"
62 public class DefaultArchivaTaskScheduler
63 implements ArchivaTaskScheduler, Startable, ConfigurationListener
65 private Logger log = LoggerFactory.getLogger( DefaultArchivaTaskScheduler.class );
70 private Scheduler scheduler;
73 * @plexus.requirement role-hint="database-update"
75 private TaskQueue databaseUpdateQueue;
78 * @plexus.requirement role-hint="repository-scanning"
80 private TaskQueue repositoryScanningQueue;
85 private ArchivaConfiguration archivaConfiguration;
88 * @plexus.requirement role-hint="jdo"
90 private ArchivaDAO dao;
92 public static final String DATABASE_SCAN_GROUP = "database-group";
94 public static final String DATABASE_JOB = "database-job";
96 public static final String DATABASE_JOB_TRIGGER = "database-job-trigger";
98 public static final String REPOSITORY_SCAN_GROUP = "repository-group";
100 public static final String REPOSITORY_JOB = "repository-job";
102 public static final String REPOSITORY_JOB_TRIGGER = "repository-job-trigger";
104 public static final String CRON_HOURLY = "0 0 * * * ?";
106 private Set<String> jobs = new HashSet<String>();
108 private List<String> queuedRepos = new ArrayList<String>();
110 public void startup()
111 throws ArchivaException
113 archivaConfiguration.addListener( this );
119 catch ( StartingException e )
121 throw new ArchivaException( e.getMessage(), e );
126 throws StartingException
130 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration()
131 .getManagedRepositories();
133 for ( ManagedRepositoryConfiguration repoConfig : repositories )
135 if ( repoConfig.isScanned() )
137 scheduleRepositoryJobs( repoConfig );
139 if( !isPreviouslyScanned( repoConfig ) )
141 queueInitialRepoScan( repoConfig );
146 scheduleDatabaseJobs();
148 catch ( SchedulerException e )
150 throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
154 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
156 List<RepositoryScanStatistics> results =
157 dao.query( new MostRecentRepositoryScanStatistics( repoConfig.getId() ) );
159 if ( results != null && !results.isEmpty() )
167 // MRM-848: Pre-configured repository initially appear to be empty
168 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
170 String repoId = repoConfig.getId();
172 RepositoryTask task = new RepositoryTask();
173 task.setRepositoryId( repoId );
174 task.setName( REPOSITORY_JOB + ":" + repoId + ":initial-scan" );
175 task.setQueuePolicy( ArchivaTask.QUEUE_POLICY_WAIT );
177 boolean scheduleTask = false;
179 if ( queuedRepos.contains( repoId ) )
181 log.error( "Repository [" + repoId + "] is currently being processed or is already queued." );
192 queuedRepos.add( repoConfig.getId() );
193 this.queueRepositoryTask( task );
195 catch ( TaskQueueException e )
197 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
202 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
203 throws SchedulerException
205 if ( repoConfig.getRefreshCronExpression() == null )
207 log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
211 // get the cron string for these database scanning jobs
212 String cronString = repoConfig.getRefreshCronExpression();
214 CronExpressionValidator cronValidator = new CronExpressionValidator();
215 if ( !cronValidator.validate( cronString ) )
217 log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
218 "] is invalid. Defaulting to hourly." );
219 cronString = CRON_HOURLY;
222 // setup the unprocessed artifact job
223 JobDetail repositoryJob =
224 new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
226 JobDataMap dataMap = new JobDataMap();
227 dataMap.put( RepositoryTaskJob.TASK_QUEUE, repositoryScanningQueue );
228 dataMap.put( RepositoryTaskJob.TASK_QUEUE_POLICY, ArchivaTask.QUEUE_POLICY_WAIT );
229 dataMap.put( RepositoryTaskJob.TASK_REPOSITORY, repoConfig.getId() );
230 repositoryJob.setJobDataMap( dataMap );
234 CronTrigger trigger =
235 new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
237 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
238 scheduler.scheduleJob( repositoryJob, trigger );
240 catch ( ParseException e )
243 "ParseException in repository scanning cron expression, disabling repository scanning for '" +
244 repoConfig.getId() + "': " + e.getMessage() );
249 private synchronized void scheduleDatabaseJobs()
250 throws SchedulerException
252 String cronString = archivaConfiguration.getConfiguration().getDatabaseScanning().getCronExpression();
254 // setup the unprocessed artifact job
255 JobDetail databaseJob = new JobDetail( DATABASE_JOB, DATABASE_SCAN_GROUP, DatabaseTaskJob.class );
257 JobDataMap dataMap = new JobDataMap();
258 dataMap.put( DatabaseTaskJob.TASK_QUEUE, databaseUpdateQueue );
259 databaseJob.setJobDataMap( dataMap );
261 CronExpressionValidator cronValidator = new CronExpressionValidator();
262 if ( !cronValidator.validate( cronString ) )
265 "Cron expression [" + cronString + "] for database update is invalid. Defaulting to hourly." );
266 cronString = CRON_HOURLY;
271 CronTrigger trigger = new CronTrigger( DATABASE_JOB_TRIGGER, DATABASE_SCAN_GROUP, cronString );
273 scheduler.scheduleJob( databaseJob, trigger );
275 catch ( ParseException e )
278 "ParseException in database scanning cron expression, disabling database scanning: " + e.getMessage() );
284 throws StoppingException
288 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
290 for ( String job : jobs )
292 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
297 catch ( SchedulerException e )
299 throw new StoppingException( "Unable to unschedule tasks", e );
303 public void scheduleDatabaseTasks()
304 throws TaskExecutionException
308 scheduleDatabaseJobs();
310 catch ( SchedulerException e )
312 throw new TaskExecutionException( "Unable to schedule repository jobs: " + e.getMessage(), e );
317 public boolean isProcessingAnyRepositoryTask()
318 throws ArchivaException
320 List<? extends Task> queue = null;
324 queue = repositoryScanningQueue.getQueueSnapshot();
326 catch ( TaskQueueException e )
328 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
331 return !queue.isEmpty();
334 public boolean isProcessingRepositoryTask( String repositoryId )
335 throws ArchivaException
337 List<? extends Task> queue = null;
341 queue = repositoryScanningQueue.getQueueSnapshot();
343 catch ( TaskQueueException e )
345 throw new ArchivaException( "Unable to get repository scanning queue:" + e.getMessage(), e );
348 return CollectionUtils.exists( queue, new RepositoryTaskSelectionPredicate( repositoryId ) );
351 public boolean isProcessingDatabaseTask()
352 throws ArchivaException
354 List<? extends Task> queue = null;
358 queue = databaseUpdateQueue.getQueueSnapshot();
360 catch ( TaskQueueException e )
362 throw new ArchivaException( "Unable to get database update queue:" + e.getMessage(), e );
365 return !queue.isEmpty();
368 public void queueRepositoryTask( RepositoryTask task )
369 throws TaskQueueException
371 repositoryScanningQueue.put( task );
374 public void queueDatabaseTask( DatabaseTask task )
375 throws TaskQueueException
377 databaseUpdateQueue.put( task );
380 public void configurationEvent( ConfigurationEvent event )
382 if ( event.getType() == ConfigurationEvent.SAVED )
386 scheduler.unscheduleJob( DATABASE_JOB, DATABASE_SCAN_GROUP );
388 scheduleDatabaseJobs();
390 catch ( SchedulerException e )
392 log.error( "Error restarting the database scanning job after property change." );
395 for ( String job : jobs )
399 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
401 catch ( SchedulerException e )
403 log.error( "Error restarting the repository scanning job after property change." );
408 List<ManagedRepositoryConfiguration> repositories = archivaConfiguration.getConfiguration().getManagedRepositories();
410 for ( ManagedRepositoryConfiguration repoConfig : repositories )
412 if ( repoConfig.getRefreshCronExpression() != null )
416 scheduleRepositoryJobs( repoConfig );
418 catch ( SchedulerException e )
420 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );