From 357ce8ba04d147cc77fde42dbdf06e5eddd2840b Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Thu, 29 Jul 2021 15:24:57 +0100 Subject: [PATCH] Fix FlatMapObservableTest Add header and ensure can run in Scala 2.11 JAVA-4241 --- .../internal/FlatMapObservableTest.scala | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/driver-scala/src/test/scala/org/mongodb/scala/internal/FlatMapObservableTest.scala b/driver-scala/src/test/scala/org/mongodb/scala/internal/FlatMapObservableTest.scala index 32509075475..25b576a20d6 100644 --- a/driver-scala/src/test/scala/org/mongodb/scala/internal/FlatMapObservableTest.scala +++ b/driver-scala/src/test/scala/org/mongodb/scala/internal/FlatMapObservableTest.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.mongodb.scala.internal import org.mongodb.scala.{ BaseSpec, Observable, Observer } @@ -13,17 +29,11 @@ class FlatMapObservableTest extends BaseSpec with Futures with Eventually { val completedCounter = new AtomicInteger(0) Observable(1 to 100) .flatMap( - x => - (observer: Observer[_ >: Int]) => { - Future(()).onComplete(_ => { - observer.onNext(x) - observer.onComplete() - }) - } + x => createObservable(x) ) .subscribe( _ => (), - p.failure, + e => p.failure(e), () => { completedCounter.incrementAndGet() Thread.sleep(100) @@ -35,4 +45,13 @@ class FlatMapObservableTest extends BaseSpec with Futures with Eventually { assert(completedCounter.get() == 1, s"${completedCounter.get()}") Thread.sleep(1000) } + + private def createObservable(x: Int): Observable[Int] = new Observable[Int] { + override def subscribe(observer: Observer[_ >: Int]): Unit = { + Future(()).onComplete(_ => { + observer.onNext(x) + observer.onComplete() + }) + } + } }