1 package org.apache.archiva.scheduler.repository;
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
22 import org.apache.archiva.metadata.repository.MetadataRepository;
23 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
24 import org.apache.archiva.metadata.repository.RepositorySession;
25 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
26 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
27 import org.apache.archiva.scheduler.ArchivaTaskScheduler;
28 import org.apache.maven.archiva.common.ArchivaException;
29 import org.apache.maven.archiva.configuration.ArchivaConfiguration;
30 import org.apache.maven.archiva.configuration.ConfigurationEvent;
31 import org.apache.maven.archiva.configuration.ConfigurationListener;
32 import org.apache.maven.archiva.configuration.ManagedRepositoryConfiguration;
33 import org.codehaus.plexus.taskqueue.TaskQueue;
34 import org.codehaus.plexus.taskqueue.TaskQueueException;
35 import org.codehaus.redback.components.scheduler.CronExpressionValidator;
36 import org.codehaus.redback.components.scheduler.Scheduler;
37 import org.quartz.CronTrigger;
38 import org.quartz.JobDataMap;
39 import org.quartz.JobDetail;
40 import org.quartz.SchedulerException;
41 import org.quartz.impl.JobDetailImpl;
42 import org.quartz.impl.triggers.CronTriggerImpl;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.springframework.stereotype.Service;
47 import javax.annotation.PostConstruct;
48 import javax.annotation.PreDestroy;
49 import javax.inject.Inject;
50 import javax.inject.Named;
51 import java.text.ParseException;
52 import java.util.ArrayList;
53 import java.util.HashSet;
54 import java.util.List;
58 * Default implementation of a scheduling component for archiva.
60 @Service( "archivaTaskScheduler#repository" )
61 public class RepositoryArchivaTaskScheduler
62 implements ArchivaTaskScheduler<RepositoryTask>, ConfigurationListener
// NOTE(review): loggers are conventionally declared private static final.
64 private Logger log = LoggerFactory.getLogger( RepositoryArchivaTaskScheduler.class );
// Scheduler used to register (scheduleJob) and remove (unscheduleJob) the per-repository cron jobs.
70 private Scheduler scheduler;
// Validates each repository's refresh cron expression; invalid expressions fall back to CRON_HOURLY.
73 private CronExpressionValidator cronValidator;
// Queue receiving RepositoryTask instances; it is also handed to each scheduled job via the
// job data map under the TASK_QUEUE key, and used as the lock for queue check-then-act sequences.
76 * plexus.requirement role-hint="repository-scanning"
79 @Named( value = "taskQueue#repository-scanning" )
80 private TaskQueue repositoryScanningQueue;
// Source of the managed repository list; this scheduler also registers itself as its listener.
86 private ArchivaConfiguration archivaConfiguration;
// Used to decide whether a repository was scanned before (last statistics present or not).
92 @Named( value = "repositoryStatisticsManager#default" )
93 private RepositoryStatisticsManager repositoryStatisticsManager;
// Factory for the metadata session opened during startup.
96 * TODO: could have multiple implementations
101 private RepositorySessionFactory repositorySessionFactory;
// Group name shared by all repository scanning jobs and their triggers.
103 private static final String REPOSITORY_SCAN_GROUP = "rg";
// Job name prefix; the full job name is REPOSITORY_JOB + ":" + repository id.
105 private static final String REPOSITORY_JOB = "rj";
// Trigger name prefix; the full trigger name is REPOSITORY_JOB_TRIGGER + ":" + repository id.
107 private static final String REPOSITORY_JOB_TRIGGER = "rjt";
// Job data map key under which the scanning queue is stored for the job.
109 static final String TASK_QUEUE = "TASK_QUEUE";
// Job data map key under which the repository id is stored for the job.
111 static final String TASK_REPOSITORY = "TASK_REPOSITORY";
// Fallback cron expression (second 0, minute 0 of every hour) used when validation fails.
113 public static final String CRON_HOURLY = "0 0 * * * ?";
// Names of the jobs scheduled by this instance; iterated when unscheduling.
115 private Set<String> jobs = new HashSet<String>();
// Ids of repositories for which an initial scan has already been queued by queueInitialRepoScan.
117 private List<String> queuedRepos = new ArrayList<String>();
/**
 * Starts the scheduler: registers this instance as a configuration listener, then, for every
 * managed repository flagged as scanned, schedules its cron job and queues an initial scan when
 * no statistics have been recorded for it yet.
 *
 * @throws ArchivaException if a repository job cannot be scheduled (wraps the SchedulerException)
 */
