1 package org.apache.archiva.scheduler.repository;
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
32 import org.apache.archiva.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.components.scheduler.Scheduler;
34 import org.apache.archiva.components.taskqueue.TaskQueue;
35 import org.apache.archiva.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37 import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38 import org.apache.commons.lang3.time.StopWatch;
39 import org.quartz.CronScheduleBuilder;
40 import org.quartz.CronTrigger;
41 import org.quartz.JobBuilder;
42 import org.quartz.JobDataMap;
43 import org.quartz.JobDetail;
44 import org.quartz.SchedulerException;
45 import org.quartz.TriggerBuilder;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.stereotype.Service;
50 import javax.annotation.PostConstruct;
51 import javax.annotation.PreDestroy;
52 import javax.inject.Inject;
53 import javax.inject.Named;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.List;
60 * Default implementation of a scheduling component for archiva.
62 @Service( "archivaTaskScheduler#repository" )
63 public class DefaultRepositoryArchivaTaskScheduler
64 implements RepositoryArchivaTaskScheduler, ConfigurationListener
66 private Logger log = LoggerFactory.getLogger( getClass() );
69 private Scheduler scheduler;
72 private CronExpressionValidator cronValidator;
75 @Named( value = "taskQueue#repository-scanning" )
76 private TaskQueue<RepositoryTask> repositoryScanningQueue;
79 private ArchivaConfiguration archivaConfiguration;
82 @Named( value = "repositoryStatisticsManager#default" )
83 private RepositoryStatisticsManager repositoryStatisticsManager;
86 * TODO: could have multiple implementations
89 private RepositorySessionFactory repositorySessionFactory;
91 private static final String REPOSITORY_SCAN_GROUP = "rg";
93 private static final String REPOSITORY_JOB = "rj";
95 private static final String REPOSITORY_JOB_TRIGGER = "rjt";
97 static final String TASK_QUEUE = "TASK_QUEUE";
99 static final String TASK_REPOSITORY = "TASK_REPOSITORY";
101 public static final String CRON_HOURLY = "0 0 * * * ?";
103 private Set<String> jobs = new HashSet<>();
105 private List<String> queuedRepos = new ArrayList<>();
108 public void startup()
109 throws ArchivaException
112 StopWatch stopWatch = new StopWatch();
115 archivaConfiguration.addListener( this );
117 List<ManagedRepositoryConfiguration> repositories =
118 archivaConfiguration.getConfiguration().getManagedRepositories();
120 RepositorySession repositorySession = null;
123 repositorySession = repositorySessionFactory.createSession();
125 catch ( MetadataRepositoryException e )
127 e.printStackTrace( );
131 MetadataRepository metadataRepository = repositorySession.getRepository();
132 for ( ManagedRepositoryConfiguration repoConfig : repositories )
134 if ( repoConfig.isScanned() )
138 scheduleRepositoryJobs( repoConfig );
140 catch ( SchedulerException e )
142 throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
147 if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
149 queueInitialRepoScan( repoConfig );
152 catch ( MetadataRepositoryException e )
154 log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
162 repositorySession.close();
166 log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
172 throws SchedulerException
174 for ( String job : jobs )
176 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
183 @SuppressWarnings( "unchecked" )
185 public boolean isProcessingRepositoryTask( String repositoryId )
187 synchronized ( repositoryScanningQueue )
189 List<RepositoryTask> queue = null;
193 queue = repositoryScanningQueue.getQueueSnapshot();
195 catch ( TaskQueueException e )
197 // not possible with plexus-taskqueue implementation, ignore
200 for ( RepositoryTask queuedTask : queue )
202 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
212 public boolean isProcessingRepositoryTask( RepositoryTask task )
214 synchronized ( repositoryScanningQueue )
216 List<RepositoryTask> queue = null;
220 queue = repositoryScanningQueue.getQueueSnapshot();
222 catch ( TaskQueueException e )
224 // not possible with plexus-taskqueue implementation, ignore
227 for ( RepositoryTask queuedTask : queue )
229 if ( task.equals( queuedTask ) )
239 public void queueTask( RepositoryTask task )
240 throws TaskQueueException
242 synchronized ( repositoryScanningQueue )
244 if ( isProcessingRepositoryTask( task ) )
246 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
250 // add check if the task is already queued if it is a file scan
251 repositoryScanningQueue.put( task );
257 public boolean unQueueTask( RepositoryTask task )
258 throws TaskQueueException
260 synchronized ( repositoryScanningQueue )
262 if ( !isProcessingRepositoryTask( task ) )
264 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
269 return repositoryScanningQueue.remove( task );
275 public void configurationEvent( ConfigurationEvent event )
277 if ( event.getType() == ConfigurationEvent.SAVED )
279 for ( String job : jobs )
283 scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
285 catch ( SchedulerException e )
287 log.error( "Error restarting the repository scanning job after property change." );
292 List<ManagedRepositoryConfiguration> repositories =
293 archivaConfiguration.getConfiguration().getManagedRepositories();
295 for ( ManagedRepositoryConfiguration repoConfig : repositories )
297 if ( repoConfig.getRefreshCronExpression() != null )
301 scheduleRepositoryJobs( repoConfig );
303 catch ( SchedulerException e )
305 log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
312 private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
313 MetadataRepository metadataRepository )
314 throws MetadataRepositoryException
316 long start = System.currentTimeMillis();
318 boolean res = repositoryStatisticsManager.hasStatistics( repoConfig.getId() );
320 long end = System.currentTimeMillis();
322 log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
327 // MRM-848: Pre-configured repository initially appear to be empty
328 private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
330 String repoId = repoConfig.getId();
331 RepositoryTask task = new RepositoryTask();
332 task.setRepositoryId( repoId );
334 if ( !queuedRepos.contains( repoId ) )
336 log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
340 queuedRepos.add( repoConfig.getId() );
341 this.queueTask( task );
343 catch ( TaskQueueException e )
345 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
350 private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
351 throws SchedulerException
353 if ( repoConfig.getRefreshCronExpression() == null )
355 log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
359 if ( !repoConfig.isScanned() )
361 log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
365 // get the cron string for these database scanning jobs
366 String cronString = repoConfig.getRefreshCronExpression();
368 if ( !cronValidator.validate( cronString ) )
370 log.warn( "Cron expression [{}] for repository [{}] is invalid. Defaulting to hourly.", cronString,
371 repoConfig.getId() );
372 cronString = CRON_HOURLY;
375 JobDataMap jobDataMap = new JobDataMap( );
376 jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
377 jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
379 // setup the unprocessed artifact job
380 JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
381 .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
382 .setJobData( jobDataMap )
387 CronTrigger trigger = TriggerBuilder.newTrigger()
388 .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
389 .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
392 jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
393 scheduler.scheduleJob( repositoryJob, trigger );
395 catch ( RuntimeException e )
398 "ParseException in repository scanning cron expression, disabling repository scanning for '{}': {}",
399 repoConfig.getId(), e.getMessage() );