From 222ddd5b7d8f3f2798623ba9a6cf08c091a517f2 Mon Sep 17 00:00:00 2001 From: Olivier Lamy Date: Thu, 7 Nov 2013 05:21:39 +0000 Subject: [PATCH] use zero copy transfer for huge maven index git-svn-id: https://svn.apache.org/repos/asf/archiva/trunk@1539520 13f79535-47bb-0310-9956-ffa450edef68 --- .../archiva-scheduler-indexing/pom.xml | 5 + .../indexing/DownloadRemoteIndexTask.java | 224 +++++++++++++++++- .../src/test/resources/log4j2-test.xml | 4 +- pom.xml | 7 + 4 files changed, 229 insertions(+), 11 deletions(-) diff --git a/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/pom.xml b/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/pom.xml index 39d95b9f1..66ea8c1ed 100644 --- a/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/pom.xml +++ b/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/pom.xml @@ -71,6 +71,10 @@ + + org.apache.httpcomponents + httpasyncclient + org.springframework spring-context @@ -79,6 +83,7 @@ org.slf4j slf4j-api + org.eclipse.jetty jetty-server diff --git a/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/main/java/org/apache/archiva/scheduler/indexing/DownloadRemoteIndexTask.java b/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/main/java/org/apache/archiva/scheduler/indexing/DownloadRemoteIndexTask.java index b0bea81c0..a25473300 100644 --- a/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/main/java/org/apache/archiva/scheduler/indexing/DownloadRemoteIndexTask.java +++ b/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/main/java/org/apache/archiva/scheduler/indexing/DownloadRemoteIndexTask.java @@ -26,7 +26,29 @@ import org.apache.archiva.proxy.common.WagonFactory; import org.apache.archiva.proxy.common.WagonFactoryException; import org.apache.archiva.proxy.common.WagonFactoryRequest; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.StopWatch; +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.client.methods.ZeroCopyConsumer; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import org.apache.http.protocol.HttpContext; import org.apache.maven.index.context.IndexingContext; import org.apache.maven.index.updater.IndexUpdateRequest; import org.apache.maven.index.updater.IndexUpdater; @@ -56,8 +78,13 @@ import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; +import java.security.Principal; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @author Olivier Lamy @@ -170,6 +197,23 @@ public class DownloadRemoteIndexTask wagon.connect( new Repository( this.remoteRepository.getId(), baseIndexUrl ), authenticationInfo, proxyInfo ); + //--------------------------------------------- + + HttpAsyncClientBuilder builder = HttpAsyncClientBuilder.create(); + + if ( this.networkProxy != null ) + { + HttpHost httpHost = new HttpHost( this.networkProxy.getHost(), this.networkProxy.getPort() ); + builder = builder.setProxy( httpHost ); + } + + if ( this.remoteRepository.getUserName() != null ) + { + BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider(); + basicCredentialsProvider.setCredentials( AuthScope.ANY, new UsernamePasswordCredentials( + this.remoteRepository.getUserName(), this.remoteRepository.getPassword() ) ); + } + File indexDirectory = indexingContext.getIndexDirectoryFile(); if ( !indexDirectory.exists() ) { @@ -178,6 +222,12 @@ public class DownloadRemoteIndexTask ResourceFetcher resourceFetcher = new WagonResourceFetcher( log, tempIndexDirectory, wagon, remoteRepository ); + CloseableHttpAsyncClient closeableHttpAsyncClient = builder.build(); + closeableHttpAsyncClient.start(); + resourceFetcher = + new ZeroCopyResourceFetcher( log, tempIndexDirectory, remoteRepository, closeableHttpAsyncClient, + baseIndexUrl ); + IndexUpdateRequest request = new IndexUpdateRequest( indexingContext, resourceFetcher ); request.setForceFullUpdate( this.fullDownload ); request.setLocalIndexCacheDir( indexCacheDirectory ); @@ -360,29 +410,183 @@ public class DownloadRemoteIndexTask } } - // FIXME remove crappy copy/paste - protected String addParameters( String path, RemoteRepository remoteRepository ) + } + + private static class ZeroCopyResourceFetcher + implements ResourceFetcher + { + + Logger log; + + File tempIndexDirectory; + + final RemoteRepository remoteRepository; + + CloseableHttpAsyncClient httpclient; + + String baseIndexUrl; + + private ZeroCopyResourceFetcher( Logger log, File tempIndexDirectory, RemoteRepository remoteRepository, + CloseableHttpAsyncClient httpclient, String baseIndexUrl ) + { + this.log = log; + this.tempIndexDirectory = tempIndexDirectory; + this.remoteRepository = remoteRepository; + this.httpclient = httpclient; + this.baseIndexUrl = baseIndexUrl; + } + + public void connect( String id, String url ) + throws IOException { - if ( remoteRepository.getExtraParameters().isEmpty() ) + //no op + } + + public void disconnect() + throws IOException + { + // no op + } + + public InputStream retrieve( final String name ) + throws IOException, FileNotFoundException + { + + log.info( "index update retrieve file, name:{}", name ); + File file = new File( tempIndexDirectory, name ); + if ( file.exists() ) { - return path; + file.delete(); } + file.deleteOnExit(); + + ZeroCopyConsumer consumer = new ZeroCopyConsumer( file ) + { - boolean question = false; + @Override + protected File process( final HttpResponse response, final File file, final ContentType contentType ) + throws Exception + { + if ( response.getStatusLine().getStatusCode() != HttpStatus.SC_OK ) + { + throw new ClientProtocolException( "Upload failed: " + response.getStatusLine() ); + } + return file; + } - StringBuilder res = new StringBuilder( path == null ? "" : path ); + @Override + protected void onContentReceived( ContentDecoder decoder, IOControl ioctrl ) + throws IOException + { + log.debug( "onContentReceived" ); + super.onContentReceived( decoder, ioctrl ); + } + }; + URL targetUrl = new URL( this.remoteRepository.getUrl() ); + final HttpHost targetHost = new HttpHost( targetUrl.getHost(), targetUrl.getPort() ); - for ( Map.Entry entry : remoteRepository.getExtraParameters().entrySet() ) + Future httpResponseFuture = httpclient.execute( new HttpAsyncRequestProducer() { - if ( !question ) + @Override + public HttpHost getTarget() + { + return targetHost; + } + + @Override + public HttpRequest generateRequest() + throws IOException, HttpException + { + StringBuilder url = new StringBuilder( baseIndexUrl ); + if ( !StringUtils.endsWith( baseIndexUrl, "/" ) ) + { + url.append( '/' ); + } + HttpGet httpGet = new HttpGet( url.append( addParameters( name, remoteRepository ) ).toString() ); + return httpGet; + } + + @Override + public void produceContent( ContentEncoder encoder, IOControl ioctrl ) + throws IOException + { + // no op + } + + @Override + public void requestCompleted( HttpContext context ) + { + // no op + } + + @Override + public void failed( Exception ex ) + { + log.error( "http request failed", ex ); + } + + @Override + public boolean isRepeatable() + { + return true; + } + + @Override + public void resetRequest() + throws IOException + { + // no op + } + + @Override + public void close() + throws IOException { - res.append( '?' ).append( entry.getKey() ).append( '=' ).append( entry.getValue() ); + // no op } + }, consumer, null ); + try + { + file = httpResponseFuture.get( this.remoteRepository.getTimeout(), TimeUnit.SECONDS ); + } + catch ( InterruptedException e ) + { + throw new IOException( e.getMessage(), e ); + } + catch ( ExecutionException e ) + { + throw new IOException( e.getMessage(), e ); + } + catch ( TimeoutException e ) + { + throw new IOException( e.getMessage(), e ); } + return new FileInputStream( file ); + } - return res.toString(); + } + + // FIXME remove crappy copy/paste + protected static String addParameters( String path, RemoteRepository remoteRepository ) + { + if ( remoteRepository.getExtraParameters().isEmpty() ) + { + return path; + } + + boolean question = false; + + StringBuilder res = new StringBuilder( path == null ? "" : path ); + + for ( Map.Entry entry : remoteRepository.getExtraParameters().entrySet() ) + { + if ( !question ) + { + res.append( '?' ).append( entry.getKey() ).append( '=' ).append( entry.getValue() ); + } } + return res.toString(); } diff --git a/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/test/resources/log4j2-test.xml b/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/test/resources/log4j2-test.xml index 740df697e..6e3ef0ff3 100644 --- a/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/test/resources/log4j2-test.xml +++ b/archiva-modules/archiva-scheduler/archiva-scheduler-indexing/src/test/resources/log4j2-test.xml @@ -30,7 +30,9 @@ - + diff --git a/pom.xml b/pom.xml index 155b1ea73..53ee97aef 100644 --- a/pom.xml +++ b/pom.xml @@ -1288,6 +1288,13 @@ + + + org.apache.httpcomponents + httpasyncclient + 4.0 + + org.apache.maven.wagon wagon-provider-api -- 2.39.5