1 package org.apache.archiva.scheduler.repository;
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
22 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
23 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
24 import org.apache.archiva.scheduler.ArchivaTaskScheduler;
25 import org.apache.maven.archiva.common.ArchivaException;
26 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
27 import org.apache.maven.archiva.configuration.ConfigurationEvent;
28 import org.apache.maven.archiva.configuration.ConfigurationListener;
29 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
30 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
31 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
32 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
33 import org.codehaus.plexus.scheduler.CronExpressionValidator;
34 import org.codehaus.plexus.scheduler.Scheduler;
35 import org.codehaus.plexus.taskqueue.TaskQueue;
36 import org.codehaus.plexus.taskqueue.TaskQueueException;
37 import org.quartz.CronTrigger;
38 import org.quartz.JobDataMap;
39 import org.quartz.JobDetail;
40 import org.quartz.SchedulerException;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import java.text.ParseException;
45 import java.util.ArrayList;
46 import java.util.HashSet;
47 import java.util.List;
51 * Default implementation of a scheduling component for archiva.
53 * @plexus.component role="org.apache.archiva.scheduler.ArchivaTaskScheduler" role-hint="repository"
55 public class RepositoryArchivaTaskScheduler
56 implements ArchivaTaskScheduler<RepositoryTask>, Startable, ConfigurationListener
// NOTE(review): every line of this listing carries a fused line-number prefix and the short/brace-only
// lines (including parts of the plexus javadoc tags) are missing; the comments below describe only
// what the visible declarations and their uses in this file show.
// Logger used by all methods of this scheduler.
58 private Logger log = LoggerFactory.getLogger( RepositoryArchivaTaskScheduler.class );
// Scheduler on which the per-repository scan jobs are scheduled and unscheduled.
63 private Scheduler scheduler;
// Queue of repository scan tasks; it is also handed to every scan job through its JobDataMap
// under the TASK_QUEUE key.
66 * @plexus.requirement role-hint="repository-scanning"
68 private TaskQueue repositoryScanningQueue;
// Source of the managed repository list; this scheduler registers itself on it as a listener.
73 private ArchivaConfiguration archivaConfiguration;
// Consulted for the last statistics of a repository to decide whether it was scanned before.
78 private RepositoryStatisticsManager repositoryStatisticsManager;
// Group name shared by all scan jobs and triggers created by this class.
80 private static final String REPOSITORY_SCAN_GROUP = "rg";
// Job name prefix; the full job name is REPOSITORY_JOB + ":" + repository id.
82 private static final String REPOSITORY_JOB = "rj";
// Trigger name prefix; the full trigger name is REPOSITORY_JOB_TRIGGER + ":" + repository id.
84 private static final String REPOSITORY_JOB_TRIGGER = "rjt";
// JobDataMap key under which the scanning queue is stored.
86 static final String TASK_QUEUE = "TASK_QUEUE";
// JobDataMap key under which the repository id is stored.
88 static final String TASK_REPOSITORY = "TASK_REPOSITORY";
// Cron expression used as a fallback when a repository's own expression fails validation.
90 public static final String CRON_HOURLY = "0 0 * * * ?";
// Names of the jobs registered by scheduleRepositoryJobs; iterated to unschedule them.
92 private Set<String> jobs = new HashSet<String>();
// Ids of repositories for which an initial scan has already been queued.
94 private List<String> queuedRepos = new ArrayList<String>();
// NOTE(review): the declaration line of this method and its brace/try lines are missing from this
// listing, so its name and the guarded call cannot be confirmed here.
// Visible behaviour: registers this scheduler as a listener on archivaConfiguration, then
// translates a StartingException raised by the guarded call into an ArchivaException that keeps
// the original message and carries the original exception as its cause.
97 throws ArchivaException
99 archivaConfiguration.addListener( this );
105 catch ( StartingException e )
107 throw new ArchivaException( e.getMessage(), e );
// NOTE(review): the declaration line and the brace/try lines of this method are missing from this
// listing (presumably Startable.start() - confirm against the full file); the nesting described
// below is inferred from statement order.
// Reads the managed repositories from the configuration and, for each one flagged as scanned,
// schedules its scan job and queues an initial scan when it has not been scanned before.
112 throws StartingException
114 List<ManagedRepositoryConfiguration> repositories =
115 archivaConfiguration.getConfiguration().getManagedRepositories();
117 for ( ManagedRepositoryConfiguration repoConfig : repositories )
// Repositories with scanning disabled are left alone.
119 if ( repoConfig.isScanned() )
123 scheduleRepositoryJobs( repoConfig );
// A scheduling failure is rethrown as StartingException with the original exception as cause.
125 catch ( SchedulerException e )
127 throw new StartingException( "Unable to start scheduler: " + e.getMessage(), e );
// Only repositories without previous statistics get an initial scan queued.
132 if ( !isPreviouslyScanned( repoConfig ) )
134 queueInitialRepoScan( repoConfig );
// Failing to look up the statistics is not fatal: it is logged as a warning.
137 catch ( MetadataRepositoryException e )
// NOTE(review): the continuation of this statement is not visible in this listing.
139 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: " +
// NOTE(review): the declaration line and the brace/try lines of this method are missing from this
// listing (presumably Startable.stop() - confirm against the full file), as are the lines between
// the loop and the catch clause.
// Unschedules every job name recorded in 'jobs' from the REPOSITORY_SCAN_GROUP group.
147 throws StoppingException
151 for ( String job : jobs )
153 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
// A scheduler failure is rethrown as StoppingException with the original exception as cause.
158 catch ( SchedulerException e )
160 throw new StoppingException( "Unable to unschedule tasks", e );
164 @SuppressWarnings( "unchecked" )
165 public boolean isProcessingRepositoryTask( String repositoryId )
167 synchronized ( repositoryScanningQueue )
169 List<RepositoryTask> queue = null;
173 queue = repositoryScanningQueue.getQueueSnapshot();
175 catch ( TaskQueueException e )
177 // not possible with plexus-taskqueue implementation, ignore
180 for ( RepositoryTask queuedTask : queue )
182 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
191 @SuppressWarnings( "unchecked" )
192 private boolean isProcessingRepositoryTask( RepositoryTask task )
194 synchronized ( repositoryScanningQueue )
196 List<RepositoryTask> queue = null;
200 queue = repositoryScanningQueue.getQueueSnapshot();
202 catch ( TaskQueueException e )
204 // not possible with plexus-taskqueue implementation, ignore
207 for ( RepositoryTask queuedTask : queue )
209 if ( task.equals( queuedTask ) )
218 public void queueTask( RepositoryTask task )
219 throws TaskQueueException
221 synchronized ( repositoryScanningQueue )
223 if ( isProcessingRepositoryTask( task ) )
225 log.debug( "Repository task '" + task + "' is already queued. Skipping task." );
229 // add check if the task is already queued if it is a file scan
230 repositoryScanningQueue.put( task );
235 public void configurationEvent( ConfigurationEvent event )
237 if ( event.getType() == ConfigurationEvent.SAVED )
239 for ( String job : jobs )
243 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
245 catch ( SchedulerException e )
247 log.error( "Error restarting the repository scanning job after property change." );
252 List<ManagedRepositoryConfiguration> repositories =
253 archivaConfiguration.getConfiguration().getManagedRepositories();
255 for ( ManagedRepositoryConfiguration repoConfig : repositories )
257 if ( repoConfig.getRefreshCronExpression() != null )
261 scheduleRepositoryJobs( repoConfig );
263 catch ( SchedulerException e )
265 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
272 @SuppressWarnings( "unchecked" )
273 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig )
274 throws MetadataRepositoryException
276 return repositoryStatisticsManager.getLastStatistics( repoConfig.getId() ) != null;
279 // MRM-848: Pre-configured repository initially appear to be empty
280 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
282 String repoId = repoConfig.getId();
283 RepositoryTask task = new RepositoryTask();
284 task.setRepositoryId( repoId );
286 if ( !queuedRepos.contains( repoId ) )
288 log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
292 queuedRepos.add( repoConfig.getId() );
293 this.queueTask( task );
295 catch ( TaskQueueException e )
297 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
302 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
303 throws SchedulerException
305 if ( repoConfig.getRefreshCronExpression() == null )
307 log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
311 if ( !repoConfig.isScanned() )
313 log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
317 // get the cron string for these database scanning jobs
318 String cronString = repoConfig.getRefreshCronExpression();
320 CronExpressionValidator cronValidator = new CronExpressionValidator();
321 if ( !cronValidator.validate( cronString ) )
323 log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId() +
324 "] is invalid. Defaulting to hourly." );
325 cronString = CRON_HOURLY;
328 // setup the unprocessed artifact job
329 JobDetail repositoryJob = new JobDetail( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
330 RepositoryTaskJob.class );
332 JobDataMap dataMap = new JobDataMap();
333 dataMap.put( TASK_QUEUE, repositoryScanningQueue );
334 dataMap.put( TASK_REPOSITORY, repoConfig.getId() );
335 repositoryJob.setJobDataMap( dataMap );
339 CronTrigger trigger = new CronTrigger( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(),
340 REPOSITORY_SCAN_GROUP, cronString );
342 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
343 scheduler.scheduleJob( repositoryJob, trigger );
345 catch ( ParseException e )
347 log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '" +
348 repoConfig.getId() + "': " + e.getMessage() );