Skip to content

Commit

Permalink
Merge pull request cockroachdb#12240 from knz/fix-using-join
Browse files Browse the repository at this point in the history
Miscellaneous JOIN improvements.
  • Loading branch information
knz authored Dec 10, 2016
2 parents 87064e4 + 95ae758 commit df70a9d
Show file tree
Hide file tree
Showing 5 changed files with 535 additions and 781 deletions.
139 changes: 81 additions & 58 deletions pkg/sql/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,47 @@ type sourceAlias struct {
columnRange columnRange
}

func (src *dataSourceInfo) String() string {
var buf bytes.Buffer
for i := range src.sourceColumns {
if i > 0 {
buf.WriteByte('\t')
}
fmt.Fprintf(&buf, "%d", i)
}
buf.WriteString("\toutput column positions\n")
for i, c := range src.sourceColumns {
if i > 0 {
buf.WriteByte('\t')
}
if c.hidden {
buf.WriteByte('*')
}
buf.WriteString(c.Name)
}
buf.WriteString("\toutput column names\n")
for _, a := range src.sourceAliases {
for i := range src.sourceColumns {
if i > 0 {
buf.WriteByte('\t')
}
for _, j := range a.columnRange {
if i == j {
buf.WriteString("x")
break
}
}
}
if a.name == anonymousTable {
buf.WriteString("\t<anonymous table>")
} else {
fmt.Fprintf(&buf, "\t'%s'", a.name.String())
}
fmt.Fprintf(&buf, " - %q\n", a.columnRange)
}
return buf.String()
}

type sourceAliases []sourceAlias

// srcIdx looks up a source by qualified name and returns the index of the
Expand Down Expand Up @@ -622,40 +663,44 @@ func (sources multiSourceInfo) findColumn(c *parser.ColumnItem) (srcIdx int, col
c.TableName.DatabaseName = tableName.DatabaseName
}

findColHelper := func(src *dataSourceInfo, iSrc, srcIdx, colIdx int, idx int) (int, int, error) {
col := src.sourceColumns[idx]
if parser.ReNormalizeName(col.Name) == colName {
if colIdx != invalidColIdx {
return invalidSrcIdx, invalidColIdx, fmt.Errorf("column reference %q is ambiguous", c)
}
srcIdx = iSrc
colIdx = idx
}
return srcIdx, colIdx, nil
}

colIdx = invalidColIdx
for iSrc, src := range sources {
findColHelper := func(src *dataSourceInfo, iSrc, srcIdx, colIdx int, idx int) (int, int, error) {
col := src.sourceColumns[idx]
if parser.ReNormalizeName(col.Name) == colName {
if colIdx != invalidColIdx {
return invalidSrcIdx, invalidColIdx, fmt.Errorf("column reference %q is ambiguous", c)
}
srcIdx = iSrc
colIdx = idx
colRange, ok := src.sourceAliases.columnRange(tableName)
if !ok {
// The data source "src" has no column for table tableName.
// Try again with the net one.
continue
}
for _, idx := range colRange {
srcIdx, colIdx, err = findColHelper(src, iSrc, srcIdx, colIdx, idx)
if err != nil {
return srcIdx, colIdx, err
}
return srcIdx, colIdx, nil
}
}

if tableName.Table() == "" {
if colIdx == invalidColIdx && tableName.Table() == "" {
// Try harder: unqualified column names can look at all
// columns, not just columns of the anonymous table.
for iSrc, src := range sources {
for idx := 0; idx < len(src.sourceColumns); idx++ {
srcIdx, colIdx, err = findColHelper(src, iSrc, srcIdx, colIdx, idx)
if err != nil {
return srcIdx, colIdx, err
}
}
} else {
colRange, ok := src.sourceAliases.columnRange(tableName)
if !ok {
// The data source "src" has no column for table tableName.
// Try again with the net one.
continue
}
for _, idx := range colRange {
srcIdx, colIdx, err = findColHelper(src, iSrc, srcIdx, colIdx, idx)
if err != nil {
return srcIdx, colIdx, err
}
}
}
}

Expand All @@ -666,55 +711,33 @@ func (sources multiSourceInfo) findColumn(c *parser.ColumnItem) (srcIdx int, col
return srcIdx, colIdx, nil
}

// concatDataSourceInfos creates a new dataSourceInfo that represents
// the side-by-side concatenation of the two data sources described by
// its arguments.
func concatDataSourceInfos(left *dataSourceInfo, right *dataSourceInfo) *dataSourceInfo {
aliases := make(sourceAliases, 0, len(left.sourceAliases)+len(right.sourceAliases))
aliases = append(aliases, left.sourceAliases...)

nColsLeft := len(left.sourceColumns)
for _, alias := range right.sourceAliases {
newRange := make(columnRange, len(alias.columnRange))
for i, idx := range alias.columnRange {
newRange[i] = idx + nColsLeft
}
aliases = append(aliases, sourceAlias{
name: alias.name,
columnRange: newRange,
})
}

columns := make(ResultColumns, 0, len(left.sourceColumns)+len(right.sourceColumns))
columns = append(columns, left.sourceColumns...)
columns = append(columns, right.sourceColumns...)

return &dataSourceInfo{sourceColumns: columns, sourceAliases: aliases}
}

// findTableAlias returns the first table alias providing the column
// index given as argument. The index must be valid.
func (src *dataSourceInfo) findTableAlias(colIdx int) parser.TableName {
func (src *dataSourceInfo) findTableAlias(colIdx int) (parser.TableName, bool) {
for _, alias := range src.sourceAliases {
for _, idx := range alias.columnRange {
if colIdx == idx {
return alias.name
return alias.name, true
}
}
}
panic(fmt.Sprintf("no alias for position %d in %v", colIdx, src.sourceAliases))
return anonymousTable, false
}

func (src *dataSourceInfo) FormatVar(buf *bytes.Buffer, f parser.FmtFlags, colIdx int) {
if f == parser.FmtQualify {
tableAlias := src.findTableAlias(colIdx)
if tableAlias.TableName != "" {
if tableAlias.DatabaseName != "" {
parser.FormatNode(buf, f, tableAlias.DatabaseName)
tableAlias, found := src.findTableAlias(colIdx)
if found {
if tableAlias.TableName != "" {
if tableAlias.DatabaseName != "" {
parser.FormatNode(buf, f, tableAlias.DatabaseName)
buf.WriteByte('.')
}
parser.FormatNode(buf, f, tableAlias.TableName)
buf.WriteByte('.')
}
parser.FormatNode(buf, f, tableAlias.TableName)
buf.WriteByte('.')
} else {
buf.WriteString("_.")
}
}
buf.WriteString(src.sourceColumns[colIdx].Name)
Expand Down
Loading

0 comments on commit df70a9d

Please sign in to comment.