]> source.dussan.org Git - archiva.git/blob
665860d9cdfabe9a6256d35cae1ceafd13faa456
[archiva.git] /
1 package org.apache.archiva.scheduler.indexing;
2 /*
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  */
20
21 import org.apache.archiva.admin.model.RepositoryAdminException;
22 import org.apache.archiva.admin.model.beans.NetworkProxy;
23 import org.apache.archiva.admin.model.beans.RemoteRepository;
24 import org.apache.archiva.admin.model.remote.RemoteRepositoryAdmin;
25 import org.apache.archiva.proxy.common.WagonFactory;
26 import org.apache.commons.io.FileUtils;
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.commons.lang.time.StopWatch;
29 import org.apache.http.HttpEntity;
30 import org.apache.http.HttpException;
31 import org.apache.http.HttpHost;
32 import org.apache.http.HttpRequest;
33 import org.apache.http.HttpResponse;
34 import org.apache.http.HttpStatus;
35 import org.apache.http.auth.AuthScope;
36 import org.apache.http.auth.UsernamePasswordCredentials;
37 import org.apache.http.client.ClientProtocolException;
38 import org.apache.http.client.methods.HttpGet;
39 import org.apache.http.entity.ContentType;
40 import org.apache.http.impl.client.BasicCredentialsProvider;
41 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
42 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
43 import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
44 import org.apache.http.nio.ContentDecoder;
45 import org.apache.http.nio.ContentEncoder;
46 import org.apache.http.nio.IOControl;
47 import org.apache.http.nio.client.methods.ZeroCopyConsumer;
48 import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
49 import org.apache.http.protocol.HttpContext;
50 import org.apache.maven.index.context.IndexingContext;
51 import org.apache.maven.index.updater.IndexUpdateRequest;
52 import org.apache.maven.index.updater.IndexUpdater;
53 import org.apache.maven.index.updater.ResourceFetcher;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import java.io.File;
58 import java.io.FileInputStream;
59 import java.io.FileNotFoundException;
60 import java.io.IOException;
61 import java.io.InputStream;
62 import java.lang.reflect.Field;
63 import java.net.MalformedURLException;
64 import java.net.URL;
65 import java.util.List;
66 import java.util.Map;
67 import java.util.concurrent.ExecutionException;
68 import java.util.concurrent.Future;
69 import java.util.concurrent.TimeUnit;
70 import java.util.concurrent.TimeoutException;
71
72 /**
73  * @author Olivier Lamy
74  * @since 1.4-M1
75  */
76 public class DownloadRemoteIndexTask
77     implements Runnable
78 {
79     private Logger log = LoggerFactory.getLogger( getClass() );
80
81     private RemoteRepository remoteRepository;
82
83     private RemoteRepositoryAdmin remoteRepositoryAdmin;
84
85     private WagonFactory wagonFactory;
86
87     private NetworkProxy networkProxy;
88
89     private boolean fullDownload;
90
91     private List<String> runningRemoteDownloadIds;
92
93     private IndexUpdater indexUpdater;
94
95
96     public DownloadRemoteIndexTask( DownloadRemoteIndexTaskRequest downloadRemoteIndexTaskRequest,
97                                     List<String> runningRemoteDownloadIds )
98     {
99         this.remoteRepository = downloadRemoteIndexTaskRequest.getRemoteRepository();
100         this.wagonFactory = downloadRemoteIndexTaskRequest.getWagonFactory();
101         this.networkProxy = downloadRemoteIndexTaskRequest.getNetworkProxy();
102         this.fullDownload = downloadRemoteIndexTaskRequest.isFullDownload();
103         this.runningRemoteDownloadIds = runningRemoteDownloadIds;
104         this.indexUpdater = downloadRemoteIndexTaskRequest.getIndexUpdater();
105         this.remoteRepositoryAdmin = downloadRemoteIndexTaskRequest.getRemoteRepositoryAdmin();
106     }
107
108     public void run()
109     {
110
111         // so short lock : not sure we need it
112         synchronized ( this.runningRemoteDownloadIds )
113         {
114             if ( this.runningRemoteDownloadIds.contains( this.remoteRepository.getId() ) )
115             {
116                 // skip it as it's running
117                 log.info( "skip download index remote for repo {} it's already running",
118                           this.remoteRepository.getId() );
119                 return;
120             }
121             this.runningRemoteDownloadIds.add( this.remoteRepository.getId() );
122         }
123         File tempIndexDirectory = null;
124         StopWatch stopWatch = new StopWatch();
125         stopWatch.start();
126         try
127         {
128             log.info( "start download remote index for remote repository {}", this.remoteRepository.getId() );
129             IndexingContext indexingContext = remoteRepositoryAdmin.createIndexContext( this.remoteRepository );
130
131             // create a temp directory to download files
132             tempIndexDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".tmpIndex" );
133             File indexCacheDirectory = new File( indexingContext.getIndexDirectoryFile().getParent(), ".indexCache" );
134             indexCacheDirectory.mkdirs();
135             if ( tempIndexDirectory.exists() )
136             {
137                 FileUtils.deleteDirectory( tempIndexDirectory );
138             }
139             tempIndexDirectory.mkdirs();
140             tempIndexDirectory.deleteOnExit();
141             String baseIndexUrl = indexingContext.getIndexUpdateUrl();
142
143             URL indexUrl = new URL( baseIndexUrl );
144
145             /*
146             String wagonProtocol = new URL( this.remoteRepository.getUrl() ).getProtocol();
147
148             final StreamWagon wagon = (StreamWagon) wagonFactory.getWagon(
149                 new WagonFactoryRequest( wagonProtocol, this.remoteRepository.getExtraHeaders() ).networkProxy(
150                     this.networkProxy ) );
151             int timeoutInMilliseconds = remoteRepository.getTimeout() * 1000;
152             // FIXME olamy having 2 config values
153             wagon.setReadTimeout( timeoutInMilliseconds );
154             wagon.setTimeout( timeoutInMilliseconds );
155
156             if ( wagon instanceof AbstractHttpClientWagon )
157             {
158                 HttpConfiguration httpConfiguration = new HttpConfiguration();
159                 HttpMethodConfiguration httpMethodConfiguration = new HttpMethodConfiguration();
160                 httpMethodConfiguration.setUsePreemptive( true );
161                 httpMethodConfiguration.setReadTimeout( timeoutInMilliseconds );
162                 httpConfiguration.setGet( httpMethodConfiguration );
163                 ( (AbstractHttpClientWagon) wagon ).setHttpConfiguration( httpConfiguration );
164             }
165
166             wagon.addTransferListener( new DownloadListener() );
167             ProxyInfo proxyInfo = null;
168             if ( this.networkProxy != null )
169             {
170                 proxyInfo = new ProxyInfo();
171                 proxyInfo.setHost( this.networkProxy.getHost() );
172                 proxyInfo.setPort( this.networkProxy.getPort() );
173                 proxyInfo.setUserName( this.networkProxy.getUsername() );
174                 proxyInfo.setPassword( this.networkProxy.getPassword() );
175             }
176             AuthenticationInfo authenticationInfo = null;
177             if ( this.remoteRepository.getUserName() != null )
178             {
179                 authenticationInfo = new AuthenticationInfo();
180                 authenticationInfo.setUserName( this.remoteRepository.getUserName() );
181                 authenticationInfo.setPassword( this.remoteRepository.getPassword() );
182             }
183             wagon.connect( new Repository( this.remoteRepository.getId(), baseIndexUrl ), authenticationInfo,
184                            proxyInfo );
185             */
186             //---------------------------------------------
187
188             HttpAsyncClientBuilder builder = HttpAsyncClientBuilder.create();
189
190             BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
191
192             if ( this.networkProxy != null )
193             {
194                 HttpHost httpHost = new HttpHost( this.networkProxy.getHost(), this.networkProxy.getPort() );
195                 builder = builder.setProxy( httpHost );
196
197                 if ( this.networkProxy.getUsername() != null )
198                 {
199                     basicCredentialsProvider.setCredentials(
200                         new AuthScope( this.networkProxy.getHost(), this.networkProxy.getPort(), null, null ),
201                         new UsernamePasswordCredentials( this.networkProxy.getUsername(),
202                                                          this.networkProxy.getPassword() ) );
203                 }
204
205             }
206
207             if ( this.remoteRepository.getUserName() != null )
208             {
209                 basicCredentialsProvider.setCredentials(
210                     new AuthScope( indexUrl.getHost(), indexUrl.getPort(), null, null ),
211                     new UsernamePasswordCredentials( this.remoteRepository.getUserName(),
212                                                      this.remoteRepository.getPassword() ) );
213
214             }
215
216             builder = builder.setDefaultCredentialsProvider( basicCredentialsProvider );
217
218             File indexDirectory = indexingContext.getIndexDirectoryFile();
219             if ( !indexDirectory.exists() )
220             {
221                 indexDirectory.mkdirs();
222             }
223
224             CloseableHttpAsyncClient closeableHttpAsyncClient = builder.build();
225             closeableHttpAsyncClient.start();
226             ResourceFetcher resourceFetcher =
227                 new ZeroCopyResourceFetcher( log, tempIndexDirectory, remoteRepository, closeableHttpAsyncClient,
228                                              baseIndexUrl );
229
230             IndexUpdateRequest request = new IndexUpdateRequest( indexingContext, resourceFetcher );
231             request.setForceFullUpdate( this.fullDownload );
232             request.setLocalIndexCacheDir( indexCacheDirectory );
233
234             this.indexUpdater.fetchAndUpdateIndex( request );
235             stopWatch.stop();
236             log.info( "time update index from remote for repository {}: {} s", this.remoteRepository.getId(),
237                       ( stopWatch.getTime() / 1000 ) );
238
239             // index packing optionnal ??
240             //IndexPackingRequest indexPackingRequest =
241             //    new IndexPackingRequest( indexingContext, indexingContext.getIndexDirectoryFile() );
242             //indexPacker.packIndex( indexPackingRequest );
243             indexingContext.updateTimestamp( true );
244
245         }
246         catch ( MalformedURLException e )
247         {
248             log.error( e.getMessage(), e );
249             throw new RuntimeException( e.getMessage(), e );
250         }
251         catch ( IOException e )
252         {
253             log.error( e.getMessage(), e );
254             throw new RuntimeException( e.getMessage(), e );
255         }
256         catch ( RepositoryAdminException e )
257         {
258             log.error( e.getMessage(), e );
259             throw new RuntimeException( e.getMessage(), e );
260         }
261         finally
262         {
263             deleteDirectoryQuiet( tempIndexDirectory );
264             this.runningRemoteDownloadIds.remove( this.remoteRepository.getId() );
265         }
266         log.info( "end download remote index for remote repository " + this.remoteRepository.getId() );
267     }
268
269     private void deleteDirectoryQuiet( File f )
270     {
271         try
272         {
273             FileUtils.deleteDirectory( f );
274         }
275         catch ( IOException e )
276         {
277             log.warn( "skip error delete {} : {}", f, e.getMessage() );
278         }
279     }
280
281     private static class ZeroCopyConsumerListener
282         extends ZeroCopyConsumer
283     {
284         private Logger log = LoggerFactory.getLogger( getClass() );
285
286         private String resourceName;
287
288         private long startTime;
289
290         private long totalLength = 0;
291
292         //private long currentLength = 0;
293
294         private ZeroCopyConsumerListener( File file, String resourceName )
295             throws FileNotFoundException
296         {
297             super( file );
298             this.resourceName = resourceName;
299         }
300
301         @Override
302         protected File process( final HttpResponse response, final File file, final ContentType contentType )
303             throws Exception
304         {
305             if ( response.getStatusLine().getStatusCode() != HttpStatus.SC_OK )
306             {
307                 throw new ClientProtocolException( "Upload failed: " + response.getStatusLine() );
308             }
309             long endTime = System.currentTimeMillis();
310             log.info( "end of transfer file {} {} kb: {}s", resourceName, this.totalLength / 1024,
311                       ( endTime - startTime ) / 1000 );
312             return file;
313         }
314
315         @Override
316         protected void onContentReceived( ContentDecoder decoder, IOControl ioControl )
317             throws IOException
318         {
319             if ( decoder instanceof LengthDelimitedDecoder )
320             {
321                 LengthDelimitedDecoder ldl = LengthDelimitedDecoder.class.cast( decoder );
322                 long len = getLen( ldl );
323                 if ( len > -1 )
324                 {
325                     log.debug( "transfer of {} : {}/{}", resourceName, len / 1024, this.totalLength / 1024 );
326                 }
327             }
328
329             super.onContentReceived( decoder, ioControl );
330         }
331
332         @Override
333         protected void onResponseReceived( HttpResponse response )
334         {
335             this.startTime = System.currentTimeMillis();
336             super.onResponseReceived( response );
337             this.totalLength = response.getEntity().getContentLength();
338             log.info( "start transfer of {}, contentLength: {}", resourceName, this.totalLength / 1024 );
339         }
340
341         @Override
342         protected void onEntityEnclosed( HttpEntity entity, ContentType contentType )
343             throws IOException
344         {
345             super.onEntityEnclosed( entity, contentType );
346         }
347
348         private long getLen( LengthDelimitedDecoder ldl )
349         {
350             try
351             {
352                 Field lenField = LengthDelimitedDecoder.class.getDeclaredField( "len" );
353                 lenField.setAccessible( true );
354                 long len = (Long) lenField.get( ldl );
355                 return len;
356             }
357             catch ( NoSuchFieldException e )
358             {
359                 log.debug( e.getMessage(), e );
360                 return -1;
361             }
362             catch ( IllegalAccessException e )
363             {
364                 log.debug( e.getMessage(), e );
365                 return -1;
366             }
367         }
368     }
369
370     private static class ZeroCopyResourceFetcher
371         implements ResourceFetcher
372     {
373
374         Logger log;
375
376         File tempIndexDirectory;
377
378         final RemoteRepository remoteRepository;
379
380         CloseableHttpAsyncClient httpclient;
381
382         String baseIndexUrl;
383
384         private ZeroCopyResourceFetcher( Logger log, File tempIndexDirectory, RemoteRepository remoteRepository,
385                                          CloseableHttpAsyncClient httpclient, String baseIndexUrl )
386         {
387             this.log = log;
388             this.tempIndexDirectory = tempIndexDirectory;
389             this.remoteRepository = remoteRepository;
390             this.httpclient = httpclient;
391             this.baseIndexUrl = baseIndexUrl;
392         }
393
394         public void connect( String id, String url )
395             throws IOException
396         {
397             //no op
398         }
399
400         public void disconnect()
401             throws IOException
402         {
403             // no op
404         }
405
406         public InputStream retrieve( final String name )
407             throws IOException
408         {
409
410             log.info( "index update retrieve file, name:{}", name );
411             File file = new File( tempIndexDirectory, name );
412             if ( file.exists() )
413             {
414                 file.delete();
415             }
416             file.deleteOnExit();
417
418             ZeroCopyConsumer<File> consumer = new ZeroCopyConsumerListener( file, name );
419
420             URL targetUrl = new URL( this.baseIndexUrl );
421             final HttpHost targetHost = new HttpHost( targetUrl.getHost(), targetUrl.getPort() );
422
423             Future<File> httpResponseFuture = httpclient.execute( new HttpAsyncRequestProducer()
424             {
425                 @Override
426                 public HttpHost getTarget()
427                 {
428                     return targetHost;
429                 }
430
431                 @Override
432                 public HttpRequest generateRequest()
433                     throws IOException, HttpException
434                 {
435                     StringBuilder url = new StringBuilder( baseIndexUrl );
436                     if ( !StringUtils.endsWith( baseIndexUrl, "/" ) )
437                     {
438                         url.append( '/' );
439                     }
440                     HttpGet httpGet = new HttpGet( url.append( addParameters( name, remoteRepository ) ).toString() );
441                     return httpGet;
442                 }
443
444                 @Override
445                 public void produceContent( ContentEncoder encoder, IOControl ioctrl )
446                     throws IOException
447                 {
448                     // no op
449                 }
450
451                 @Override
452                 public void requestCompleted( HttpContext context )
453                 {
454                     log.debug( "requestCompleted" );
455                 }
456
457                 @Override
458                 public void failed( Exception ex )
459                 {
460                     log.error( "http request failed", ex );
461                 }
462
463                 @Override
464                 public boolean isRepeatable()
465                 {
466                     log.debug( "isRepeatable" );
467                     return true;
468                 }
469
470                 @Override
471                 public void resetRequest()
472                     throws IOException
473                 {
474                     log.debug( "resetRequest" );
475                 }
476
477                 @Override
478                 public void close()
479                     throws IOException
480                 {
481                     log.debug( "close" );
482                 }
483
484             }, consumer, null );
485             try
486             {
487                 int timeOut = this.remoteRepository.getRemoteDownloadTimeout();
488                 file = timeOut > 0 ? httpResponseFuture.get( timeOut, TimeUnit.SECONDS ) : httpResponseFuture.get();
489             }
490             catch ( InterruptedException e )
491             {
492                 throw new IOException( e.getMessage(), e );
493             }
494             catch ( ExecutionException e )
495             {
496                 throw new IOException( e.getMessage(), e );
497             }
498             catch ( TimeoutException e )
499             {
500                 throw new IOException( e.getMessage(), e );
501             }
502             return new FileInputStream( file );
503         }
504
505     }
506
507     // FIXME remove crappy copy/paste
508     protected static String addParameters( String path, RemoteRepository remoteRepository )
509     {
510         if ( remoteRepository.getExtraParameters().isEmpty() )
511         {
512             return path;
513         }
514
515         boolean question = false;
516
517         StringBuilder res = new StringBuilder( path == null ? "" : path );
518
519         for ( Map.Entry<String, String> entry : remoteRepository.getExtraParameters().entrySet() )
520         {
521             if ( !question )
522             {
523                 res.append( '?' ).append( entry.getKey() ).append( '=' ).append( entry.getValue() );
524             }
525         }
526
527         return res.toString();
528     }
529
530
531 }
532
533