120 public void startup()
121 throws ArchivaException
123 archivaConfiguration.addListener( this );
125 List<ManagedRepositoryConfiguration> repositories =
126 archivaConfiguration.getConfiguration().getManagedRepositories();
// Metadata session used only for the "previously scanned" checks below.
// NOTE(review): the try/finally around this session is not visible in this listing; confirm
// that close() also runs when scheduling fails.
128 RepositorySession repositorySession = repositorySessionFactory.createSession();
131 MetadataRepository metadataRepository = repositorySession.getRepository();
132 for ( ManagedRepositoryConfiguration repoConfig : repositories )
134 if ( repoConfig.isScanned() )
138 scheduleRepositoryJobs( repoConfig );
// A scheduling failure aborts startup.
140 catch ( SchedulerException e )
142 throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
147 if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
149 queueInitialRepoScan( repoConfig );
// Failing to read statistics only skips the initial scan for this repository.
152 catch ( MetadataRepositoryException e )
154 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: "
155 + e.getMessage(), e );
162 repositorySession.close();
// NOTE(review): the method declaration this throws clause belongs to is not visible in this
// listing (presumably a shutdown method); lines that follow the loop are also missing.
170 throws SchedulerException
// Unschedule every job name recorded in jobs from REPOSITORY_SCAN_GROUP. No catch is visible,
// so a SchedulerException presumably propagates to the caller.
// NOTE(review): whether jobs / queuedRepos are cleared afterwards cannot be seen here.
172 for ( String job : jobs )
174 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
/**
 * Checks whether a task for the given repository id is present in the repository scanning queue.
 * <p>
 * Despite the name, this inspects a snapshot of the queue (getQueueSnapshot) taken while holding
 * the queue's monitor. NOTE(review): confirm whether tasks already taken for execution are part of
 * that snapshot.
 *
 * @param repositoryId the repository id to look for
 * @return presumably true as soon as a queued task's repository id equals repositoryId (the
 *         return statements are not visible in this listing)
 */
// NOTE(review): the unchecked suppression presumably covers assigning the snapshot to
// List<RepositoryTask>; confirm the declared return type of getQueueSnapshot().
181 @SuppressWarnings( "unchecked" )
182 public boolean isProcessingRepositoryTask( String repositoryId )
184 synchronized ( repositoryScanningQueue )
186 List<RepositoryTask> queue = null;
190 queue = repositoryScanningQueue.getQueueSnapshot();
192 catch ( TaskQueueException e )
194 // not possible with plexus-taskqueue implementation, ignore
// NOTE(review): if the snapshot call ever did fail, queue would still be null here and the loop
// would throw NullPointerException.
197 for ( RepositoryTask queuedTask : queue )
199 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
/**
 * Checks whether a task equal to the given one (per RepositoryTask.equals) is present in the
 * repository scanning queue, using a snapshot taken while holding the queue's monitor.
 *
 * @param task the task to look for
 * @return presumably true when an equal task is queued (the return statements are not visible
 *         in this listing)
 */
208 @SuppressWarnings( "unchecked" )
209 private boolean isProcessingRepositoryTask( RepositoryTask task )
211 synchronized ( repositoryScanningQueue )
213 List<RepositoryTask> queue = null;
217 queue = repositoryScanningQueue.getQueueSnapshot();
219 catch ( TaskQueueException e )
221 // not possible with plexus-taskqueue implementation, ignore
// NOTE(review): a failed snapshot would leave queue null and make this loop throw
// NullPointerException.
224 for ( RepositoryTask queuedTask : queue )
226 if ( task.equals( queuedTask ) )
/**
 * Puts a task on the repository scanning queue unless an equal task is already queued, in which
 * case it is logged at debug level and skipped. The check and the put run while holding the
 * queue's monitor.
 *
 * @param task the task to enqueue
 * @throws TaskQueueException presumably propagated from TaskQueue.put
 */
235 public void queueTask( RepositoryTask task )
236 throws TaskQueueException
238 synchronized ( repositoryScanningQueue )
240 if ( isProcessingRepositoryTask( task ) )
242 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
// NOTE(review): the comment below reads like a stale TODO; the duplicate check is already done
// above for every task type.
246 // add check if the task is already queued if it is a file scan
247 repositoryScanningQueue.put( task );
252 public boolean unQueueTask( RepositoryTask task )
253 throws TaskQueueException
255 synchronized ( repositoryScanningQueue )
257 if ( isProcessingRepositoryTask( task ) )
259 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
264 return repositoryScanningQueue.remove( task );
/**
 * Reacts to configuration changes. On a SAVED event, every job recorded in jobs is unscheduled,
 * then jobs are scheduled again for each managed repository that has a refresh cron expression
 * (scheduleRepositoryJobs additionally skips repositories that are not scanned).
 *
 * @param event the configuration event; only ConfigurationEvent.SAVED is handled
 */
