Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct warnings #1436

Merged
merged 6 commits into from
Jul 21, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void onStart() {
@Override
public void onNext(Observable<? extends T> t) {
if (t instanceof ScalarSynchronousObservable) {
handleScalarSynchronousObservable(t);
handleScalarSynchronousObservable((ScalarSynchronousObservable)t);
} else {
if (t == null || isUnsubscribed()) {
return;
Expand Down Expand Up @@ -128,7 +128,7 @@ private void handleNewSource(Observable<? extends T> t) {
request(1);
}

private void handleScalarSynchronousObservable(Observable<? extends T> t) {
private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
// fast-path for scalar, synchronous values such as Observable.from(int)
/**
* Without this optimization:
Expand All @@ -154,8 +154,8 @@ private void handleScalarSynchronousObservable(Observable<? extends T> t) {
}
}

private void handleScalarSynchronousObservableWithoutRequestLimits(Observable<? extends T> t) {
T value = ((ScalarSynchronousObservable<T>) t).get();
private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable<? extends T> t) {
T value = t.get();
if (getEmitLock()) {
try {
actual.onNext(value);
Expand All @@ -177,15 +177,15 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(Observable<?
}
}

private void handleScalarSynchronousObservableWithRequestLimits(Observable<? extends T> t) {
private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable<? extends T> t) {
if (getEmitLock()) {
boolean emitted = false;
try {
long r = mergeProducer.requested;
if (r > 0) {
emitted = true;
actual.onNext(((ScalarSynchronousObservable<T>) t).get());
mergeProducer.REQUESTED.decrementAndGet(mergeProducer);
actual.onNext(t.get());
MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
return;
}
Expand All @@ -203,7 +203,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(Observable<? ext
// enqueue the values for later delivery
initScalarValueQueueIfNeeded();
try {
scalarValueQueue.onNext(((ScalarSynchronousObservable<T>) t).get());
scalarValueQueue.onNext(t.get());
} catch (MissingBackpressureException e) {
onError(e);
}
Expand Down Expand Up @@ -295,7 +295,7 @@ private int drainScalarValueQueue() {
}
}
// decrement the number we emitted from outstanding requests
mergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
}
return emittedWhileDraining;
}
Expand Down Expand Up @@ -500,7 +500,7 @@ private void emit(T t, boolean complete) {
} else {
parentSubscriber.actual.onNext(t);
emitted++;
producer.REQUESTED.decrementAndGet(producer);
MergeProducer.REQUESTED.decrementAndGet(producer);
}
} else {
// no requests available, so enqueue it
Expand Down Expand Up @@ -587,7 +587,7 @@ private int drainRequested() {
}

// decrement the number we emitted from outstanding requests
producer.REQUESTED.getAndAdd(producer, -emitted);
MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
return emitted;
}

Expand Down
10 changes: 5 additions & 5 deletions rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static RxRingBuffer getSpmcInstance() {

@Override
protected SpscArrayQueue<Object> createObject() {
return new SpscArrayQueue(SIZE);
return new SpscArrayQueue<Object>(SIZE);
}

};
Expand All @@ -182,12 +182,12 @@ protected SpscArrayQueue<Object> createObject() {

@Override
protected SpmcArrayQueue<Object> createObject() {
return new SpmcArrayQueue(SIZE);
return new SpmcArrayQueue<Object>(SIZE);
}

};

private RxRingBuffer(Queue queue, int size) {
private RxRingBuffer(Queue<Object> queue, int size) {
this.queue = queue;
this.pool = null;
this.size = size;
Expand All @@ -201,7 +201,7 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {

public void release() {
if (pool != null) {
Queue q = queue;
Queue<Object> q = queue;
q.clear();
queue = null;
pool.returnObject(q);
Expand All @@ -214,7 +214,7 @@ public void unsubscribe() {
}

/* for unit tests */RxRingBuffer() {
this(new SynchronizedQueue<Queue>(SIZE), SIZE);
this(new SynchronizedQueue<Object>(SIZE), SIZE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ public void clear() {
}

public void forEach(Action1<T> action) {
Object[] ss;
T[] ss=null;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
return;
}
ss = subscriptions.toArray();
ss = subscriptions.toArray(ss);
}
for (Object t : ss) {
action.call((T) t);
for (T t : ss) {
action.call(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ public void testForEachAcrossSections() {
buffer.add(i);
}

final ArrayList<String> list = new ArrayList<String>();
final ArrayList<Integer> list = new ArrayList<Integer>();
int nextIndex = buffer.forEach(accumulate(list), 5000);
assertEquals(10000, list.size());
assertEquals(5000, list.get(0));
assertEquals(9999, list.get(4999));
assertEquals(0, list.get(5000));
assertEquals(4999, list.get(9999));
assertEquals(Integer.valueOf(5000), list.get(0));
assertEquals(Integer.valueOf(9999), list.get(4999));
assertEquals(Integer.valueOf(0), list.get(5000));
assertEquals(Integer.valueOf(4999), list.get(9999));
assertEquals(5000, nextIndex);
}

Expand Down Expand Up @@ -364,11 +364,11 @@ public void call() {
assertEquals(0, exceptions.size());
}

private Func1<Object, Boolean> accumulate(final ArrayList list) {
return new Func1<Object, Boolean>() {
private <T> Func1<T, Boolean> accumulate(final ArrayList<T> list) {
return new Func1<T, Boolean>() {

@Override
public Boolean call(Object t1) {
public Boolean call(T t1) {
list.add(t1);
return true;
}
Expand Down
10 changes: 0 additions & 10 deletions rxjava-core/src/test/java/rx/internal/util/RxRingBufferBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,15 @@
*/
package rx.internal.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public abstract class RxRingBufferBase {

Expand Down Expand Up @@ -59,7 +51,6 @@ public void addAndPollFailBackpressure() throws MissingBackpressureException {

RxRingBuffer b = createRingBuffer();

TestSubscriber<Object> s = new TestSubscriber<Object>();
try {
for (int i = 0; i < RxRingBuffer.SIZE; i++) {
// System.out.println("Add: " + i);
Expand All @@ -82,7 +73,6 @@ public void addAndPollFailBackpressure() throws MissingBackpressureException {
@Test
public void addAndPoll() throws MissingBackpressureException {
RxRingBuffer b = createRingBuffer();
TestSubscriber<Object> s = new TestSubscriber<Object>();
b.onNext("o");
b.onNext("o");
b.poll();
Expand Down