1 package org.apache.archiva.scheduler.repository;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
32 import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.redback.components.scheduler.Scheduler;
34 import org.apache.archiva.redback.components.taskqueue.TaskQueue;
35 import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.ArchivaTaskScheduler;
37 import org.quartz.SchedulerException;
38 import org.quartz.impl.JobDetailImpl;
39 import org.quartz.impl.triggers.CronTriggerImpl;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Service;
44 import javax.annotation.PostConstruct;
45 import javax.annotation.PreDestroy;
46 import javax.inject.Inject;
47 import javax.inject.Named;
48 import java.text.ParseException;
49 import java.util.ArrayList;
50 import java.util.HashSet;
51 import java.util.List;
/**
 * Default implementation of a scheduling component for archiva.
 */
57 @Service ("archivaTaskScheduler#repository")
58 public class DefaultRepositoryArchivaTaskScheduler
59 implements RepositoryArchivaTaskScheduler, ConfigurationListener
61 private Logger log = LoggerFactory.getLogger( getClass() );
67 private Scheduler scheduler;
70 private CronExpressionValidator cronValidator;
76 @Named (value = "taskQueue#repository-scanning")
77 private TaskQueue repositoryScanningQueue;
83 private ArchivaConfiguration archivaConfiguration;
89 @Named (value = "repositoryStatisticsManager#default")
90 private RepositoryStatisticsManager repositoryStatisticsManager;
93 * TODO: could have multiple implementations
96 private RepositorySessionFactory repositorySessionFactory;
98 private static final String REPOSITORY_SCAN_GROUP = "rg";
100 private static final String REPOSITORY_JOB = "rj";
102 private static final String REPOSITORY_JOB_TRIGGER = "rjt";
104 static final String TASK_QUEUE = "TASK_QUEUE";
106 static final String TASK_REPOSITORY = "TASK_REPOSITORY";
108 public static final String CRON_HOURLY = "0 0 * * * ?";
110 private Set<String> jobs = new HashSet<String>();
112 private List<String> queuedRepos = new ArrayList<String>();
115 public void startup()
116 throws ArchivaException
118 archivaConfiguration.addListener( this );
120 List<ManagedRepositoryConfiguration> repositories =
121 archivaConfiguration.getConfiguration().getManagedRepositories();
123 RepositorySession repositorySession = repositorySessionFactory.createSession();
126 MetadataRepository metadataRepository = repositorySession.getRepository();
127 for ( ManagedRepositoryConfiguration repoConfig : repositories )
129 if ( repoConfig.isScanned() )
133 scheduleRepositoryJobs( repoConfig );
135 catch ( SchedulerException e )
137 throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
142 if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
144 queueInitialRepoScan( repoConfig );
147 catch ( MetadataRepositoryException e )
149 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: "
150 + e.getMessage(), e );
157 repositorySession.close();
165 throws SchedulerException
167 for ( String job : jobs )
169 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
176 @SuppressWarnings ("unchecked")
177 public boolean isProcessingRepositoryTask( String repositoryId )
179 synchronized ( repositoryScanningQueue )
181 List<RepositoryTask> queue = null;
185 queue = repositoryScanningQueue.getQueueSnapshot();
187 catch ( TaskQueueException e )
189 // not possible with plexus-taskqueue implementation, ignore
192 for ( RepositoryTask queuedTask : queue )
194 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
203 public boolean isProcessingRepositoryTask( RepositoryTask task )
205 synchronized ( repositoryScanningQueue )
207 List<RepositoryTask> queue = null;
211 queue = repositoryScanningQueue.getQueueSnapshot();
213 catch ( TaskQueueException e )
215 // not possible with plexus-taskqueue implementation, ignore
218 for ( RepositoryTask queuedTask : queue )
220 if ( task.equals( queuedTask ) )
229 public void queueTask( RepositoryTask task )
230 throws TaskQueueException
232 synchronized ( repositoryScanningQueue )
234 if ( isProcessingRepositoryTask( task ) )
236 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
240 // add check if the task is already queued if it is a file scan
241 repositoryScanningQueue.put( task );
246 public boolean unQueueTask( RepositoryTask task )
247 throws TaskQueueException
249 synchronized ( repositoryScanningQueue )
251 if ( !isProcessingRepositoryTask( task ) )
253 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
258 return repositoryScanningQueue.remove( task );
263 public void configurationEvent( ConfigurationEvent event )
265 if ( event.getType() == ConfigurationEvent.SAVED )
267 for ( String job : jobs )
271 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
273 catch ( SchedulerException e )
275 log.error( "Error restarting the repository scanning job after property change." );
280 List<ManagedRepositoryConfiguration> repositories =
281 archivaConfiguration.getConfiguration().getManagedRepositories();
283 for ( ManagedRepositoryConfiguration repoConfig : repositories )
285 if ( repoConfig.getRefreshCronExpression() != null )
289 scheduleRepositoryJobs( repoConfig );
291 catch ( SchedulerException e )
293 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
300 @SuppressWarnings ("unchecked")
301 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
302 MetadataRepository metadataRepository )
303 throws MetadataRepositoryException
305 return repositoryStatisticsManager.getLastStatistics( metadataRepository, repoConfig.getId() ) != null;
308 // MRM-848: Pre-configured repository initially appear to be empty
309 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
311 String repoId = repoConfig.getId();
312 RepositoryTask task = new RepositoryTask();
313 task.setRepositoryId( repoId );
315 if ( !queuedRepos.contains( repoId ) )
317 log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
321 queuedRepos.add( repoConfig.getId() );
322 this.queueTask( task );
324 catch ( TaskQueueException e )
326 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
331 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
332 throws SchedulerException
334 if ( repoConfig.getRefreshCronExpression() == null )
336 log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
340 if ( !repoConfig.isScanned() )
342 log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
346 // get the cron string for these database scanning jobs
347 String cronString = repoConfig.getRefreshCronExpression();
349 if ( !cronValidator.validate( cronString ) )
351 log.warn( "Cron expression [{}] for repository [{}] is invalid. Defaulting to hourly.", cronString,
352 repoConfig.getId() );
353 cronString = CRON_HOURLY;
356 // setup the unprocessed artifact job
357 JobDetailImpl repositoryJob =
358 new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
359 RepositoryTaskJob.class );
361 repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue );
362 repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() );
366 CronTriggerImpl trigger =
367 new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP,
370 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
371 scheduler.scheduleJob( repositoryJob, trigger );
373 catch ( ParseException e )
375 log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '"
376 + repoConfig.getId() + "': " + e.getMessage() );