2626
2727import java .io .IOException ;
2828import java .time .Duration ;
29+ import java .util .HashSet ;
2930import java .util .Iterator ;
3031import java .util .List ;
32+ import java .util .Set ;
3133import java .util .concurrent .CountDownLatch ;
34+ import java .util .concurrent .ThreadLocalRandom ;
3235import java .util .concurrent .TimeUnit ;
3336import java .util .concurrent .TimeoutException ;
3437import java .util .concurrent .atomic .AtomicBoolean ;
3538import java .util .concurrent .atomic .AtomicInteger ;
36- import java .util .concurrent .atomic .AtomicReference ;
3739
3840import static io .nats .client .api .ConsumerConfiguration .builder ;
3941import static io .nats .client .support .ApiConstants .*;
@@ -1309,6 +1311,10 @@ private static void _overflowCheck(JetStreamSubscription sub, PullRequestOptions
13091311
13101312 @ Test
13111313 public void testPrioritized () throws Exception {
1314+ // PriorityPolicy.Prioritized
1315+ // start a priority 1 (#1) and a priority 2 (#2) consumer, #1 should get messages, #2 should get none
1316+ // close the #1, #2 should get messages
1317+ // start another priority 1 (#3), #2 should stop getting messages #3 should get messages
13121318 ListenerForTesting l = new ListenerForTesting ();
13131319 Options .Builder b = Options .builder ().errorListener (l );
13141320 jsServer .run (b , TestBase ::atLeast2_12 , nc -> {
@@ -1339,31 +1345,41 @@ public void testPrioritized() throws Exception {
13391345 };
13401346
13411347 AtomicInteger count2 = new AtomicInteger ();
1348+ CountDownLatch latch2 = new CountDownLatch (20 );
13421349 MessageHandler handler2 = msg -> {
13431350 msg .ack ();
13441351 count2 .incrementAndGet ();
1352+ latch2 .countDown ();
13451353 };
13461354
1347- ConsumeOptions co1 = ConsumeOptions .builder ()
1355+ AtomicInteger count3 = new AtomicInteger ();
1356+ MessageHandler handler3 = msg -> {
1357+ msg .ack ();
1358+ count3 .incrementAndGet ();
1359+ };
1360+
1361+ ConsumeOptions coP1 = ConsumeOptions .builder ()
13481362 .batchSize (10 )
13491363 .group (group )
13501364 .priority (1 )
13511365 .build ();
1352- ConsumeOptions co2 = ConsumeOptions .builder ()
1366+ ConsumeOptions coP2 = ConsumeOptions .builder ()
13531367 .batchSize (10 )
13541368 .group (group )
13551369 .priority (2 )
13561370 .build ();
13571371
1358- MessageConsumer mc1 = consumerContext1 .consume (co1 , handler1 );
1359- MessageConsumer mc2 = consumerContext2 .consume (co2 , handler2 );
1372+ MessageConsumer mc1 = consumerContext1 .consume (coP1 , handler1 );
1373+ MessageConsumer mc2 = consumerContext2 .consume (coP2 , handler2 );
13601374
1375+ AtomicBoolean pub = new AtomicBoolean (true );
13611376 Thread t = new Thread (() -> {
13621377 int count = 0 ;
1363- while (++count <= 200 ) {
1378+ while (pub .get ()) {
1379+ ++count ;
13641380 try {
13651381 js .publish (tsc .subject (), ("x" + count ).getBytes ());
1366- Thread . sleep (20 );
1382+ sleep (20 );
13671383 }
13681384 catch (Exception e ) {
13691385 fail (e );
@@ -1374,18 +1390,33 @@ public void testPrioritized() throws Exception {
13741390 t .start ();
13751391
13761392 if (!latch1 .await (5 , TimeUnit .SECONDS )) {
1377- fail ("Didn't get messages" );
1393+ fail ("Didn't get messages consumer 1 " );
13781394 }
1395+ assertEquals (0 , count2 .get ());
13791396 mc1 .close ();
1397+
1398+ if (!latch2 .await (5 , TimeUnit .SECONDS )) {
1399+ fail ("Didn't get messages consumer 2" );
1400+ }
1401+ MessageConsumer mc3 = consumerContext2 .consume (coP1 , handler3 );
1402+
1403+ Thread .sleep (200 );
1404+ pub .set (false );
13801405 t .join ();
13811406 mc2 .close ();
1407+ mc3 .close ();
1408+
13821409 assertTrue (count1 .get () >= 20 );
13831410 assertTrue (count2 .get () >= 20 );
1411+ assertTrue (count3 .get () > 0 );
13841412 });
13851413 }
13861414
13871415 @ Test
13881416 public void testPinnedClient () throws Exception {
1417+ // have 3 consumers in the same group all PriorityPolicy.PinnedClient
1418+ // start consuming, tracking pin ids and counts
1419+ // unpin 10 times and make sure that new pins are made
13891420 ListenerForTesting l = new ListenerForTesting ();
13901421 Options .Builder b = Options .builder ().errorListener (l );
13911422 jsServer .run (b , TestBase ::atLeast2_12 , nc -> {
@@ -1406,59 +1437,58 @@ public void testPinnedClient() throws Exception {
14061437 StreamContext streamContext = nc .getStreamContext (tsc .stream );
14071438 ConsumerContext consumerContext1 = streamContext .createOrUpdateConsumer (cc );
14081439 ConsumerContext consumerContext2 = streamContext .getConsumerContext (consumer );
1440+ ConsumerContext consumerContext3 = streamContext .getConsumerContext (consumer );
14091441
14101442 //noinspection resource
14111443 assertThrows (IOException .class , () -> consumerContext1 .fetchMessages (10 ));
14121444
1445+ Set <String > pinIds = new HashSet <>();
14131446 AtomicInteger count1 = new AtomicInteger ();
1414- CountDownLatch latch1 = new CountDownLatch ( 20 );
1415- AtomicReference < String > pinId1 = new AtomicReference <> ();
1447+ AtomicInteger count2 = new AtomicInteger ( );
1448+ AtomicInteger count3 = new AtomicInteger ();
14161449 MessageHandler handler1 = msg -> {
14171450 msg .ack ();
14181451 assertNotNull (msg .getHeaders ());
14191452 String natsPinId = msg .getHeaders ().getFirst (NATS_PIN_ID_HDR );
14201453 assertNotNull (natsPinId );
1421- String pid = pinId1 .get ();
1422- if (pid == null ) {
1423- pinId1 .set (natsPinId );
1424- }
1425- else {
1426- assertEquals (pid , natsPinId );
1427- }
1454+ pinIds .add (natsPinId );
14281455 count1 .incrementAndGet ();
1429- latch1 .countDown ();
14301456 };
1431-
1432- AtomicInteger count2 = new AtomicInteger ();
1433- AtomicReference <String > pinId2 = new AtomicReference <>();
14341457 MessageHandler handler2 = msg -> {
14351458 msg .ack ();
14361459 assertNotNull (msg .getHeaders ());
14371460 String natsPinId = msg .getHeaders ().getFirst (NATS_PIN_ID_HDR );
14381461 assertNotNull (natsPinId );
1439- String pid = pinId2 .get ();
1440- if (pid == null ) {
1441- pinId2 .set (natsPinId );
1442- }
1443- else {
1444- assertEquals (pid , natsPinId );
1445- }
1462+ pinIds .add (natsPinId );
14461463 count2 .incrementAndGet ();
14471464 };
1465+ MessageHandler handler3 = msg -> {
1466+ msg .ack ();
1467+ assertNotNull (msg .getHeaders ());
1468+ String natsPinId = msg .getHeaders ().getFirst (NATS_PIN_ID_HDR );
1469+ assertNotNull (natsPinId );
1470+ pinIds .add (natsPinId );
1471+ count3 .incrementAndGet ();
1472+ };
14481473
14491474 ConsumeOptions co = ConsumeOptions .builder ()
14501475 .batchSize (10 )
1476+ .expiresIn (1000 )
14511477 .group (group )
14521478 .build ();
14531479
14541480 MessageConsumer mc1 = consumerContext1 .consume (co , handler1 );
1481+ MessageConsumer mc2 = consumerContext2 .consume (co , handler2 );
1482+ MessageConsumer mc3 = consumerContext3 .consume (co , handler3 );
14551483
1484+ AtomicBoolean pub = new AtomicBoolean (true );
14561485 Thread t = new Thread (() -> {
14571486 int count = 0 ;
1458- while (++count <= 100 ) {
1487+ while (pub .get ()) {
1488+ ++count ;
14591489 try {
14601490 js .publish (tsc .subject (), ("x" + count ).getBytes ());
1461- Thread . sleep (20 );
1491+ sleep (20 );
14621492 }
14631493 catch (Exception e ) {
14641494 fail (e );
@@ -1468,27 +1498,46 @@ public void testPinnedClient() throws Exception {
14681498 });
14691499 t .start ();
14701500
1471- MessageConsumer mc2 = consumerContext2 .consume (co , handler2 );
1472-
1473- if (!latch1 .await (5 , TimeUnit .SECONDS )) {
1474- fail ("Didn't get messages" );
1501+ int unpins = 0 ;
1502+ while (unpins ++ < 10 ) {
1503+ sleep (650 );
1504+ switch (ThreadLocalRandom .current ().nextInt (0 , 4 )) {
1505+ case 0 :
1506+ assertTrue (consumerContext1 .unpin (group ));
1507+ break ;
1508+ case 1 :
1509+ assertTrue (consumerContext2 .unpin (group ));
1510+ break ;
1511+ case 2 :
1512+ assertTrue (consumerContext3 .unpin (group ));
1513+ break ;
1514+ case 3 :
1515+ assertTrue (jsm .unpinConsumer (tsc .stream , consumer , group ));
1516+ break ;
1517+ }
1518+ assertTrue (consumerContext1 .unpin (group ));
14751519 }
1476- assertTrue ( consumerContext1 . unpin ( group ) );
1520+ sleep ( 650 );
14771521
1522+ pub .set (false );
14781523 t .join ();
14791524 mc1 .close ();
14801525 mc2 .close ();
1481- assertTrue (count1 .get () >= 20 );
1482- assertTrue (count2 .get () >= 20 );
1483- assertNotEquals (pinId1 .get (), pinId2 .get ());
1484-
1485- // while I'm here, test variations of unpin validation
1486- assertThrows (JetStreamApiException .class , () -> consumerContext1 .unpin ("not-a-group" ));
1487- assertThrows (JetStreamApiException .class , () -> jsm .unpinConsumer (tsc .stream , tsc .subject (), "not-a-group" ));
1488-
1489- assertThrows (IllegalArgumentException .class , () -> jsm .unpinConsumer (null , null , null ));
1490- assertThrows (IllegalArgumentException .class , () -> jsm .unpinConsumer ("sn" , null , null ));
1491- assertThrows (IllegalArgumentException .class , () -> jsm .unpinConsumer ("sn" , "cn" , null ));
1526+ mc3 .close ();
1527+
1528+ assertTrue (pinIds .size () > 3 );
1529+ int c1 = count1 .get ();
1530+ int c2 = count2 .get ();
1531+ int c3 = count3 .get ();
1532+ if (c1 > 0 ) {
1533+ assertTrue (c2 > 0 || c3 > 0 );
1534+ }
1535+ else if (c2 > 0 ) {
1536+ assertTrue (c3 > 0 );
1537+ }
1538+ else {
1539+ fail ("At least 2 consumers should have gotten messages" );
1540+ }
14921541 });
14931542 }
14941543}
0 commit comments