]> source.dussan.org Git - archiva.git/blob
672869b188e81145824681c9041c57d16b1add31
[archiva.git] /
1 package org.apache.archiva.scheduler.repository;
2
3 /*
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *   http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing,
15  * software distributed under the License is distributed on an
16  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17  * KIND, either express or implied.  See the License for the
18  * specific language governing permissions and limitations
19  * under the License.
20  */
21
22 import org.apache.archiva.common.ArchivaException;
23 import org.apache.archiva.configuration.ArchivaConfiguration;
24 import org.apache.archiva.configuration.ConfigurationEvent;
25 import org.apache.archiva.configuration.ConfigurationListener;
26 import org.apache.archiva.configuration.ManagedRepositoryConfiguration;
27 import org.apache.archiva.metadata.repository.MetadataRepository;
28 import org.apache.archiva.metadata.repository.MetadataRepositoryException;
29 import org.apache.archiva.metadata.repository.RepositorySession;
30 import org.apache.archiva.metadata.repository.RepositorySessionFactory;
31 import org.apache.archiva.metadata.repository.stats.model.RepositoryStatisticsManager;
32 import org.apache.archiva.redback.components.scheduler.CronExpressionValidator;
33 import org.apache.archiva.redback.components.scheduler.Scheduler;
34 import org.apache.archiva.redback.components.taskqueue.TaskQueue;
35 import org.apache.archiva.redback.components.taskqueue.TaskQueueException;
36 import org.apache.archiva.scheduler.repository.model.RepositoryArchivaTaskScheduler;
37 import org.apache.archiva.scheduler.repository.model.RepositoryTask;
38 import org.apache.commons.lang.time.StopWatch;
39 import org.quartz.CronScheduleBuilder;
40 import org.quartz.CronTrigger;
41 import org.quartz.JobBuilder;
42 import org.quartz.JobDataMap;
43 import org.quartz.JobDetail;
44 import org.quartz.SchedulerException;
45 import org.quartz.TriggerBuilder;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.stereotype.Service;
49
50 import javax.annotation.PostConstruct;
51 import javax.annotation.PreDestroy;
52 import javax.inject.Inject;
53 import javax.inject.Named;
54 import java.util.ArrayList;
55 import java.util.HashSet;
56 import java.util.List;
57 import java.util.Set;
58
59 /**
60  * Default implementation of a scheduling component for archiva.
61  */
62 @Service( "archivaTaskScheduler#repository" )
63 public class DefaultRepositoryArchivaTaskScheduler
64     implements RepositoryArchivaTaskScheduler, ConfigurationListener
65 {
66     private Logger log = LoggerFactory.getLogger( getClass() );
67
68     @Inject
69     private Scheduler scheduler;
70
71     @Inject
72     private CronExpressionValidator cronValidator;
73
74     @Inject
75     @Named( value = "taskQueue#repository-scanning" )
76     private TaskQueue repositoryScanningQueue;
77
78     @Inject
79     private ArchivaConfiguration archivaConfiguration;
80
81     @Inject
82     @Named( value = "repositoryStatisticsManager#default" )
83     private RepositoryStatisticsManager repositoryStatisticsManager;
84
85     /**
86      * TODO: could have multiple implementations
87      */
88     @Inject
89     private RepositorySessionFactory repositorySessionFactory;
90
91     private static final String REPOSITORY_SCAN_GROUP = "rg";
92
93     private static final String REPOSITORY_JOB = "rj";
94
95     private static final String REPOSITORY_JOB_TRIGGER = "rjt";
96
97     static final String TASK_QUEUE = "TASK_QUEUE";
98
99     static final String TASK_REPOSITORY = "TASK_REPOSITORY";
100
101     public static final String CRON_HOURLY = "0 0 * * * ?";
102
103     private Set<String> jobs = new HashSet<>();
104
105     private List<String> queuedRepos = new ArrayList<>();
106
107     @PostConstruct
108     public void startup()
109         throws ArchivaException
110     {
111
112         StopWatch stopWatch = new StopWatch();
113         stopWatch.start();
114
115         archivaConfiguration.addListener( this );
116
117         List<ManagedRepositoryConfiguration> repositories =
118             archivaConfiguration.getConfiguration().getManagedRepositories();
119
120         RepositorySession repositorySession = repositorySessionFactory.createSession();
121         try
122         {
123             MetadataRepository metadataRepository = repositorySession.getRepository();
124             for ( ManagedRepositoryConfiguration repoConfig : repositories )
125             {
126                 if ( repoConfig.isScanned() )
127                 {
128                     try
129                     {
130                         scheduleRepositoryJobs( repoConfig );
131                     }
132                     catch ( SchedulerException e )
133                     {
134                         throw new ArchivaException( "Unable to start scheduler: " + e.getMessage(), e );
135                     }
136
137                     try
138                     {
139                         if ( !isPreviouslyScanned( repoConfig, metadataRepository ) )
140                         {
141                             queueInitialRepoScan( repoConfig );
142                         }
143                     }
144                     catch ( MetadataRepositoryException e )
145                     {
146                         log.warn( "Unable to determine if a repository is already scanned, skipping initial scan: {}",
147                                   e.getMessage(), e );
148                     }
149                 }
150             }
151         }
152         finally
153         {
154             repositorySession.close();
155         }
156
157         stopWatch.stop();
158         log.info( "Time to initalize DefaultRepositoryArchivaTaskScheduler: {} ms", stopWatch.getTime() );
159     }
160
161
162     @PreDestroy
163     public void stop()
164         throws SchedulerException
165     {
166         for ( String job : jobs )
167         {
168             scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
169         }
170         jobs.clear();
171         queuedRepos.clear();
172
173     }
174
175     @SuppressWarnings( "unchecked" )
176     @Override
177     public boolean isProcessingRepositoryTask( String repositoryId )
178     {
179         synchronized ( repositoryScanningQueue )
180         {
181             List<RepositoryTask> queue = null;
182
183             try
184             {
185                 queue = repositoryScanningQueue.getQueueSnapshot();
186             }
187             catch ( TaskQueueException e )
188             {
189                 // not possible with plexus-taskqueue implementation, ignore
190             }
191
192             for ( RepositoryTask queuedTask : queue )
193             {
194                 if ( queuedTask.getRepositoryId().equals( repositoryId ) )
195                 {
196                     return true;
197                 }
198             }
199             return false;
200         }
201     }
202
203     @Override
204     public boolean isProcessingRepositoryTask( RepositoryTask task )
205     {
206         synchronized ( repositoryScanningQueue )
207         {
208             List<RepositoryTask> queue = null;
209
210             try
211             {
212                 queue = repositoryScanningQueue.getQueueSnapshot();
213             }
214             catch ( TaskQueueException e )
215             {
216                 // not possible with plexus-taskqueue implementation, ignore
217             }
218
219             for ( RepositoryTask queuedTask : queue )
220             {
221                 if ( task.equals( queuedTask ) )
222                 {
223                     return true;
224                 }
225             }
226             return false;
227         }
228     }
229
230     @Override
231     public void queueTask( RepositoryTask task )
232         throws TaskQueueException
233     {
234         synchronized ( repositoryScanningQueue )
235         {
236             if ( isProcessingRepositoryTask( task ) )
237             {
238                 log.debug( "Repository task '{}' is already queued. Skipping task.", task );
239             }
240             else
241             {
242                 // add check if the task is already queued if it is a file scan
243                 repositoryScanningQueue.put( task );
244             }
245         }
246     }
247
248     @Override
249     public boolean unQueueTask( RepositoryTask task )
250         throws TaskQueueException
251     {
252         synchronized ( repositoryScanningQueue )
253         {
254             if ( !isProcessingRepositoryTask( task ) )
255             {
256                 log.info( "cannot unqueue Repository task '{}' not already queued.", task );
257                 return false;
258             }
259             else
260             {
261                 return repositoryScanningQueue.remove( task );
262             }
263         }
264     }
265
266     @Override
267     public void configurationEvent( ConfigurationEvent event )
268     {
269         if ( event.getType() == ConfigurationEvent.SAVED )
270         {
271             for ( String job : jobs )
272             {
273                 try
274                 {
275                     scheduler.unscheduleJob( job, REPOSITORY_SCAN_GROUP );
276                 }
277                 catch ( SchedulerException e )
278                 {
279                     log.error( "Error restarting the repository scanning job after property change." );
280                 }
281             }
282             jobs.clear();
283
284             List<ManagedRepositoryConfiguration> repositories =
285                 archivaConfiguration.getConfiguration().getManagedRepositories();
286
287             for ( ManagedRepositoryConfiguration repoConfig : repositories )
288             {
289                 if ( repoConfig.getRefreshCronExpression() != null )
290                 {
291                     try
292                     {
293                         scheduleRepositoryJobs( repoConfig );
294                     }
295                     catch ( SchedulerException e )
296                     {
297                         log.error( "error restarting job: '{}' : '{}'", REPOSITORY_JOB, repoConfig.getId() );
298                     }
299                 }
300             }
301         }
302     }
303
304     private boolean isPreviouslyScanned( ManagedRepositoryConfiguration repoConfig,
305                                          MetadataRepository metadataRepository )
306         throws MetadataRepositoryException
307     {
308         long start = System.currentTimeMillis();
309
310         boolean res = repositoryStatisticsManager.hasStatistics( metadataRepository, repoConfig.getId() );
311
312         long end = System.currentTimeMillis();
313
314         log.debug( "isPreviouslyScanned repo {} {} time: {} ms", repoConfig.getId(), res, ( end - start ) );
315
316         return res;
317     }
318
319     // MRM-848: Pre-configured repository initially appear to be empty
320     private synchronized void queueInitialRepoScan( ManagedRepositoryConfiguration repoConfig )
321     {
322         String repoId = repoConfig.getId();
323         RepositoryTask task = new RepositoryTask();
324         task.setRepositoryId( repoId );
325
326         if ( !queuedRepos.contains( repoId ) )
327         {
328             log.info( "Repository [{}] is queued to be scanned as it hasn't been previously.", repoId );
329
330             try
331             {
332                 queuedRepos.add( repoConfig.getId() );
333                 this.queueTask( task );
334             }
335             catch ( TaskQueueException e )
336             {
337                 log.error( "Error occurred while queueing repository [{}] task : {}", e.getMessage(), repoId );
338             }
339         }
340     }
341
342     private synchronized void scheduleRepositoryJobs( ManagedRepositoryConfiguration repoConfig )
343         throws SchedulerException
344     {
345         if ( repoConfig.getRefreshCronExpression() == null )
346         {
347             log.warn( "Skipping job, no cron expression for {}", repoConfig.getId() );
348             return;
349         }
350
351         if ( !repoConfig.isScanned() )
352         {
353             log.warn( "Skipping job, repository scannable has been disabled for {}", repoConfig.getId() );
354             return;
355         }
356
357         // get the cron string for these database scanning jobs
358         String cronString = repoConfig.getRefreshCronExpression();
359
360         if ( !cronValidator.validate( cronString ) )
361         {
362             log.warn( "Cron expression [{}] for repository [{}] is invalid.  Defaulting to hourly.", cronString,
363                       repoConfig.getId() );
364             cronString = CRON_HOURLY;
365         }
366
367         JobDataMap jobDataMap = new JobDataMap( );
368         jobDataMap.put( TASK_QUEUE, repositoryScanningQueue );
369         jobDataMap.put( TASK_REPOSITORY, repoConfig.getId() );
370
371         // setup the unprocessed artifact job
372         JobDetail repositoryJob = JobBuilder.newJob( RepositoryTaskJob.class )
373                                         .withIdentity( REPOSITORY_JOB + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
374                                         .setJobData( jobDataMap )
375                                         .build();
376
377         try
378         {
379             CronTrigger trigger = TriggerBuilder.newTrigger()
380                     .withIdentity( REPOSITORY_JOB_TRIGGER + ":" + repoConfig.getId(), REPOSITORY_SCAN_GROUP )
381                     .withSchedule( CronScheduleBuilder.cronSchedule( cronString ) )
382                     .build();
383
384             jobs.add( REPOSITORY_JOB + ":" + repoConfig.getId() );
385             scheduler.scheduleJob( repositoryJob, trigger );
386         }
387         catch ( RuntimeException e )
388         {
389             log.error(
390                 "ParseException in repository scanning cron expression, disabling repository scanning for '': {}",
391                 repoConfig.getId(), e.getMessage() );
392         }
393
394     }
395 }