]> source.dussan.org Git - archiva.git/blob
e634195bcb9c9b6b3e82ec6992600ae1f1e0e48b
[archiva.git] /
1 package org.apache.archiva.scheduler.indexing;
2 /*
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  */
20
21 import org.apache.archiva.admin.model.RepositoryAdminException;
22 import org.apache.archiva.admin.model.beans.NetworkProxy;
23 import org.apache.archiva.admin.model.beans.RemoteRepository;
24 import org.apache.archiva.admin.model.remote.RemoteRepositoryAdmin;
25 import org.apache.archiva.proxy.common.WagonFactory;
26 import org.apache.commons.io.FileUtils;
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.commons.lang.time.StopWatch;
29 import org.apache.http.HttpEntity;
30 import org.apache.http.HttpException;
31 import org.apache.http.HttpHost;
32 import org.apache.http.HttpRequest;
33 import org.apache.http.HttpResponse;
34 import org.apache.http.HttpStatus;
35 import org.apache.http.auth.AuthScope;
36 import org.apache.http.auth.UsernamePasswordCredentials;
37 import org.apache.http.client.ClientProtocolException;
38 import org.apache.http.client.methods.HttpGet;
39 import org.apache.http.entity.ContentType;
40 import org.apache.http.impl.client.BasicCredentialsProvider;
41 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
42 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
43 import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
44 import org.apache.http.nio.ContentDecoder;
45 import org.apache.http.nio.ContentEncoder;
46 import org.apache.http.nio.IOControl;
47 import org.apache.http.nio.client.methods.ZeroCopyConsumer;
48 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
49 import org.apache.http.protocol.HttpContext;
50 import org.apache.maven.index.context.IndexingContext;
51 import org.apache.maven.index.updater.IndexUpdateRequest;
52 import org.apache.maven.index.updater.IndexUpdater;
53 import org.apache.maven.index.updater.ResourceFetcher;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import java.io.File;
58 import java.io.FileInputStream;
59 import java.io.FileNotFoundException;
60 import java.io.IOException;
61 import java.io.InputStream;
62 import java.lang.reflect.Field;
63 import java.net.MalformedURLException;
64 import java.net.URL;
65 import java.util.List;
66 import java.util.Map;
67 import java.util.concurrent.ExecutionException;
68 import java.util.concurrent.Future;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.TimeoutException;
71
/**
 * Runnable task that downloads the index of a remote repository and merges it into the local index.
 *
 * @author Olivier Lamy
 * @since 1.4-M1
 */
