Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tspace monotonic #571

Merged
merged 2 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 54 additions & 48 deletions jpos/src/main/java/org/jpos/space/TSpace.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* jPOS Project [http://jpos.org]
* Copyright (C) 2000-2021 jPOS Software SRL
* Copyright (C) 2000-2023 jPOS Software SRL
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand All @@ -17,11 +17,10 @@
*/

package org.jpos.space;

import org.jpos.util.Loggeable;
import java.io.PrintStream;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;

Expand All @@ -40,8 +39,9 @@ public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
private static final long GCLONG = 60*1000;
private static final long NRD_RESOLUTION = 500L;
private static final int MAX_ENTRIES_IN_DUMP = 1000;
private static final long ONE_MILLION = 1_000_000L; // multiplier millis --> nanos
private final Set[] expirables;
private long lastLongGC = Instant.now().toEpochMilli();
private long lastLongGC = System.nanoTime();

public TSpace () {
super();
Expand Down Expand Up @@ -70,7 +70,7 @@ public void out (K key, V value, long timeout) {
throw new NullPointerException ("key=" + key + ", value=" + value);
Object v = value;
if (timeout > 0) {
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
}
synchronized (this) {
List l = getList(key);
Expand Down Expand Up @@ -112,17 +112,18 @@ public synchronized V in (Object key) {

@Override
public synchronized V in (Object key, long timeout) {
Object obj;
Instant now = Instant.now();
long duration;
while ((obj = inp (key)) == null &&
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
V obj;
long now = System.nanoTime();
long to = now + timeout * ONE_MILLION;
long waitFor;
while ( (obj = inp (key)) == null &&
(waitFor = (to - System.nanoTime())) >= 0 )
{
try {
this.wait (timeout - duration);
this.wait(Math.max(waitFor / ONE_MILLION, 1L));
} catch (InterruptedException e) { }
}
return (V) obj;
return obj;
}

@Override
Expand All @@ -138,17 +139,18 @@ public synchronized V rd (Object key) {

@Override
public synchronized V rd (Object key, long timeout) {
Object obj;
Instant now = Instant.now();
long duration;
while ((obj = rdp (key)) == null &&
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
V obj;
long now = System.nanoTime();
long to = now + (timeout * ONE_MILLION);
long waitFor;
while ( (obj = rdp (key)) == null &&
(waitFor = (to - System.nanoTime())) >= 0 )
{
try {
this.wait (timeout - duration);
this.wait(Math.max(waitFor / ONE_MILLION, 1L));
} catch (InterruptedException e) { }
}
return (V) obj;
return obj;
}

@Override
Expand All @@ -162,17 +164,19 @@ public synchronized void nrd (Object key) {

@Override
public synchronized V nrd (Object key, long timeout) {
Object obj;
Instant now = Instant.now();
long duration;
while ((obj = rdp (key)) != null &&
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
V obj;
long now = System.nanoTime();
long to = now + (timeout * ONE_MILLION);
long waitFor;
while ( (obj = rdp (key)) != null &&
(waitFor = (to - System.nanoTime())) >= 0 )
{
try {
this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
this.wait(Math.min(NRD_RESOLUTION,
Math.max(waitFor / ONE_MILLION, 1L)));
} catch (InterruptedException ignored) { }
}
return (V) obj;
return obj;
}

@Override
Expand All @@ -186,9 +190,9 @@ public void run () {

public void gc () {
gc(0);
if (Instant.now().toEpochMilli() - lastLongGC > GCLONG) {
if (System.nanoTime() - lastLongGC > GCLONG) {
gc(1);
lastLongGC = Instant.now().toEpochMilli();
lastLongGC = System.nanoTime();
}
}

Expand Down Expand Up @@ -336,7 +340,7 @@ public void push (K key, V value, long timeout) {
throw new NullPointerException ("key=" + key + ", value=" + value);
Object v = value;
if (timeout > 0) {
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
}
synchronized (this) {
List l = getList(key);
Expand Down Expand Up @@ -373,7 +377,7 @@ public void put (K key, V value, long timeout) {
throw new NullPointerException ("key=" + key + ", value=" + value);
Object v = value;
if (timeout > 0) {
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
v = new Expirable (value, System.nanoTime() + (timeout * ONE_MILLION));
}
synchronized (this) {
List l = new LinkedList();
Expand All @@ -399,14 +403,15 @@ public boolean existAny (K[] keys) {

@Override
public boolean existAny (K[] keys, long timeout) {
Instant now = Instant.now();
long duration;
while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
long now = System.nanoTime();
long to = now + (timeout * ONE_MILLION);
long waitFor;
while ((waitFor = (to - System.nanoTime())) >= 0) {
if (existAny (keys))
return true;
synchronized (this) {
try {
wait (timeout - duration);
this.wait(Math.max(waitFor / ONE_MILLION, 1L));
} catch (InterruptedException e) { }
}
}
Expand Down Expand Up @@ -517,19 +522,24 @@ private void unregisterExpirable(Object k) {

static class Expirable implements Comparable, Serializable {

static final long serialVersionUID = 0xA7F22BF5;
private static final long serialVersionUID = 0xA7F22BF5;

Object value;

/**
* When to expire, in the future, as given by monotonic System.nanoTime().<br>
* IMPORTANT: always use a nanosec offset from System.nanoTime()!
*/
long expires;

public Expirable (Object value, long expires) {
Expirable (Object value, long expires) {
super();
this.value = value;
this.expires = expires;
}

public boolean isExpired () {
return expires < Instant.now().toEpochMilli();
boolean isExpired () {
return (System.nanoTime() - expires) > 0;
}

@Override
Expand All @@ -540,20 +550,16 @@ public String toString() {
+ ",expired=" + isExpired ();
}

public Object getValue() {
Object getValue() {
return isExpired() ? null : value;
}

@Override
public int compareTo (Object obj) {
Expirable other = (Expirable) obj;
long otherExpires = other.expires;
if (otherExpires == expires)
return 0;
else if (expires < otherExpires)
return -1;
else
return 1;
public int compareTo (Object other) {
long diff = this.expires - ((Expirable)other).expires;
return diff > 0 ? 1 :
diff < 0 ? -1 :
0;
}
}

Expand Down
5 changes: 2 additions & 3 deletions jpos/src/test/java/org/jpos/space/TSpacePerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.jpos.space;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -184,9 +183,9 @@ public void testDeadLockWithNotify() throws Throwable {
for (int i=0; i<size; i++)
es.execute(new WriteSpaceWithNotifyTask("WriteTask2-"+i,sp2,sp1));

Instant stamp = Instant.now();
Duration stamp = Duration.ofNanos(System.nanoTime());
while (((ThreadPoolExecutor)es).getActiveCount() > 0) {
if (Duration.between(stamp, Instant.now()).toMillis() < 10000){
if (Duration.ofNanos(System.nanoTime()).minus(stamp).toMillis() < 10000){
ISOUtil.sleep(100);
continue;
}
Expand Down
28 changes: 15 additions & 13 deletions jpos/src/test/java/org/jpos/space/TSpaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Duration;
import java.util.AbstractSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -42,6 +43,7 @@
@SuppressWarnings("unchecked")
@ExtendWith(MockitoExtension.class)
public class TSpaceTest {
static final long EXPIRE_OFFSET = Duration.ofDays(9999).toNanos();

@Test
public void testConstructor() throws Throwable {
Expand Down Expand Up @@ -103,31 +105,33 @@ public void testExpirableConstructor() throws Throwable {

@Test
public void testExpirableGetValue() throws Throwable {
String result = (String) new TSpace.Expirable("", 9184833384926L).getValue();
String result = (String) new TSpace.Expirable("", System.nanoTime() + EXPIRE_OFFSET).getValue();
assertEquals("", result, "result");
}

@Test
public void testExpirableGetValue1() throws Throwable {
Object result = new TSpace.Expirable(null, 9184833384926L).getValue();
Object result = new TSpace.Expirable(null, System.nanoTime() + EXPIRE_OFFSET).getValue();
assertNull(result, "result");
}

@Test
public void testExpirableGetValue2() throws Throwable {
Object result = new TSpace.Expirable(new Object(), 100L).getValue();
// using negative offset to ensure expiration (literally, object is born already expired)
Object result = new TSpace.Expirable(new Object(), System.nanoTime() - EXPIRE_OFFSET).getValue();
assertNull(result, "result");
}

@Test
public void testExpirableIsExpired() throws Throwable {
boolean result = new TSpace.Expirable("", 9184833384926L).isExpired();
boolean result = new TSpace.Expirable("", System.nanoTime() + EXPIRE_OFFSET).isExpired();
assertFalse(result, "result");
}

@Test
public void testExpirableIsExpired1() throws Throwable {
boolean result = new TSpace.Expirable(new Object(), 100L).isExpired();
// using negative offset to ensure expiration (literally, object is born already expired)
boolean result = new TSpace.Expirable(new Object(), System.nanoTime() - EXPIRE_OFFSET).isExpired();
assertTrue(result, "result");
}

Expand All @@ -140,7 +144,7 @@ public void testExpirableToString() throws Throwable {
@Test
public void testExpirableToStringThrowsNullPointerException() throws Throwable {
try {
new TSpace.Expirable(null, 100L).toString();
new TSpace.Expirable(null,100L).toString();
fail("Expected NullPointerException to be thrown");
} catch (NullPointerException ex) {
if (isJavaVersionAtMost(JAVA_14)) {
Expand All @@ -153,7 +157,6 @@ public void testExpirableToStringThrowsNullPointerException() throws Throwable {

@Test
public void testGc() throws Throwable {

TSpace tSpace = new TSpace();
tSpace.gc();
assertEquals(0, tSpace.entries.size(), "tSpace.entries.size()");
Expand Down Expand Up @@ -207,6 +210,7 @@ public void testInp1() throws Throwable {
"tSpace.entries.get(\"\").get(0) had \"testString\" removed");
assertEquals("testString", result, "result");
assertEquals(3, tSpace.entries.size(), "tSpace.entries.size()");
tSpace.dump(System.out, ">>");
}

@Test
Expand Down Expand Up @@ -239,12 +243,10 @@ public void testNotifyReaders() {
final Space sp = new TSpace();
final AtomicInteger ai = new AtomicInteger(10);
for (int i=0; i<10; i++) {
new Thread() {
public void run() {
if (sp.rd("TEST", 5000L) != null)
ai.decrementAndGet();
}
}.start();
new Thread(()->{
if (sp.rd("TEST", 5000L) != null)
ai.decrementAndGet();
}).start();
}
sp.out("TEST", Boolean.TRUE);
ISOUtil.sleep(500L);
Expand Down
13 changes: 6 additions & 7 deletions jpos/src/test/java/org/jpos/space/TSpaceTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.jpos.space;

import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;

Expand Down Expand Up @@ -289,28 +288,28 @@ public void run() {
sp.out("KA", Boolean.TRUE);
}
}.start();
Instant now = Instant.now();
Duration now = Duration.ofNanos(System.nanoTime());
assertTrue(sp.existAny(new String[] { "KA", "KB" }, 2000L), "existAnyWithTimeout ([KA,KB], delay)");
long elapsed = Duration.between(now, Instant.now()).toMillis();
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
assertTrue(elapsed > 900L, "delay was > 1000");
}

@Test
public void testNRD() {
Instant now = Instant.now();
Duration now = Duration.ofNanos(System.nanoTime());
sp.out("NRD", "NRDTEST", 1000L);
sp.nrd("NRD");
long elapsed = Duration.between(now, Instant.now()).toMillis();
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
assertTrue(elapsed >= 1000L, "Invalid elapsed time " + elapsed);
}
@Test
public void testNRDWithDelay() {
Instant now = Instant.now();
Duration now = Duration.ofNanos(System.nanoTime());
sp.out("NRD", "NRDTEST", 1000L);
Object obj = sp.nrd("NRD", 500L);
assertNotNull(obj, "Object should not be null");
obj = sp.nrd("NRD", 5000L);
long elapsed = Duration.between(now, Instant.now()).toMillis();
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
assertTrue(elapsed >= 1000L && elapsed <= 2000L, "Invalid elapsed time " + elapsed);
assertNull(obj, "Object should be null");
}
Expand Down
Loading