Skip to content

Commit

Permalink
Merge pull request #54 from tzachshabtay/connect
Browse files Browse the repository at this point in the history
kafka-connect support
  • Loading branch information
tzachshabtay authored Apr 5, 2021
2 parents 2365bdf + e30de2d commit 3d1a6a4
Show file tree
Hide file tree
Showing 11 changed files with 428 additions and 21 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

# Krowser

Web UI to browse [kafka](https://kafka.apache.org/) and [schema registry](https://docs.confluent.io/current/schema-registry/index.html).
Web UI to browse [kafka](https://kafka.apache.org/), [schema registry](https://docs.confluent.io/current/schema-registry/index.html) and [kafka-connect](https://docs.confluent.io/platform/current/connect/).

## Features

- View kafka's topics, partitions, messages, consumer groups and brokers.
- View schema registry's subjects and schemas.
- View kafka-connect's connectors and tasks.
- The grid view destructures the messages/schemas (even nested json) into separate columns, and each of the columns can be (client-side) filtered and sorted.
- Support for avro auto-detecting and decoding (via the schema registry). Different subject messages in a topic are supported as well (and as you filter for a specific event type it auto-hides all of the irrelevant columns belonging to the other messages).
- The raw view shows the data in json format, and allows easy copying to clipboard
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"kafka-avro": "^3.2.0",
"kafkajs": "^1.15.0",
"long": "^4.0.0",
"node-fetch": "^2.6.1",
"qs": "^6.9.4",
"react": "^16.13.1",
"react-dom": "^16.13.1",
Expand Down
16 changes: 16 additions & 0 deletions src/client/common/grid.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ export const Grid: React.FunctionComponent<GridProps> = (props) => {
onFilterChanged={props.onFilterChanged}
enableCellTextSelection={true}
pagination={true}
columnTypes={
{
connectorState: { cellStyle: params => {
switch (params.value) {
case `RUNNING`:
return {color: theme === `dark` ? `lightgreen` : `darkgreen` }
case `FAILED`:
return {color: theme === `dark` ? `lightcoral` : `red`}
case `PAUSED`:
return {color: theme === `dark` ? `orange` : `darkorange`}
default:
return {}
}
}},
}
}
>
</AgGridReact>
</div>
Expand Down
15 changes: 13 additions & 2 deletions src/client/common/toolbar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,19 @@ const useStyles = makeStyles((theme) => ({
},
}));

const ThemeToggle: React.FunctionComponent = () => {
interface ThemeProps {
OnThemeChanged?: (theme: string) => void
}

const ThemeToggle: React.FunctionComponent<ThemeProps> = (props) => {
const {theme, saveTheme} = useTheme()

const handleTheme = (event: React.MouseEvent<HTMLElement>, newTheme: string) => {
if (newTheme !== null) {
saveTheme(newTheme)
if (props.OnThemeChanged) {
props.OnThemeChanged(newTheme)
}
}
};

Expand Down Expand Up @@ -108,6 +115,7 @@ interface Props {
onSearch?: React.ChangeEventHandler<HTMLTextAreaElement | HTMLInputElement>;
searchText?: string;
url: Url;
OnThemeChanged?: (theme: string) => void
}

export const KafkaToolbar: React.FunctionComponent<Props> = (props) => {
Expand Down Expand Up @@ -162,6 +170,9 @@ export const KafkaToolbar: React.FunctionComponent<Props> = (props) => {
<Link href="/schema-registry/subjects" color="inherit">
<MenuItem>Schema-Registry (subjects)</MenuItem>
</Link>
<Link href="/kafka-connect/connectors" color="inherit">
<MenuItem>Kafka-Connect (connectors)</MenuItem>
</Link>
<Divider/>
<Link href="/topics/messages" color="inherit">
<MenuItem><SearchIcon /> Search across topics</MenuItem>
Expand All @@ -175,7 +186,7 @@ export const KafkaToolbar: React.FunctionComponent<Props> = (props) => {
<Typography className={classes.title} variant="h6" noWrap>
{props.title}
</Typography>
<ThemeToggle/>
<ThemeToggle OnThemeChanged={props.OnThemeChanged}/>
{props.onSearch && (
<div className={classes.search}>
<div className={classes.searchIcon}>
Expand Down
2 changes: 1 addition & 1 deletion src/client/common/url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ export class Url {
const url = query ? `${this.BaseUrl}?${query}` : this.BaseUrl

//We're using window.history and not the router history because we don't want to navigate away, this is just for sharing url purposes.
window.history.replaceState(null, document.title, url)
window.history.replaceState(null, document.title, url || "?") //if base url was not set and they are no properties on the url it will be an empty string which seems to be ignored by replaceState so we're defaulting to "?"
}
}
158 changes: 158 additions & 0 deletions src/client/kafka-connect/connectors.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import React from "react";
import CircularProgress from '@material-ui/core/CircularProgress';
import { RouteComponentProps } from "react-router-dom";
import { KafkaToolbar} from '../common/toolbar';
import { DataView} from '../common/data_view';
import { ErrorMsg} from '../common/error_msg';
import { Url } from "../common/url";
import { ConnectorConfig, ConnectorState, GetConnectorConfigResult, GetConnectorsResult, GetConnectorStatusResult } from "../../shared/api";
import { CellButton, CellProps } from "../common/cell_button";
import { ColDef, GridApi, GridReadyEvent } from "ag-grid-community";
import { History } from 'history';

type State = {
search: string;
loading: boolean;
error?: string;
customCols: {cols: {}};
rows: Connector[];
}

type Connector = {
name: string;
state: ConnectorState | "Loading";
workerId: string | "Loading";
numTasks?: number;
type: string | "Loading";
config?: ConnectorConfig;
history: History<unknown>;
}

class ViewTasksButton extends React.Component<CellProps, {}> {
render() {
return <CellButton getUrl={() => `/kafka-connect/tasks/${this.props.data.name}`} {...this.props} />
}
}

//this is needed for ag-grid to correctly detect the field mappings (it interprets dots as deep references)
export function ReplaceDots(data: ConnectorConfig): ConnectorConfig {
const res: ConnectorConfig = {}
for (const prop in data) {
res[prop.split('.').join('->')] = data[prop]
}
return res
}

export class Connectors extends React.Component<RouteComponentProps, State> {
state: State = { loading: true, customCols: {cols: {}}, rows: [], search: "", error: "" }
gridApi: GridApi | null = null;
url: Url;

constructor(props: RouteComponentProps) {
super(props);
this.url = new Url(props.location.search, ``);
}

onGridReady = (params: GridReadyEvent) => {
this.gridApi = params.api;
}

async componentDidMount() {
const response = await fetch(`/api/kafka-connect/connectors`)
const data: GetConnectorsResult = await response.json()
if (data.error) {
this.setState({loading: false, error: data.error})
return
}
const rows: Connector[] = data.map(c => ({name: c, state: "Loading", workerId: "Loading", type: "Loading", history: this.props.history}))
const search = this.url.Get(`search`) || ``
this.setState({ loading: false, rows, search })
for (const connector of rows) {
await this.fetchConnector(connector)
}
}

async fetchConnector(connector: Connector) {
await this.fetchConnectorStatus(connector)
await this.fetchConnectorConfig(connector)
}

async fetchConnectorStatus(connector: Connector) {
const response = await fetch(`/api/kafka-connect/connector/${connector.name}/status`)
const data: GetConnectorStatusResult = await response.json()
if (data.error) {
this.setState({loading: false, error: data.error})
return
}
connector.numTasks = data.tasks.length
connector.state = data.connector.state
connector.workerId = data.connector.worker_id
connector.type = data.type
if (this.gridApi) {
this.gridApi.refreshCells()
}
this.forceUpdate();
}

async fetchConnectorConfig(connector: Connector) {
const response = await fetch(`/api/kafka-connect/connector/${connector.name}/config`)
let data: GetConnectorConfigResult = await response.json()
if (data.error) {
this.setState({loading: false, error: data.error})
return
}
data = ReplaceDots(data)
connector.config = data
this.state.customCols.cols = {...this.state.customCols.cols, ...data}
this.refreshGrid()
}

refreshGrid() {
if (this.gridApi) {
this.gridApi.refreshCells({force: true})
}
this.forceUpdate();
}

getColumnDefs() {
const cols: ColDef[] = [
{ headerName: "Name", field: "name" },
{ headerName: "State", field: "state", type: "connectorState" },
{ headerName: "Worker ID", field: "workerId" },
{ headerName: "Type", field: "type" },
{ headerName: "#Tasks", field: "numTasks", filter: "agNumberColumnFilter", cellRendererFramework: ViewTasksButton },
]
for (const prop in this.state.customCols.cols) {
const field = `config.${prop}`
const orig = prop.split('->').join('.')
const headerName = `config.${orig}`
cols.push({headerName, field})
}
return cols
}

render() {
return (
<>
<KafkaToolbar
title="Connectors"
url={this.url}
searchText={this.state.search}
onSearch={e => this.setState({ search: e.target.value })}
OnThemeChanged={ _ => this.refreshGrid()}>
</KafkaToolbar>
{this.state.loading && <><CircularProgress /><div>Loading...</div></>}
<ErrorMsg error={this.state.error} prefix="Failed to fetch connectors. Error: "></ErrorMsg>
{!this.state.loading && <DataView
searchQuery={this.state.search}
search={r => r.host.includes(this.state.search)}
rows={this.state.rows}
raw={this.state.rows}
url={this.url}
columnDefs={this.getColumnDefs()}
onGridReady={this.onGridReady}>
</DataView>}
</>
)
}
}
119 changes: 119 additions & 0 deletions src/client/kafka-connect/tasks.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import React from "react";
import CircularProgress from '@material-ui/core/CircularProgress';
import { RouteComponentProps } from "react-router-dom";
import { KafkaToolbar} from '../common/toolbar';
import { DataView} from '../common/data_view';
import { ErrorMsg} from '../common/error_msg';
import { Url } from "../common/url";
import { ConnectorConfig, ConnectorState, GetConnectorTasksResult, GetConnectorTaskStatusResult } from "../../shared/api";
import { ColDef, GridApi, GridReadyEvent } from "ag-grid-community";
import { ReplaceDots } from "./connectors";

type State = {
search: string;
loading: boolean;
error?: string;
customCols: {cols: {}};
rows: Task[];
}

type Task = {
id: number;
state: ConnectorState | "Loading";
workerId: string | "Loading";
config: ConnectorConfig;
}

export class Tasks extends React.Component<RouteComponentProps<{ connector: string }>, State> {
state: State = { loading: true, customCols: {cols: {}}, rows: [], search: "", error: "" }
gridApi: GridApi | null = null;
url: Url;

constructor(props: RouteComponentProps<{ connector: string }>) {
super(props);
this.url = new Url(props.location.search, ``);
}

onGridReady = (params: GridReadyEvent) => {
this.gridApi = params.api;
}

async componentDidMount() {
const response = await fetch(`/api/kafka-connect/connector/${this.props.match.params.connector}/tasks`)
const data: GetConnectorTasksResult = await response.json()
if (data.error) {
this.setState({loading: false, error: data.error})
return
}
const rows: Task[] = []
const search = this.url.Get(`search`) || ``
for (const c of data) {
const config = ReplaceDots(c.config)
rows.push({id: c.id.task, state: "Loading", workerId: "Loading", config })
this.state.customCols.cols = {...this.state.customCols.cols, ...config}
}
this.setState({ loading: false, rows, search })
for (const c of rows) {
await this.fetchTaskStatus(c)
}
}

async fetchTaskStatus(task: Task) {
const response = await fetch(`/api/kafka-connect/connector/${this.props.match.params.connector}/tasks/${task.id}/status`)
const data: GetConnectorTaskStatusResult = await response.json()
if (data.error) {
this.setState({loading: false, error: data.error})
return
}
task.state = data.state
task.workerId = data.worker_id
this.refreshGrid()
}

refreshGrid() {
if (this.gridApi) {
this.gridApi.refreshCells()
}
this.forceUpdate();
}

getColumnDefs() {
const cols: ColDef[] = [
{ headerName: "ID", field: "id" },
{ headerName: "State", field: "state", type: "connectorState" },
{ headerName: "Worker ID", field: "workerId" },
]
for (const prop in this.state.customCols.cols) {
const field = `config.${prop}`
const orig = prop.split('->').join('.')
const headerName = `config.${orig}`
cols.push({headerName, field})
}
return cols
}

render() {
return (
<>
<KafkaToolbar
title={`Tasks for connector: ${this.props.match.params.connector}`}
url={this.url}
searchText={this.state.search}
onSearch={e => this.setState({ search: e.target.value })}
OnThemeChanged={_ => this.refreshGrid()}>
</KafkaToolbar>
{this.state.loading && <><CircularProgress /><div>Loading...</div></>}
<ErrorMsg error={this.state.error} prefix="Failed to fetch tasks. Error: "></ErrorMsg>
{!this.state.loading && <DataView
searchQuery={this.state.search}
search={r => r.host.includes(this.state.search)}
rows={this.state.rows}
raw={this.state.rows}
url={this.url}
columnDefs={this.getColumnDefs()}
onGridReady={this.onGridReady}>
</DataView>}
</>
)
}
}
Loading

0 comments on commit 3d1a6a4

Please sign in to comment.