Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BATCH-2275: added support to read/write from/to Couchbase #5

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions spring-batch-couchbase/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/target
.classpath
.project
.settings
65 changes: 65 additions & 0 deletions spring-batch-couchbase/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# spring-batch-couchbase
=========
ItemReader and ItemWriter implementations for Couchbase using Spring Data Couchbase. Please read the [reference manual] for implementation details of
different methods to read/write to/from Couchbase as provided by [CouchbaseOperations] and other configuration details.

Please read the api documentation for [Query] for configuring query parameters.

## Configuration for reader/writer
=========
```
@Configuration
public class ReaderWriterConfig {

@Bean
public ItemReader<ClassToRead> couchbaseItemReader() {

CouchbaseItemReader<ClassToRead> reader = new CouchbaseItemReader<ClassToRead>();
reader.setCouchbaseOperations(couchbaseOperations());
reader.setQuery(query());
reader.setDesignDocument("designDocumentName");
reader.setView("viewName");
reader.setTargetType(ClassToRead.class);
reader.setPageSize(15);
return reader;
}

@Bean
public ItemWriter<ClassToWrite> couchbaseItemWriter() {

CouchbaseItemWriter<ClassToWrite> writer = new CouchbaseItemWriter<ClassToWrite>(couchbaseOperations());

// Optional
// writer.setDelete(true|false); (defaults to false)
// writer.setOverrideDocuments(true|false); (defaults to false)
// writer.setPersistTo(ZERO|MASTER|ONE|TWO|THREE|FOUR); (defaults to PersistTo.ZERO)
// writer.setReplicateTo(ZERO|ONE|TWO|THREE); (defaults to ReplicateTo.ZERO)

return writer;
}

/*Optional
@Bean
*/
public Query query() {

Query query = new Query();
// configure the query as required (there are 17 query parameters)

return query;
}

@Bean
public CouchbaseOperations couchbaseOperations() {
// configure and return Couchbase template
}
}
```

##### NOTE
Use the pageSize attribute (inherited from AbstractPaginatedDataItemReader) to set the limit for the data returned by the query.
If the Query object's limit attribute has been set, it will be overridden by the pageSize attribute to make it consistent with the other reader behaviors.

[reference manual]:http://docs.spring.io/spring-data/couchbase/docs/1.1.1.RELEASE/reference/html/
[CouchbaseOperations]:http://docs.spring.io/spring-data/couchbase/docs/1.1.1.RELEASE/api/org/springframework/data/couchbase/core/CouchbaseOperations.html
[Query]:http://www.couchbase.com/autodocs/couchbase-java-client-1.4.3/index.html?com/couchbase/client/protocol/views/Query.html
62 changes: 62 additions & 0 deletions spring-batch-couchbase/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-couchbase</artifactId>
<version>1.0.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.7</java.version>
<spring-batch.version>3.0.1.RELEASE</spring-batch.version>
<spring-data-couchbase.version>1.1.1.RELEASE</spring-data-couchbase.version>
<mockito.verion>1.9.5</mockito.verion>
<junit.verion>4.11</junit.verion>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>

<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring-batch.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-couchbase</artifactId>
<version>${spring-data-couchbase.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.verion}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.verion}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.item.data;

import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.Assert.hasLength;
import static org.springframework.util.Assert.notNull;
import static org.springframework.util.ClassUtils.getShortName;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.slf4j.Logger;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.couchbase.core.CouchbaseOperations;

import com.couchbase.client.protocol.views.Query;
import com.couchbase.client.protocol.views.ViewResponse;
import com.couchbase.client.protocol.views.ViewRow;

/**
* <p>
* Restartable {@link ItemReader} that reads documents from Couchbase
* via a paging technique.
* </p>
*
* <p>
* It executes the query object {@link Query} to retrieve the requested
* documents. Additional pages are requested as needed to provide data
* when the {@link #read()} method is called. If the limit is not set on the
* {@link Query} object, the default limit will be applied as specified in
* {@link AbstractPaginatedDataItemReader#pageSize}
* </p>
*
* <p>
* The implementation is thread-safe between calls to
* {@link #open(ExecutionContext)}, but remember to use <code>saveState=false</code>
* if used in a multi-threaded client (no restart available).
* </p>
*
*
* @author Hasnain Javed
* @since 3.x.x
* @param <T> Type of item to be read
*/
public class CouchbaseItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

private final Logger logger;

private CouchbaseOperations couchbaseOperations;

private Query query;

private String designDocument;
private String view;

private Class<? extends T> targetType;

public CouchbaseItemReader() {
setName(getShortName(getClass()));
logger = getLogger(getClass());
}

/**
* Used to perform operations against the Couchbase instance. Also
* handles the mapping of documents to objects.
*
* @param couchbaseOperations the CouchbaseOperations instance to use
* @see CouchbaseOperations
*/
public void setCouchbaseOperations(CouchbaseOperations couchbaseOperations) {
this.couchbaseOperations = couchbaseOperations;
}

/**
* Used to fetch documents from Couchbase.
*
* @param query the query to be executed
* @see Query
*/
public void setQuery(Query query) {
this.query = query;
}

/**
* @param designDocument the name of the design document
*/
public void setDesignDocument(String designDocument) {
this.designDocument = designDocument;
}

/**
* @param view the name of the view
*/
public void setView(String view) {
this.view = view;
}

/**
* The type of object to be returned for each {@link #read()} call.
*
* @param type the type of object to return
*/
public void setTargetType(Class<? extends T> targetType) {
this.targetType = targetType;
}

@Override
public void afterPropertiesSet() throws Exception {
notNull(couchbaseOperations, "A CouchbaseOperations implementation is required.");
notNull(query, "A valid query is required.");
notNull(targetType, "A target type to convert the input into is required.");
hasLength(designDocument, "A design document name is required.");
hasLength(view, "A view name is required.");

logger.debug("setting limit on query to {}", pageSize);
query.setLimit(pageSize);
}

@Override
@SuppressWarnings("unchecked")
protected Iterator<T> doPageRead() {

Iterator<T> iterator = null;

logger.debug("executing query {} with design document {} in view {}", query, designDocument, view);

if(query.willReduce()) {
ViewResponse response = couchbaseOperations.queryView(designDocument, view, query);
iterator = getItems(response);
}else {
iterator = (Iterator<T>)couchbaseOperations.findByView(designDocument, view, query, targetType).iterator();
}

return iterator;
}

private Iterator<T> getItems(ViewResponse response) {

List<T> items = new ArrayList<T>(response.size());

for( ViewRow row : response) {
String id = row.getId();
logger.debug("fetching document with id {}", id);
T item = couchbaseOperations.findById(id, targetType);
items.add(item);
}

return items.iterator();
}
}
Loading