76 public class DownloadRemoteIndexTask
77     implements Runnable
78 {
79     private Logger log = LoggerFactory.getLogger( getClass() );
80
81     private RemoteRepository remoteRepository;
82
83     private RemoteRepositoryAdmin remoteRepositoryAdmin;
84
85     private WagonFactory wagonFactory;
86
87     private NetworkProxy networkProxy;
88
89     private boolean fullDownload;
90
91     private List<String> runningRemoteDownloadIds;
92
93     private IndexUpdater indexUpdater;
94
95
96     public DownloadRemoteIndexTask( DownloadRemoteIndexTaskRequest downloadRemoteIndexTaskRequest,
97                                     List<String> runningRemoteDownloadIds )
98     {
99         this.remoteRepository = downloadRemoteIndexTaskRequest.getRemoteRepository();
100         this.wagonFactory = downloadRemoteIndexTaskRequest.getWagonFactory();
101         this.networkProxy = downloadRemoteIndexTaskRequest.getNetworkProxy();
102         this.fullDownload = downloadRemoteIndexTaskRequest.isFullDownload();
103         this.runningRemoteDownloadIds = runningRemoteDownloadIds;
104         this.indexUpdater = downloadRemoteIndexTaskRequest.getIndexUpdater();
105         this.remoteRepositoryAdmin = downloadRemoteIndexTaskRequest.getRemoteRepositoryAdmin();
106     }
107
108     public void run()
109     {
110
111         // so short lock : not sure we need it
112         synchronized ( this.runningRemoteDownloadIds )
113         {
114             if ( this.runningRemoteDownloadIds.contains( this.remoteRepository.getId() ) )
115             {
116                 // skip it as it's running
117                 log.info( "skip download index remote for repo {} it's already running",
118                           this.remoteRepository.getId() );
119                 return;
120             }
121             this.runningRemoteDownloadIds.add( this.remoteRepository.getId() );
122         }
123         File tempIndexDirectory = null;
124         StopWatch stopWatch = new StopWatch();
125         stopWatch.start();
126         try
127         {
128             log.info( "start download remote index for remote repository {}", this.remoteRepository.getId() );
129             IndexingContext indexingContext = remoteRepositoryAdmin.createIndexContext( this.remoteRepository );
130
131             // create a temp directory to download files
132             tempIndexDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".tmpIndex" );
133             File indexCacheDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".indexCache" );
134             indexCacheDirectory.mkdirs();
135             if ( tempIndexDirectory.exists() )
136             {
137                 FileUtils.deleteDirectory( tempIndexDirectory );
138             }
139             tempIndexDirectory.mkdirs();
140             tempIndexDirectory.deleteOnExit();
141             String baseIndexUrl = indexingContext.getIndexUpdateUrl();
142
143             URL indexUrl = new URL( baseIndexUrl );
144
145             /*
146             String wagonProtocol = new URL( this.remoteRepository.getUrl() ).getProtocol();
147
148             final StreamWagon wagon = (StreamWagon) wagonFactory.getWagon(
149                 new WagonFactoryRequest( wagonProtocol, this.remoteRepository.getExtraHeaders() ).networkProxy(
150                     this.networkProxy ) );
151             int timeoutInMilliseconds = remoteRepository.getTimeout() * 1000;
152             // FIXME olamy having 2 config values
153             wagon.setReadTimeout( timeoutInMilliseconds );
154             wagon.setTimeout( timeoutInMilliseconds );
155
156             if ( wagon instanceof AbstractHttpClientWagon )
157             {
158                 HttpConfiguration httpConfiguration = new HttpConfiguration();
159                 HttpMethodConfiguration httpMethodConfiguration = new HttpMethodConfiguration();
160                 httpMethodConfiguration.setUsePreemptive( true );
161                 httpMethodConfiguration.setReadTimeout( timeoutInMilliseconds );
162                 httpConfiguration.setGet( httpMethodConfiguration );
163                 ( (AbstractHttpClientWagon) wagon ).setHttpConfiguration( httpConfiguration );
164             }
165
166             wagon.addTransferListener( new DownloadListener() );
167             ProxyInfo proxyInfo = null;
168             if ( this.networkProxy != null )
169             {
170                 proxyInfo = new ProxyInfo();
171                 proxyInfo.setHost( this.networkProxy.getHost() );
172                 proxyInfo.setPort( this.networkProxy.getPort() );
173                 proxyInfo.setUserName( this.networkProxy.getUsername() );
174                 proxyInfo.setPassword( this.networkProxy.getPassword() );
175             }
176             AuthenticationInfo authenticationInfo = null;
177             if ( this.remoteRepository.getUserName() != null )
178             {
179                 authenticationInfo = new AuthenticationInfo();
180                 authenticationInfo.setUserName( this.remoteRepository.getUserName() );
181                 authenticationInfo.setPassword( this.remoteRepository.getPassword() );
182             }
183             wagon.connect( new Repository( this.remoteRepository.getId(), baseIndexUrl ), authenticationInfo,
184                            proxyInfo );
185             */
186             //---------------------------------------------
187
188             HttpAsyncClientBuilder builder = HttpAsyncClientBuilder.create();
189
190             BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
191
192             if ( this.networkProxy != null )
193             {
194                 HttpHost httpHost = new HttpHost( this.networkProxy.getHost(), this.networkProxy.getPort() );
195                 builder = builder.setProxy( httpHost );
196
197                 if ( this.networkProxy.getUsername() != null )
198                 {
199                     basicCredentialsProvider.setCredentials(
200                         new AuthScope( this.networkProxy.getHost(), this.networkProxy.getPort(), null, null ),
201                         new UsernamePasswordCredentials( this.networkProxy.getUsername(),
202                                                          this.networkProxy.getPassword() ) );
203                 }
204
205             }
206
207             if ( this.remoteRepository.getUserName() != null )
208             {
209                 basicCredentialsProvider.setCredentials(
210                     new AuthScope( indexUrl.getHost(), indexUrl.getPort(), null, null ),
211                     new UsernamePasswordCredentials( this.remoteRepository.getUserName(),
212                                                      this.remoteRepository.getPassword() ) );
213
214             }
215
216             builder = builder.setDefaultCredentialsProvider( basicCredentialsProvider );
217
218             File indexDirectory = indexingContext.getIndexDirectoryFile();
219             if ( !indexDirectory.exists() )
220             {
221                 indexDirectory.mkdirs();
222             }
223
224             CloseableHttpAsyncClient closeableHttpAsyncClient = builder.build();
225             closeableHttpAsyncClient.start();
226             ResourceFetcher resourceFetcher =
227                 new ZeroCopyResourceFetcher( log, tempIndexDirectory, remoteRepository, closeableHttpAsyncClient,
228                                              baseIndexUrl );
229
230             IndexUpdateRequest request = new IndexUpdateRequest( indexingContext, resourceFetcher );
231             request.setForceFullUpdate( this.fullDownload );
232             request.setLocalIndexCacheDir( indexCacheDirectory );
233
234             this.indexUpdater.fetchAndUpdateIndex( request );
235
236             // index packing optionnal ??
237             //IndexPackingRequest indexPackingRequest =
238             //    new IndexPackingRequest( indexingContext, indexingContext.getIndexDirectoryFile() );
239             //indexPacker.packIndex( indexPackingRequest );
240
241             indexingContext.updateTimestamp( true );
242
243             stopWatch.stop();
244             log.info( "time update index from remote for repository {}: {} s", this.remoteRepository.getId(),
245                       ( stopWatch.getTime() / 1000 ) );
246
247
248         }
249         catch ( MalformedURLException e )
250         {
251             log.error( e.getMessage(), e );
252             throw new RuntimeException( e.getMessage(), e );
253         }
254         catch ( IOException e )
255         {
256             log.error( e.getMessage(), e );
257             throw new RuntimeException( e.getMessage(), e );
258         }
259         catch ( RepositoryAdminException e )
260         {
261             log.error( e.getMessage(), e );
262             throw new RuntimeException( e.getMessage(), e );
263         }
264         finally
265         {
266             deleteDirectoryQuiet( tempIndexDirectory );
267             this.runningRemoteDownloadIds.remove( this.remoteRepository.getId() );
268         }
269         log.info( "end download remote index for remote repository {}", this.remoteRepository.getId() );
270     }
271
272     private void deleteDirectoryQuiet( File f )
273     {
274         try
275         {
276             FileUtils.deleteDirectory( f );
277         }
278         catch ( IOException e )
279         {
280             log.warn( "skip error delete {} : {}", f, e.getMessage() );
281         }
282     }
283
284     private static class ZeroCopyConsumerListener
285         extends ZeroCopyConsumer
286     {
287         private Logger log = LoggerFactory.getLogger( getClass() );
288
289         private String resourceName;
290
291         private long startTime;
292
293         private long totalLength = 0;
294
295         //private long currentLength = 0;
296
297         private ZeroCopyConsumerListener( File file, String resourceName )
298             throws FileNotFoundException
299         {
300             super( file );
301             this.resourceName = resourceName;
302         }
303
304         @Override
305         protected File process( final HttpResponse response, final File file, final ContentType contentType )
306             throws Exception
307         {
308             if ( response.getStatusLine().getStatusCode() != HttpStatus.SC_OK )
309             {
310                 throw new ClientProtocolException( "Download failed: " + response.getStatusLine() );
311             }
312             long endTime = System.currentTimeMillis();
313             log.info( "end of transfer file {} {} kb: {}s", resourceName, this.totalLength / 1024,
314                       ( endTime - startTime ) / 1000 );
315             return file;
316         }
317
318         @Override
319         protected void onContentReceived( ContentDecoder decoder, IOControl ioControl )
320             throws IOException
321         {
322             if ( decoder instanceof LengthDelimitedDecoder )
323             {
324                 LengthDelimitedDecoder ldl = LengthDelimitedDecoder.class.cast( decoder );
325                 long len = getLen( ldl );
326                 if ( len > -1 )
327                 {
328                     log.debug( "transfer of {} : {}/{}", resourceName, len / 1024, this.totalLength / 1024 );
329                 }
330             }
331
332             super.onContentReceived( decoder, ioControl );
333         }
334
335         @Override
336         protected void onResponseReceived( HttpResponse response )
337         {
338             this.startTime = System.currentTimeMillis();
339             super.onResponseReceived( response );
340             this.totalLength = response.getEntity().getContentLength();
341             log.info( "start transfer of {}, contentLength: {}", resourceName, this.totalLength / 1024 );
342         }
343
344         @Override
345         protected void onEntityEnclosed( HttpEntity entity, ContentType contentType )
346             throws IOException
347         {
348             super.onEntityEnclosed( entity, contentType );
349         }
350
351         private long getLen( LengthDelimitedDecoder ldl )
352         {
353             try
354             {
355                 Field lenField = LengthDelimitedDecoder.class.getDeclaredField( "len" );
356                 lenField.setAccessible( true );
357                 long len = (Long) lenField.get( ldl );
358                 return len;
359             }
360             catch ( NoSuchFieldException e )
361             {
362                 log.debug( e.getMessage(), e );
363                 return -1;
364             }
365             catch ( IllegalAccessException e )
366             {
367                 log.debug( e.getMessage(), e );
368                 return -1;
369             }
370         }
371     }
372
373     private static class ZeroCopyResourceFetcher
374         implements ResourceFetcher
375     {
376
377         Logger log;
378
379         File tempIndexDirectory;
380
381         final RemoteRepository remoteRepository;
382
383         CloseableHttpAsyncClient httpclient;
384
385         String baseIndexUrl;
386
387         private ZeroCopyResourceFetcher( Logger log, File tempIndexDirectory, RemoteRepository remoteRepository,
388                                          CloseableHttpAsyncClient httpclient, String baseIndexUrl )
389         {
390             this.log = log;
391             this.tempIndexDirectory = tempIndexDirectory;
392             this.remoteRepository = remoteRepository;
393             this.httpclient = httpclient;
394             this.baseIndexUrl = baseIndexUrl;
395         }
396
397         public void connect( String id, String url )
398             throws IOException
399         {
400             //no op
401         }
402
403         public void disconnect()
404             throws IOException
405         {
406             // no op
407         }
408
409         public InputStream retrieve( final String name )
410             throws IOException
411         {
412
413             log.info( "index update retrieve file, name:{}", name );
414             File file = new File( tempIndexDirectory, name );
415             if ( file.exists() )
416             {
417                 file.delete();
418             }
419             file.deleteOnExit();
420
421             ZeroCopyConsumer<File> consumer = new ZeroCopyConsumerListener( file, name );
422
423             URL targetUrl = new URL( this.baseIndexUrl );
424             final HttpHost targetHost = new HttpHost( targetUrl.getHost(), targetUrl.getPort() );
425
426             Future<File> httpResponseFuture = httpclient.execute( new HttpAsyncRequestProducer()
427             {
428                 @Override
429                 public HttpHost getTarget()
430                 {
431                     return targetHost;
432                 }
433
434                 @Override
435                 public HttpRequest generateRequest()
436                     throws IOException, HttpException
437                 {
438                     StringBuilder url = new StringBuilder( baseIndexUrl );
439                     if ( !StringUtils.endsWith( baseIndexUrl, "/" ) )
440                     {
441                         url.append( '/' );
442                     }
443                     HttpGet httpGet = new HttpGet( url.append( addParameters( name, remoteRepository ) ).toString() );
444                     return httpGet;
445                 }
446
447                 @Override
448                 public void produceContent( ContentEncoder encoder, IOControl ioctrl )
449                     throws IOException
450                 {
451                     // no op
452                 }
453
454                 @Override
455                 public void requestCompleted( HttpContext context )
456                 {
457                     log.debug( "requestCompleted" );
458                 }
459
460                 @Override
461                 public void failed( Exception ex )
462                 {
463                     log.error( "http request failed", ex );
464                 }
465
466                 @Override
467                 public boolean isRepeatable()
468                 {
469                     log.debug( "isRepeatable" );
470                     return true;
471                 }
472
473                 @Override
474                 public void resetRequest()
475                     throws IOException
476                 {
477                     log.debug( "resetRequest" );
478                 }
479
480                 @Override
481                 public void close()
482                     throws IOException
483                 {
484                     log.debug( "close" );
485                 }
486
487             }, consumer, null );
488             try
489             {
490                 int timeOut = this.remoteRepository.getRemoteDownloadTimeout();
491                 file = timeOut > 0 ? httpResponseFuture.get( timeOut, TimeUnit.SECONDS ) : httpResponseFuture.get();
492             }
493             catch ( InterruptedException e )
494             {
495                 throw new IOException( e.getMessage(), e );
496             }
497             catch ( ExecutionException e )
498             {
499                 throw new IOException( e.getMessage(), e );
500             }
501             catch ( TimeoutException e )
502             {
503                 throw new IOException( e.getMessage(), e );
504             }
505             return new FileInputStream( file );
506         }
507
508     }
509
510     // FIXME remove crappy copy/paste
511     protected static String addParameters( String path, RemoteRepository remoteRepository )
512     {
513         if ( remoteRepository.getExtraParameters().isEmpty() )
514         {
515             return path;
516         }
517
518         boolean question = false;
519
520         StringBuilder res = new StringBuilder( path == null ? "" : path );
521
522         for ( Map.Entry<String, String> entry : remoteRepository.getExtraParameters().entrySet() )
523         {
524             if ( !question )
525             {
526                 res.append( '?' ).append( entry.getKey() ).append( '=' ).append( entry.getValue() );
527             }
528         }
529
530         return res.toString();
531     }
532
533
534 }
535
536