Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add refresh listener to Org API #310

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion jvm/src/main/scala/com/nawforce/apexlink/api/Org.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.nawforce.apexlink.api

import com.nawforce.apexlink.org.OPM
import com.nawforce.apexlink.org.{OPM, RefreshListener}
import com.nawforce.apexlink.plugins.{PluginsManager, UnusedPlugin}
import com.nawforce.apexlink.rpc.{
BombScore,
Expand Down Expand Up @@ -75,6 +75,13 @@ trait Org {
*/
def isDirty(): Boolean

/** Add or remove a listener which is called when all metadata changes have been processed.
*
* Similar to polling until isDirty = false, though the action will run on the same thread as
* the flusher and block it until completed. Use with caution.
*/
def setRefreshListener(rl: Option[RefreshListener]): Unit

/** Collection of all current issues reported against this org.
*/
def issues: IssuesCollection
Expand Down Expand Up @@ -242,6 +249,7 @@ trait Org {
* Class namespaces are NOT included.
*/
def getTestMethodItems(paths: Array[String]): Array[MethodTestItem]

}

object Org {
Expand Down
66 changes: 55 additions & 11 deletions jvm/src/main/scala/com/nawforce/apexlink/org/Flusher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ import com.nawforce.pkgforce.path.PathLike

import scala.collection.mutable

trait RefreshListener {
def onRefreshOne(orgPath: PathLike, updatedPath: PathLike): Unit
def onRefreshMany(orgPath: PathLike, updatedPaths: Seq[PathLike]): Unit
}

case class RefreshRequest(pkg: OPM.PackageImpl, path: PathLike, highPriority: Boolean)

class Flusher(org: OPM.OrgImpl, parsedCache: Option[ParsedCache]) {
protected val refreshQueue = new mutable.Queue[RefreshRequest]()
private var expired = false
protected val refreshQueue = new mutable.Queue[RefreshRequest]()
protected var skippedQueue = false
private var expired = false
private var listener: Option[RefreshListener] = None

def setListener(rl: Option[RefreshListener]): Unit = listener = rl

def isDirty: Boolean = {
org.refreshLock.synchronized { refreshQueue.nonEmpty }
Expand All @@ -36,7 +45,12 @@ class Flusher(org: OPM.OrgImpl, parsedCache: Option[ParsedCache]) {
refreshQueue.enqueue(request)
} else {
org.refreshLock.synchronized {
request.pkg.refreshBatched(Seq(request))
val updated = request.pkg.refreshBatched(Seq(request))
// Notify of updated path
if (updated) listener.foreach(_.onRefreshOne(org.path, request.path))

// Tell auto flush we skipped the queue
skippedQueue |= updated
}
}
}
Expand All @@ -48,21 +62,43 @@ class Flusher(org: OPM.OrgImpl, parsedCache: Option[ParsedCache]) {
def refreshAndFlush(): Boolean = {
OrgInfo.current.withValue(org) {
org.refreshLock.synchronized {
var updated = false
val packages = org.packages
var updated = false
val updatedPaths = mutable.Set[PathLike]()
val packages = org.packages

// Process in chunks, new requests may be queued during processing
while (refreshQueue.nonEmpty) {
val toProcess = refreshQueue.dequeueAll(_ => true)
LoggerOps.debug(s"Batched refresh starting for ${toProcess.length} items")
packages
.foreach(pkg => {
updated |= pkg.refreshBatched(toProcess.filter(_.pkg == pkg))
val reqs = toProcess.filter(_.pkg == pkg)
updated |= pkg.refreshBatched(reqs)

if (updated) updatedPaths.addAll(reqs.map(_.path))
})
LoggerOps.debug(s"Batched refresh completed")
}

// Flush to cache
flush()

// Notify of updated paths
if (updated) listener.foreach(_.onRefreshMany(org.path, updatedPaths.toSeq))

updated
}
}
}

protected def flush(): Unit = {
OrgInfo.current.withValue(org) {
org.refreshLock.synchronized {
val packages = org.packages

// Reset skip status to prevent more flushes
skippedQueue = false

parsedCache.foreach(pc => {
packages.foreach(pkg => {
pkg.flush(pc)
Expand All @@ -75,7 +111,6 @@ class Flusher(org: OPM.OrgImpl, parsedCache: Option[ParsedCache]) {

// Clean registered caches to reduce memory
Cleanable.clean()
updated
}
}
}
Expand All @@ -92,20 +127,29 @@ class CacheFlusher(org: OPM.OrgImpl, parsedCache: Option[ParsedCache])
t.start()

override def run(): Unit = {
def queueSize: Int = org.refreshLock.synchronized { refreshQueue.size }
def queueSize: Int = org.refreshLock.synchronized { refreshQueue.size }
def skipped: Boolean = org.refreshLock.synchronized { skippedQueue }

while (true) {
// Wait for non-zero queue to be stable
// Or with an empty queue and a priority/single update
var stable = false
while (!stable) {
var skip = false
while (!stable && !skip) {
val start = queueSize
Thread.sleep(1000)
val end = queueSize
stable = start > 0 && start == end
skip = skipped
}

// Process refresh requests & flush
refreshAndFlush()
if (stable) {
// Process refresh requests & flush
refreshAndFlush()
} else if (skip) {
// Already refreshed, just flush
flush()
}
}
}
}
4 changes: 4 additions & 0 deletions jvm/src/main/scala/com/nawforce/apexlink/org/OPM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ object OPM extends TriHierarchy {
false
}

def setRefreshListener(rl: Option[RefreshListener]): Unit = {
flusher.setListener(rl)
}

/** Queue a metadata refresh request */
def queueMetadataRefresh(request: RefreshRequest): Unit = {
flusher.queue(request)
Expand Down
9 changes: 7 additions & 2 deletions jvm/src/main/scala/com/nawforce/apexlink/org/PackageAPI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,13 @@ trait PackageAPI extends Package {

private[nawforce] def refreshAll(paths: Array[PathLike]): Unit = {
OrgInfo.current.withValue(org) {
val highPriority = paths.length == 1
org.queueMetadataRefresh(paths.map(path => RefreshRequest(this, path, highPriority)))
if (paths.length == 1) {
org.queueMetadataRefresh(RefreshRequest(this, paths.head, highPriority = true))
} else {
org.queueMetadataRefresh(
paths.map(path => RefreshRequest(this, path, highPriority = false))
)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion jvm/src/test/scala/com/nawforce/apexlink/TestHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package com.nawforce.apexlink

import com.nawforce.apexlink.TestHelper.{CURSOR, locToString}
import com.nawforce.apexlink.api._
import com.nawforce.apexlink.org.{OPM, OrgInfo}
import com.nawforce.apexlink.org.{OPM, OrgInfo, RefreshListener}
import com.nawforce.apexlink.plugins.{Plugin, PluginsManager}
import com.nawforce.apexlink.rpc.{LocationLink, TargetLocation}
import com.nawforce.apexlink.types.apex.{ApexClassDeclaration, ApexFullDeclaration, FullDeclaration}
Expand Down
59 changes: 58 additions & 1 deletion jvm/src/test/scala/com/nawforce/apexlink/pkg/RefreshTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package com.nawforce.apexlink.pkg

import com.nawforce.apexlink.TestHelper
import com.nawforce.apexlink.org.OPM
import com.nawforce.apexlink.org.{OPM, RefreshListener}
import com.nawforce.pkgforce.PathInterpolator.PathInterpolator
import com.nawforce.pkgforce.names.{Name, Names, TypeName}
import com.nawforce.pkgforce.path.PathLike
import com.nawforce.runtime.FileSystemHelper
import org.scalatest.funsuite.AnyFunSuite

import scala.collection.mutable

class RefreshTest extends AnyFunSuite with TestHelper {

private def refresh(
Expand All @@ -33,6 +35,17 @@ class RefreshTest extends AnyFunSuite with TestHelper {
pkg.refresh(path, highPriority)
}

private def refreshAll(pkg: OPM.PackageImpl, paths: Map[PathLike, String]): Unit = {
paths.foreach(p => p._1.write(p._2))
pkg.refreshAll(paths.keys.toArray)
}

class Listener(val capture: mutable.ArrayBuffer[PathLike]) extends RefreshListener {
def onRefreshOne(orgPath: PathLike, updatedPath: PathLike): Unit = capture.addOne(updatedPath)
def onRefreshMany(orgPath: PathLike, updatedPaths: Seq[PathLike]): Unit =
capture.addAll(updatedPaths)
}

test("Valid refresh") {
withManualFlush {
FileSystemHelper.run(Map("pkg/Foo.cls" -> "public class Foo {}")) { root: PathLike =>
Expand Down Expand Up @@ -1158,4 +1171,48 @@ class RefreshTest extends AnyFunSuite with TestHelper {
}
}

test("Refresh listener with refresh") {
val capture = mutable.ArrayBuffer[PathLike]()

withManualFlush {
FileSystemHelper.run(Map("pkg/Foo.cls" -> "public class Foo {}")) { root: PathLike =>
val org = createOrg(root)
org.setRefreshListener(Some(new Listener(capture)))
val pkg = org.unmanaged
refresh(pkg, root.join("pkg/Foo.cls"), "public class Foo {}")
assert(org.flush())
assert(org.issues.isEmpty)
assert(capture.map(_.toString).toSeq.sorted.equals(Seq("/pkg/Foo.cls")))
}
}
}

test("Refresh listener with refreshAll") {
val capture = mutable.ArrayBuffer[PathLike]()

withManualFlush {
FileSystemHelper.run(
Map(
"pkg/Foo.cls" -> "public class Foo {}",
"pkg/Bar.cls" -> "public class Bar {}",
"pkg/Baz.cls" -> "public class Baz {}"
)
) { root: PathLike =>
val org = createOrg(root)
org.setRefreshListener(Some(new Listener(capture)))
val pkg = org.unmanaged
refreshAll(
pkg,
Map(
root.join("pkg/Foo.cls") -> "public class Foo {}",
root.join("pkg/Bar.cls") -> "public class Bar {}"
)
)
assert(org.flush())
assert(org.issues.isEmpty)
assert(capture.map(_.toString).toSeq.sorted.equals(Seq("/pkg/Bar.cls", "/pkg/Foo.cls")))
}
}
}

}
Loading