diff options
author | Martin Stockhammer <martin_s@apache.org> | 2019-08-20 23:07:22 +0200 |
---|---|---|
committer | Martin Stockhammer <martin_s@apache.org> | 2019-08-20 23:07:22 +0200 |
commit | cda4ac8085f840060ced0163d21f1e34407411d8 (patch) | |
tree | ae195fa65a3122575884f8358ab5bf4ba9e16ede /archiva-modules | |
parent | 379a72c3b7ea9d6474f71c3d1cbffa98765af8d9 (diff) | |
download | archiva-cda4ac8085f840060ced0163d21f1e34407411d8.tar.gz archiva-cda4ac8085f840060ced0163d21f1e34407411d8.zip |
Fixing cassandra stream api
Diffstat (limited to 'archiva-modules')
3 files changed, 75 insertions, 25 deletions
diff --git a/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java b/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java index 2232a7b16..a13abdc55 100644 --- a/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java +++ b/archiva-modules/metadata/metadata-repository-api/src/test/java/org/apache/archiva/metadata/repository/AbstractMetadataRepositoryTest.java @@ -893,6 +893,7 @@ public abstract class AbstractMetadataRepositoryTest assertNotNull( str ); List<TestMetadataFacet> result = str.collect( Collectors.toList( ) ); assertEquals( 1, result.size( ) ); + assertNotNull( result.get( 0 ) ); assertEquals( TEST_NAME, result.get( 0 ).getName( ) ); } ); @@ -918,6 +919,7 @@ public abstract class AbstractMetadataRepositoryTest assertNotNull( str ); List<TestMetadataFacet> result = str.collect( Collectors.toList( ) ); assertEquals( 100, result.size( ) ); + assertNotNull( result.get( 0 ) ); for (int i=0; i<10; i++) { log.info( "Result {}", result.get( i ).getName( ) ); } diff --git a/archiva-modules/plugins/metadata-store-cassandra/pom.xml b/archiva-modules/plugins/metadata-store-cassandra/pom.xml index bd905acc3..d8ec3a995 100644 --- a/archiva-modules/plugins/metadata-store-cassandra/pom.xml +++ b/archiva-modules/plugins/metadata-store-cassandra/pom.xml @@ -260,6 +260,7 @@ <archiva.repositorySessionFactory.id>cassandra</archiva.repositorySessionFactory.id> <appserver.base>${project.build.directory}/appserver-base</appserver.base> </systemPropertyVariables> + <trimStackTrace>false</trimStackTrace> </configuration> </plugin> </plugins> diff --git a/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java b/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java index 207da5538..2b84b0d34 100644 --- a/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java +++ b/archiva-modules/plugins/metadata-store-cassandra/src/main/java/org/apache/archiva/metadata/repository/cassandra/CassandraMetadataRepository.java @@ -85,6 +85,7 @@ import java.util.Map; import java.util.Set; import java.util.Spliterator; import java.util.UUID; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -1526,26 +1527,39 @@ public class CassandraMetadataRepository return facets; } - private <T> Spliterator<T> createResultSpliterator( QueryResult<OrderedRows<String, String, String>> result, Function<Row<String, String, String>, T> converter) throws MetadataRepositoryException + private <T> Spliterator<T> createResultSpliterator( QueryResult<OrderedRows<String, String, String>> result, BiFunction<Row<String, String, String>, T, T> converter) throws MetadataRepositoryException { final int size = result.get().getCount(); + final Iterator<Row<String, String, String>> it = result.get( ).iterator( ); + return new Spliterator<T>( ) { + private T lastItem = null; + @Override public boolean tryAdvance( Consumer<? super T> action ) { if (size>=1) { - for ( Row<String, String, String> row : result.get( ) ) + if(it.hasNext()) { - T item = converter.apply( row ); - if ( item != null ) + while ( it.hasNext( ) ) { - action.accept( item ); - return true; + Row<String, String, String> row = it.next( ); + T item = converter.apply( row, lastItem ); + if ( item != null && lastItem !=null && item != lastItem ) + { + action.accept( lastItem ); + lastItem = item; + return true; + } + lastItem = item; } + action.accept( lastItem ); + return true; + } else { + return false; } - } return false; } @@ -1570,6 +1584,20 @@ public class CassandraMetadataRepository }; } + + /** + * Implementation is not very performant, because sorting is part of the stream. I do not know how to specify the sort + * in the query. + * + * @param session + * @param repositoryId + * @param facetClazz + * @param offset + * @param maxEntries + * @param <T> + * @return + * @throws MetadataRepositoryException + */ @Override public <T extends MetadataFacet> Stream<T> getMetadataFacetStream( RepositorySession session, String repositoryId, Class<T> facetClazz, long offset, long maxEntries ) throws MetadataRepositoryException { @@ -1578,21 +1606,37 @@ public class CassandraMetadataRepository QueryResult<OrderedRows<String, String, String>> result = HFactory // .createRangeSlicesQuery( keyspace, ss, ss, ss ) // - .setColumnFamily( cassandraArchivaManager.getMetadataFacetFamilyName() ) // - .setColumnNames( NAME.toString() ) // - .addEqualsExpression( REPOSITORY_NAME.toString(), repositoryId ) // - .addEqualsExpression( FACET_ID.toString(), facetId ) // - .execute(); + .setColumnFamily( cassandraArchivaManager.getMetadataFacetFamilyName( ) ) // + .setColumnNames( NAME.toString( ), KEY.toString( ), VALUE.toString( ) ) // + .addEqualsExpression( REPOSITORY_NAME.toString( ), repositoryId ) // + .addEqualsExpression( FACET_ID.toString( ), facetId ) // + .setRange( null, null, false, Integer.MAX_VALUE ) + .setRowCount( Integer.MAX_VALUE ) + .execute( ); + - return StreamSupport.stream( createResultSpliterator( result, ( Row<String, String, String> row)-> { + + return StreamSupport.stream( createResultSpliterator( result, ( Row<String, String, String> row, T lastItem)-> { ColumnSlice<String, String> columnSlice = row.getColumnSlice(); String name = getStringValue( columnSlice, NAME.toString( ) ); - T metadataFacet = metadataFacetFactory.createMetadataFacet( repositoryId, name ); - Map<String, String> map = new HashMap<>( ); - map.put( getStringValue( columnSlice, KEY.toString() ), getStringValue( columnSlice, VALUE.toString() ) ); - metadataFacet.fromProperties( map ); - return metadataFacet; - }), false ); + T updateItem; + if (lastItem!=null && lastItem.getName().equals(name)) + { + updateItem = lastItem; + } else + { + updateItem = metadataFacetFactory.createMetadataFacet( repositoryId, name ); + } + String key = getStringValue( columnSlice, KEY.toString() ); + if (StringUtils.isNotEmpty( key )) + { + Map<String, String> map = new HashMap<>( ); + map.put( key , getStringValue( columnSlice, VALUE.toString( ) ) ); + updateItem.fromProperties( map ); + } + return updateItem; + + }), false ).sorted( (f1, f2) -> f1.getName()!=null ? f1.getName().compareTo( f2.getName() ) : 1 ).skip( offset ).limit( maxEntries ); } @Override @@ -1603,11 +1647,14 @@ public class CassandraMetadataRepository } @Override - public MetadataFacet getMetadataFacet( RepositorySession session, final String repositoryId, final String facetId, final String name ) + public <T extends MetadataFacet> T getMetadataFacet( RepositorySession session, final String repositoryId, final Class<T> facetClazz, final String name ) throws MetadataRepositoryException { - - MetadataFacetFactory metadataFacetFactory = getFacetFactory( facetId ); + final MetadataFacetFactory<T> metadataFacetFactory = getFacetFactory( facetClazz ); + if (metadataFacetFactory==null) { + return null; + } + final String facetId = metadataFacetFactory.getFacetId( ); if ( metadataFacetFactory == null ) { return null; @@ -1622,7 +1669,7 @@ public class CassandraMetadataRepository .addEqualsExpression( NAME.toString(), name ) // .execute(); - MetadataFacet metadataFacet = metadataFacetFactory.createMetadataFacet( repositoryId, name ); + T metadataFacet = metadataFacetFactory.createMetadataFacet( repositoryId, name ); int size = result.get().getCount(); if ( size < 1 ) { @@ -1639,9 +1686,9 @@ public class CassandraMetadataRepository } @Override - public <T extends MetadataFacet> T getMetadataFacet( RepositorySession session, String repositoryId, Class<T> clazz, String name ) throws MetadataRepositoryException + public MetadataFacet getMetadataFacet( RepositorySession session, String repositoryId, String facetId, String name ) throws MetadataRepositoryException { - return null; + return getMetadataFacet( session, repositoryId, getFactoryClassForId( facetId ), name ); } @Override |