Skip to content

Commit

Permalink
[INLONG-10566][Manager] fix auth failed and cronexpression field type
Browse files Browse the repository at this point in the history
  • Loading branch information
Aloys Zhang committed Jul 4, 2024
1 parent 8f75bf6 commit 518e876
Show file tree
Hide file tree
Showing 17 changed files with 54 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class ScheduleEntity implements Serializable {
private String inlongGroupId;
// schedule type, support [normal, crontab], 0 for normal and 1 for crontab
private Integer scheduleType;
// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
private String scheduleUnit;
private Integer scheduleInterval;
// schedule start time, long type timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ public abstract class InlongGroupInfo extends BaseInlongGroup {
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;

Expand All @@ -165,8 +165,8 @@ public abstract class InlongGroupInfo extends BaseInlongGroup {
@ApiModelProperty("Schedule task parallelism")
private Integer taskParallelism;

@ApiModelProperty("Schedule task parallelism")
private Integer crontabExpression;
@ApiModelProperty("Cron expression")
private String crontabExpression;

public abstract InlongGroupRequest genRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;

Expand All @@ -158,7 +158,7 @@ public abstract class InlongGroupRequest extends BaseInlongGroup {
@ApiModelProperty("Schedule task parallelism")
private Integer taskParallelism;

@ApiModelProperty("Schedule task parallelism")
private Integer crontabExpression;
@ApiModelProperty("Cron expression")
private String crontabExpression;

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class ScheduleInfo {
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class ScheduleInfoRequest {
@ApiModelProperty("Schedule type")
private Integer scheduleType;

// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
// time unit for offline task schedule interval, support [month, week, day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
private String scheduleUnit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum ScheduleUnit {
HOUR("H"),
MINUTE("I"),
SECOND("S"),
ONE_WAY("O");
ONE_ROUND("O");

final String unit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public class QuartzScheduleEngine implements ScheduleEngine {
@Value("${server.port:8083}")
private int port;

@Value("${inlong.inner.secrete.id:admin}")
private String secretId;
@Value("${default.admin.user:admin}")
private String username;

@Value("${inlong.inner.secrete.key:87haw3VYTPqK5fK0}")
private String secretKey;
@Value("${default.admin.password:inlong}")
private String password;

private final Scheduler scheduler;
private final Set<String> scheduledJobSet = new HashSet<>();
Expand All @@ -83,7 +83,7 @@ public void start() {
scheduler.getListenerManager().addSchedulerListener(new QuartzSchedulerListener(this));
scheduler.start();
LOGGER.info("Quartz scheduler engine started, inlong manager host {}, port {}, secretId {}",
host, port, secretId);
host, port, username);
} catch (SchedulerException e) {
throw new QuartzScheduleException("Failed to start quartz scheduler ", e);
}
Expand Down Expand Up @@ -112,7 +112,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends Job> cl
if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
throw new QuartzScheduleException("Group " + scheduleInfo.getInlongGroupId() + " is already registered");
}
JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz, host, port, secretId, secretKey);
JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz, host, port, username, password);
Trigger trigger = genQuartzTrigger(jobDetail, scheduleInfo);
try {
scheduler.scheduleJob(jobDetail, trigger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static Trigger genQuartzTrigger(JobDetail jobDetail, ScheduleInfo schedul
}
}

// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
public static ScheduleBuilder<SimpleTrigger> genSimpleQuartzScheduleBuilder(int interval, String scheduleUnit) {
if (StringUtils.isBlank(scheduleUnit)) {
throw new QuartzScheduleException("Schedule unit cannot be empty");
Expand Down Expand Up @@ -143,11 +143,11 @@ public static ScheduleBuilder<SimpleTrigger> genSimpleQuartzScheduleBuilder(int
.simpleSchedule()
.withIntervalInSeconds(interval)
.repeatForever();
case ONE_WAY:
case ONE_ROUND:
return SimpleScheduleBuilder
.simpleSchedule()
.withIntervalInSeconds(interval)
.withRepeatCount(1);
.withRepeatCount(0);
default:
throw new QuartzScheduleException("Not supported schedule interval" + scheduleUnit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected long calculateScheduleTimes(ScheduleInfo scheduleInfo, boolean isCron)
return timeSpanInMs / 1000 / 60 / interval;
case SECOND:
return timeSpanInMs / 1000 / interval;
case ONE_WAY:
case ONE_ROUND:
return 1;
default:
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class MockJob implements Job {
public class MockQuartzJob implements Job {

private static final Logger LOGGER = LoggerFactory.getLogger(MockJob.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MockQuartzJob.class);

public static CountDownLatch countDownLatch;
private static AtomicInteger counter = new AtomicInteger(0);
Expand All @@ -38,7 +38,9 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
if (countDownLatch.getCount() > 0) {
countDownLatch.countDown();
}
LOGGER.info("MockJob executed " + counter.incrementAndGet());
LOGGER.info("MockJob executed {} times ", counter.incrementAndGet());
LOGGER.info("Fire time: {}, previous fire time: {} next fire time: {}",
context.getScheduledFireTime(), context.getPreviousFireTime(), context.getNextFireTime());
}

public static void setCount(int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ private void testRegister(ScheduleInfo scheduleInfo, boolean isCrontab) throws E
// cal total schedule times
long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
// set countdown latch
MockJob.setCount((int) expectCount);
MockQuartzJob.setCount((int) expectCount);
// register schedule info
scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
scheduleEngine.handleRegister(scheduleInfo, MockQuartzJob.class);
// check job exist
assertEquals(1, scheduleEngine.getScheduledJobSet().size());
JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
assertTrue(exist);
MockJob.countDownLatch.await();
MockQuartzJob.countDownLatch.await();

// not job exist after scheduled
await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
Expand All @@ -92,15 +92,15 @@ private void testUnRegister(ScheduleInfo scheduleInfo, boolean isCrontab) throws
// cal total schedule times
long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);

MockJob.setCount((int) (expectCount / 2));
MockQuartzJob.setCount((int) (expectCount / 2));
// register schedule info
scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
scheduleEngine.handleRegister(scheduleInfo, MockQuartzJob.class);
// check job exist
assertEquals(1, scheduleEngine.getScheduledJobSet().size());
JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
assertTrue(exist);
MockJob.countDownLatch.await();
MockQuartzJob.countDownLatch.await();

// un-register before trigger finalized
scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId());
Expand Down Expand Up @@ -130,27 +130,27 @@ public void testUpdate(ScheduleInfo scheduleInfo, ScheduleInfo scheduleInfoToUpd
throws Exception {
// cal total schedule times
long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab);
MockJob.setCount((int) (expectCount / 2));
MockQuartzJob.setCount((int) (expectCount / 2));
// register schedule info
scheduleEngine.handleRegister(scheduleInfo, MockJob.class);
scheduleEngine.handleRegister(scheduleInfo, MockQuartzJob.class);
// check job exist
assertEquals(1, scheduleEngine.getScheduledJobSet().size());
JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId());
boolean exist = scheduleEngine.getScheduler().checkExists(jobKey);
assertTrue(exist);
MockJob.countDownLatch.await();
MockQuartzJob.countDownLatch.await();

// update schedule before trigger finalized
expectCount = calculateScheduleTimes(scheduleInfoToUpdate, isCrontab);
MockJob.setCount((int) expectCount);
scheduleEngine.handleUpdate(scheduleInfoToUpdate, MockJob.class);
MockQuartzJob.setCount((int) expectCount);
scheduleEngine.handleUpdate(scheduleInfoToUpdate, MockQuartzJob.class);

// job scheduled after updated
assertEquals(1, scheduleEngine.getScheduledJobSet().size());
exist = scheduleEngine.getScheduler().checkExists(jobKey);
assertTrue(exist);

MockJob.countDownLatch.await();
MockQuartzJob.countDownLatch.await();

// not job exist after scheduled
await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.schedule.BaseScheduleTest;
import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
import org.apache.inlong.manager.schedule.quartz.MockJob;
import org.apache.inlong.manager.schedule.quartz.MockQuartzJob;

import org.junit.jupiter.api.Test;
import org.quartz.CronScheduleBuilder;
Expand All @@ -39,7 +39,7 @@
import static org.apache.inlong.manager.schedule.ScheduleUnit.HOUR;
import static org.apache.inlong.manager.schedule.ScheduleUnit.MINUTE;
import static org.apache.inlong.manager.schedule.ScheduleUnit.MONTH;
import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_WAY;
import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_ROUND;
import static org.apache.inlong.manager.schedule.ScheduleUnit.WEEK;
import static org.apache.inlong.manager.schedule.ScheduleUnit.YEAR;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testGenScheduleBuilder() {
assertNotNull(builder);
assertInstanceOf(SimpleScheduleBuilder.class, builder);

builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, ONE_WAY.getUnit());
builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, ONE_ROUND.getUnit());
assertNotNull(builder);
assertInstanceOf(SimpleScheduleBuilder.class, builder);

Expand All @@ -102,7 +102,8 @@ public void testGenScheduleBuilder() {
@Test
public void testGenJobDetail() {
ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockJob.class, null, null, null, null);
JobDetail jobDetail =
ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockQuartzJob.class, null, null, null, null);
assertNotNull(jobDetail);

JobKey jobKey = jobDetail.getKey();
Expand All @@ -116,7 +117,8 @@ public void testGenJobDetail() {
public void testGenCronTrigger() {
// normal
ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockJob.class, null, null, null, null);
JobDetail jobDetail =
ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockQuartzJob.class, null, null, null, null);

Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo);
assertNotNull(trigger);
Expand All @@ -139,7 +141,7 @@ public void testGenCronTrigger() {

// cron
scheduleInfo = genDefaultCronScheduleInfo();
jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockJob.class, null, null, null, null);
jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockQuartzJob.class, null, null, null, null);

trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo);
assertNotNull(trigger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public boolean accept(WorkflowContext context) {
return false;
}

log.info("add startup group listener for groupId [{}]", groupId);
log.info("add group schedule resource listener for groupId [{}]", groupId);
return InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
`schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
`schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule',
Expand Down
2 changes: 1 addition & 1 deletion inlong-manager/manager-web/sql/apache_inlong_manager.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
`schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
`schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule',
Expand Down
2 changes: 1 addition & 1 deletion inlong-manager/manager-web/sql/changes-1.13.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab',
`schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway',
`schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule',
`end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,3 @@ inlong.encrypt.key.value1="I!N@L#O$N%G^"

# clients (e.g. agent and dataproxy) must be authenticated by secretId and secretKey if turned on
openapi.auth.enabled=false

# the secreteId and secreteKey for inlong sub-system communication
# used for offline job schedule now:
# 1. when register schedule info, secreteId and secreteKey will be registered to schedule engine
# and the schedule instance will call back to submit offline job with secreteId and secreteKey
inlong.inner.secrete.id=admin
inlong.inner.secrete.key=87haw3VYTPqK5fK0

0 comments on commit 518e876

Please sign in to comment.