]> source.dussan.org Git - archiva.git/blob
f89343d32e17f378e77ef135a59ddc714abd1feb
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.RepositoryStatisticsManager;
32 import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.redback.components.scheduler.Scheduler;
34 import org.apache.archiva.redback.components.taskqueue.TaskQueue;
35 import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37 import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38 import org.apache.commons.lang.time.StopWatch;
39 import org.quartz.CronScheduleBuilder;
40 import org.quartz.CronTrigger;
41 import org.quartz.JobBuilder;
42 import org.quartz.JobDataMap;
43 import org.quartz.JobDetail;
44 import org.quartz.SchedulerException;
45 import org.quartz.TriggerBuilder;
46 import org.quartz.impl.JobDetailImpl;
47 import org.quartz.impl.triggers.CronTriggerImpl;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.springframework.stereotype.Service;
51
52 import javax.annotation.PostConstruct;
53 import javax.annotation.PreDestroy;
54 import javax.inject.Inject;
55 import javax.inject.Named;
56 import java.text.ParseException;
57 import java.util.ArrayList;
58 import java.util.HashSet;
59 import java.util.List;
60 import java.util.Set;
61
62 /**
63  * Default implementation of a scheduling component for archiva.
64  */
65 @Service( "archivaTaskScheduler#repository" )
66 public class DefaultRepositoryArchivaTaskScheduler
67     implements RepositoryArchivaTaskScheduler, ConfigurationListener
68 {
69     private Logger log = LoggerFactory.getLogger( getClass() );
70
71     @Inject
72     private Scheduler scheduler;
73
74     @Inject
75     private CronExpressionValidator cronValidator;
76
77     @Inject
78     @Named( value = "taskQueue#repository-scanning" )
79     private TaskQueue repositoryScanningQueue;
80
81     @Inject
82     private ArchivaConfiguration archivaConfiguration;
83
84     @Inject
85     @Named( value = "repositoryStatisticsManager#default" )
86     private RepositoryStatisticsManager repositoryStatisticsManager;
87
88     /**
89      * TODO: could have multiple implementations
90      */
91     @Inject
92     private RepositorySessionFactory repositorySessionFactory;
93
94     private static final String REPOSITORY_SCAN_GROUP = "rg";
95
96     private static final String REPOSITORY_JOB = "rj";
97
98     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
99
100     static final String TASK_QUEUE = "TASK_QUEUE";
101
102     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
103
104     public static final String CRON_HOURLY = "0 0 * * * ?";
105
106     private Set<String> jobs = new HashSet<String>();
107
108     private List<String> queuedRepos = new ArrayList<String>();
109
110     @PostConstruct
111     public void startup()
112         throws ArchivaException
113     {
114
115         StopWatch stopWatch = new StopWatch();
116         stopWatch.start();
117
118         archivaConfiguration.addListener( this );
119
120         List<ManagedRepositoryConfiguration> repositories =
121             archivaConfiguration.getConfiguration().getManagedRepositories();
122
123         RepositorySession repositorySession = repositorySessionFactory.createSession();
124         try
125         {
126             MetadataRepository metadataRepository = repositorySession.getRepository();
127             for ( ManagedRepositoryConfiguration repoConfig : repositories )
128             {
129                 if ( repoConfig.isScanned() )
130                 {
131                     try
132                     {
133                         scheduleRepositoryJobs( repoConfig );
134                     }
135                     catch ( SchedulerException e )
136                     {
137                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
138                     }
139
140                     try
141                     {
142                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
143                         {
144                             queueInitialRepoScan( repoConfig );
145                         }
146                     }
147                     catch ( MetadataRepositoryException e )
148                     {
149                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
150                                   e.getMessage(), e );
151                     }
152                 }
153             }
154         }
155         finally
156         {
157             repositorySession.close();
158         }
159
160         stopWatch.stop();
161         log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
162     }
163
164
165     @PreDestroy
166     public void stop()
167         throws SchedulerException
168     {
169         for ( String job : jobs )
170         {
171             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
172         }
173         jobs.clear();
174         queuedRepos.clear();
175
176     }
177
178     @SuppressWarnings( "unchecked" )
179     public boolean isProcessingRepositoryTask( String repositoryId )
180     {
181         synchronized ( repositoryScanningQueue )
182         {
183             List<RepositoryTask> queue = null;
184
185             try
186             {
187                 queue = repositoryScanningQueue.getQueueSnapshot();
188             }
189             catch ( TaskQueueException e )
190             {
191                 // not possible with plexus-taskqueue implementation, ignore
192             }
193
194             for ( RepositoryTask queuedTask : queue )
195             {
196                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
197                 {
198                     return true;
199                 }
200             }
201             return false;
202         }
203     }
204
205     public boolean isProcessingRepositoryTask( RepositoryTask task )
206     {
207         synchronized ( repositoryScanningQueue )
208         {
209             List<RepositoryTask> queue = null;
210
211             try
212             {
213                 queue = repositoryScanningQueue.getQueueSnapshot();
214             }
215             catch ( TaskQueueException e )
216             {
217                 // not possible with plexus-taskqueue implementation, ignore
218             }
219
220             for ( RepositoryTask queuedTask : queue )
221             {
222                 if ( task.equals( queuedTask ) )
223                 {
224                     return true;
225                 }
226             }
227             return false;
228         }
229     }
230
231     public void queueTask( RepositoryTask task )
232         throws TaskQueueException
233     {
234         synchronized ( repositoryScanningQueue )
235         {
236             if ( isProcessingRepositoryTask( task ) )
237             {
238                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
239             }
240             else
241             {
242                 // add check if the task is already queued if it is a file scan
243                 repositoryScanningQueue.put( task );
244             }
245         }
246     }
247
248     public boolean unQueueTask( RepositoryTask task )
249         throws TaskQueueException
250     {
251         synchronized ( repositoryScanningQueue )
252         {
253             if ( !isProcessingRepositoryTask( task ) )
254             {
255                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
256                 return false;
257             }
258             else
259             {
260                 return repositoryScanningQueue.remove( task );
261             }
262         }
263     }
264
265     public void configurationEvent( ConfigurationEvent event )
266     {
267         if ( event.getType() == ConfigurationEvent.SAVED )
268         {
269             for ( String job : jobs )
270             {
271                 try
272                 {
273                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
274                 }
275                 catch ( SchedulerException e )
276                 {
277                     log.error( "Error restarting the repository scanning job after property change." );
278                 }
279             }
280             jobs.clear();
281
282             List<ManagedRepositoryConfiguration> repositories =
283                 archivaConfiguration.getConfiguration().getManagedRepositories();
284
285             for ( ManagedRepositoryConfiguration repoConfig : repositories )
286             {
287                 if ( repoConfig.getRefreshCronExpression() != null )
288                 {
289                     try
290                     {
291                         scheduleRepositoryJobs( repoConfig );
292                     }
293                     catch ( SchedulerException e )
294                     {
295                         log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
296                     }
297                 }
298             }
299         }
300     }
301
302     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
303                                          MetadataRepository metadataRepository )
304         throws MetadataRepositoryException
305     {
306         long start = System.currentTimeMillis();
307
308         boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() );
309
310         long end = System.currentTimeMillis();
311
312         log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
313
314         return res;
315     }
316
317     // MRM-848: Pre-configured repository initially appear to be empty
318     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
319     {
320         String repoId = repoConfig.getId();
321         RepositoryTask task = new RepositoryTask();
322         task.setRepositoryId( repoId );
323
324         if ( !queuedRepos.contains( repoId ) )
325         {
326             log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
327
328             try
329             {
330                 queuedRepos.add( repoConfig.getId() );
331                 this.queueTask( task );
332             }
333             catch ( TaskQueueException e )
334             {
335                 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
336             }
337         }
338     }
339
340     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
341         throws SchedulerException
342     {
343         if ( repoConfig.getRefreshCronExpression() == null )
344         {
345             log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
346             return;
347         }
348
349         if ( !repoConfig.isScanned() )
350         {
351             log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
352             return;
353         }
354
355         // get the cron string for these database scanning jobs
356         String cronString = repoConfig.getRefreshCronExpression();
357
358         if ( !cronValidator.validate( cronString ) )
359         {
360             log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
361                       repoConfig.getId() );
362             cronString = CRON_HOURLY;
363         }
364
365         JobDataMap jobDataMap = new JobDataMap( );
366         jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
367         jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
368
369         // setup the unprocessed artifact job
370         JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
371                                         .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
372                                         .setJobData( jobDataMap )
373                                         .build();
374
375         try
376         {
377             CronTrigger trigger = TriggerBuilder.newTrigger()
378                     .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
379                     .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
380                     .build();
381
382             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
383             scheduler.scheduleJob( repositoryJob, trigger );
384         }
385         catch ( RuntimeException e )
386         {
387             log.error(
388                 "ParseException in repository scanning cron expression, disabling repository scanning for '': {}",
389                 repoConfig.getId(), e.getMessage() );
390         }
391
392     }
393 }