268 public void configurationEvent( ConfigurationEvent event )
270 if ( event.getType() == ConfigurationEvent.SAVED )
272 for ( String job : jobs )
276 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
// NOTE(review): the SchedulerException is not passed to the logger, so its cause is lost.
278 catch ( SchedulerException e )
280 log.error( "Error restarting the repository scanning job after property change." );
// NOTE(review): whether jobs is cleared before rescheduling is not visible in this listing
// (intermediate lines are missing); stale names of removed repositories would otherwise remain.
285 List<ManagedRepositoryConfiguration> repositories =
286 archivaConfiguration.getConfiguration().getManagedRepositories();
288 for ( ManagedRepositoryConfiguration repoConfig : repositories )
290 if ( repoConfig.getRefreshCronExpression() != null )
294 scheduleRepositoryJobs( repoConfig );
// A failure for one repository is logged (without the exception) and the loop continues.
296 catch ( SchedulerException e )
298 log.error( "error restarting job: " + REPOSITORY_JOB + ":" + repoConfig.getId() );
305 @SuppressWarnings( "unchecked" )
306 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
307 MetadataRepository metadataRepository )
308 throws MetadataRepositoryException
310 return repositoryStatisticsManager.getLastStatistics( metadataRepository, repoConfig.getId() ) != null;
313 // MRM-848: Pre-configured repository initially appear to be empty
314 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
316 String repoId = repoConfig.getId();
317 RepositoryTask task = new RepositoryTask();
318 task.setRepositoryId( repoId );
320 if ( !queuedRepos.contains( repoId ) )
322 log.info( "Repository [" + repoId + "] is queued to be scanned as it hasn't been previously." );
326 queuedRepos.add( repoConfig.getId() );
327 this.queueTask( task );
329 catch ( TaskQueueException e )
331 log.error( "Error occurred while queueing repository [" + repoId + "] task : " + e.getMessage() );
/**
 * Schedules the cron-triggered scanning job for one repository. Repositories without a refresh
 * cron expression, or not flagged as scanned, are skipped with a warning. An expression that fails
 * validation is replaced by CRON_HOURLY.
 * NOTE(review): this method continues past the end of the visible listing.
 *
 * @param repoConfig the managed repository to schedule
 * @throws SchedulerException presumably propagated from Scheduler.scheduleJob
 */
336 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
337 throws SchedulerException
339 if ( repoConfig.getRefreshCronExpression() == null )
341 log.warn( "Skipping job, no cron expression for " + repoConfig.getId() );
345 if ( !repoConfig.isScanned() )
347 log.warn( "Skipping job, repository scannable has been disabled for " + repoConfig.getId() );
351 // get the cron string for this repository scanning job
352 String cronString = repoConfig.getRefreshCronExpression();
354 if ( !cronValidator.validate( cronString ) )
356 log.warn( "Cron expression [" + cronString + "] for repository [" + repoConfig.getId()
357 + "] is invalid. Defaulting to hourly." );
358 cronString = CRON_HOURLY;
361 // set up the repository scanning job; the queue and repository id travel in its job data map
362 JobDetailImpl repositoryJob =
363 new JobDetailImpl( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, RepositoryTaskJob.class );
365 repositoryJob.getJobDataMap().put( TASK_QUEUE, repositoryScanningQueue );
366 repositoryJob.getJobDataMap().put( TASK_REPOSITORY, repoConfig.getId() );
370 CronTriggerImpl trigger =
371 new CronTriggerImpl( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP, cronString );
// NOTE(review): the job name is recorded before scheduleJob runs, so it stays in jobs even if
// scheduling fails.
373 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
374 scheduler.scheduleJob( repositoryJob, trigger );
// Raised by the CronTriggerImpl constructor when the cron string cannot be parsed; the repository
// is then left unscheduled.
376 catch ( ParseException e )
378 log.error( "ParseException in repository scanning cron expression, disabling repository scanning for '"
379 + repoConfig.getId() + "': " + e.getMessage() );