Skip to content

Commit

Permalink
[Improve] custom-code job read conf from jar support (#4055)
Browse files Browse the repository at this point in the history
* [Improve] custom-code job read conf from jar support

* [Improve] read conf bug fixed.

* [Improve] execution-runtime-mode bug fixed.
  • Loading branch information
wolfboys authored Sep 14, 2024
1 parent b9ca0d6 commit 28c9fe8
Show file tree
Hide file tree
Showing 60 changed files with 349 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,20 @@ object FileUtils {
buffer.toString()
}

@throws[IOException]
def readString(in: InputStream): String = {
require(in != null)
val scanner = new Scanner(in)
val buffer = new mutable.StringBuilder()
if (scanner.hasNextLine) {
buffer.append(scanner.nextLine())
}
while (scanner.hasNextLine) {
buffer.append("\r\n")
buffer.append(scanner.nextLine())
}
Utils.close(scanner)
buffer.toString()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ public interface SavepointService extends IService<Savepoint> {
String getSavePointPath(Application app) throws Exception;

String processPath(String path, String jobName, Long jobId);

void saveSavePoint(Savepoint savepoint);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1444,7 +1444,7 @@ public void cancel(Application appParam) throws Exception {
savepoint.setType(CheckPointType.SAVEPOINT.get());
savepoint.setCreateTime(new Date());
savepoint.setTriggerTime(triggerTime);
savepointService.save(savepoint);
savepointService.saveSavePoint(savepoint);
}

if (application.isKubernetesModeJob()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -111,15 +112,12 @@ public class SavepointServiceImpl extends ServiceImpl<SavepointMapper, Savepoint
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
ThreadUtils.threadFactory("streampark-flink-savepoint-trigger"));

@Autowired private SavepointMapper savepointMapper;

@Override
public void expire(Long appId) {
Savepoint savepoint = new Savepoint();
savepoint.setLatest(false);
LambdaQueryWrapper<Savepoint> queryWrapper =
new LambdaQueryWrapper<Savepoint>().eq(Savepoint::getAppId, appId);
this.update(savepoint, queryWrapper);
savepointMapper.cleanLatest(appId);
}

private void expire(Savepoint entity) {
Expand Down Expand Up @@ -226,12 +224,13 @@ public Savepoint getLatest(Long id) {
LambdaQueryWrapper<Savepoint> queryWrapper =
new LambdaQueryWrapper<Savepoint>()
.eq(Savepoint::getAppId, id)
.eq(Savepoint::getLatest, true);
Savepoint savepoint = this.baseMapper.selectOne(queryWrapper);
if (savepoint == null) {
savepoint = this.baseMapper.findLatestByTime(id);
.eq(Savepoint::getLatest, true)
.orderByDesc(Savepoint::getCreateTime);
List<Savepoint> savepointList = this.baseMapper.selectList(queryWrapper);
if (!savepointList.isEmpty()) {
return savepointList.get(0);
}
return savepoint;
return this.baseMapper.findLatestByTime(id);
}

@Override
Expand Down Expand Up @@ -315,11 +314,10 @@ public String processPath(String path, String jobName, Long jobId) {
}

@Override
public boolean save(Savepoint savepoint) {
public void saveSavePoint(Savepoint savepoint) {
this.expire(savepoint);
this.expire(savepoint.getAppId());
this.cleanLatest(savepoint.getAppId());
return super.save(savepoint);
super.save(savepoint);
}

private void cleanLatest(Long appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) {
savepoint.setPath(checkPoint.getExternalPath());
savepoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
savepoint.setCreateTime(new Date());
savepointService.save(savepoint);
savepointService.saveSavePoint(savepoint);
}

public static class Counter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,16 @@ span {
.ant-modal-content,
.ant-tree-checkbox-inner,
.ant-table,
.ant-card,
.ant-alert,
.bold-tag,
.bold-tag > .ant-tag,
.ant-tabs-tab,
.ant-btn-group,
textarea.ant-input,
.ant-select-selector,
.streampark-page-wrapper,
.streampark-page-wrapper-content,
.ant-upload.ant-upload-drag {
border-radius: 0 !important;
border-radius: 1px !important;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@
padding: 16px;
overflow: auto;
line-height: 1.45;
border-radius: 6px;
border-radius: 2px;
}
}

.swal2-container,
.swal2-popup,
.swal2-confirm,
.swal2-styled {
border-radius: 1px !important;
}

[data-theme="dark"] {

.swal2-popup.swal2-toast {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
@multiple-height: 30px;

// headers
@header-height: 50px;
@header-height: 64px;

// logo width
@logo-width: 48px;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import { useDesign } from '/@/hooks/web/useDesign';
import { useLayoutHeight } from '../content/useContentViewHeight';
const HEADER_HEIGHT = 48;
const HEADER_HEIGHT = 64;
const TABS_HEIGHT = 32;
export default defineComponent({
name: 'LayoutMultipleHeader',
components: { LayoutHeader, MultipleTabs },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,13 @@ export const useFlinkApplication = (openStartModal: Fn) => {
],
content: () => {
return (
<Form class="!pt-40px">
<Form
class="!pt-40px"
layout='vertical'
baseColProps = {{ span: 20, offset: 2 }}
>
<Form.Item
label="Job Name"
labelCol={{ lg: { span: 5 }, sm: { span: 5 } }}
wrapperCol={{ lg: { span: 18 }, sm: { span: 18 } }}
validateStatus={unref(validateStatus)}
help={help}
rules={[{ required: true }]}
Expand Down Expand Up @@ -319,8 +321,8 @@ export const useFlinkApplication = (openStartModal: Fn) => {
class="!pt-40px"
ref={mappingRef}
name="mappingForm"
labelCol={{ lg: { span: 5 }, sm: { span: 5 } }}
wrapperCol={{ lg: { span: 18 }, sm: { span: 18 } }}
baseColProps = {{ span: 20, offset: 2 }}
layout='vertical'
v-model:model={formValue}
>
<Form.Item label="Job Name">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.streampark.common.util.{Logger, Utils}
import org.apache.streampark.flink.client.`trait`.KubernetesNativeClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.client.tool.FlinkSessionSubmitHelper
import org.apache.streampark.flink.core.FlinkKubernetesClient
import org.apache.streampark.flink.deployment.FlinkKubernetesClient
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.model.ClusterKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.streampark.flink.client.impl

import org.apache.streampark.flink.client.`trait`.YarnClientTrait
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.YarnClusterDescriptorWrapper
import org.apache.streampark.flink.deployment.YarnClusterDescriptorWrapper
import org.apache.streampark.flink.util.FlinkUtils

import org.apache.flink.client.program.PackagedProgram
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package org.apache.streampark.flink.client.`trait`

import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode}
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode}
import org.apache.streampark.common.util.{DeflaterUtils, Logger, PropertiesUtils, Utils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
import org.apache.streampark.flink.deployment.FlinkClusterClient

import com.google.common.collect.Lists
import org.apache.commons.cli.{CommandLine, Options}
Expand All @@ -36,7 +35,6 @@ import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.{ClusterClient, PackagedProgram, PackagedProgramUtils}
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions}
import org.apache.flink.util.FlinkException
import org.apache.flink.util.Preconditions.checkNotNull

import java.io.File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.flink.core.{FlinkTableInitializer, TableContext}

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.table.api.TableConfig

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import scala.util.Try

object EnhancerImplicit {
private[flink] object EnhancerImplicit {

implicit class EnhanceParameterTool(parameterTool: ParameterTool) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.core.conf
package org.apache.streampark.flink.core

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration

case class FlinkConfiguration(
private[flink] case class FlinkConfiguration(
parameter: ParameterTool,
envConfig: Configuration,
tableConfig: Configuration)
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable
import scala.util.Try

object FlinkSqlExecutor extends Logger {
private[flink] object FlinkSqlExecutor extends Logger {

private[this] val lock = new ReentrantReadWriteLock().writeLock

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories

import scala.util.{Failure, Try}

object FlinkSqlValidator extends Logger {
private[flink] object FlinkSqlValidator extends Logger {

private[this] val FLINK112_CALCITE_PARSER_CLASS =
"org.apache.flink.table.planner.calcite.CalciteParser"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.streampark.flink.core

import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.flink.core.EnhancerImplicit._

import EnhancerImplicit._
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.common.{JobExecutionResult, RuntimeExecutionMode}
import org.apache.flink.api.common.cache.DistributedCache
Expand Down
Loading

0 comments on commit 28c9fe8

Please sign in to comment.