Closest Airport
Nearest Neighbor
{
"namespace": "com.objectpartners.buesing.avro",
"name": "Aircraft",
"type": "record",
"fields": [
{
"name": "transponder",
"type": "string"
},
{
"name": "callsign",
"type": "string"
}
]
}
{
"namespace": "com.objectpartners.buesing.avro",
"name": "Location",
"type": "record",
"fields": [
{
"name": "latitude",
"type": "double"
},
{
"name": "longitude",
"type": "double"
}
]
}
{
"namespace": "com.objectpartners.buesing.avro",
"name": "Record",
"type": "record",
"fields": [
{
"name": "origin",
"type": ["null", "string"],
"default": null
},
{
"name": "aircraft",
"type": "com.objectpartners.buesing.avro.Aircraft"
},
{
"name": "location",
"type": "com.objectpartners.buesing.avro.Location"
},
{
"name": "altitude",
"type": ["null", "double"],
"default": null
},
{
"name": "onGround",
"type": "boolean"
},
{
"name": "velocity",
"type": ["null", "double"],
"default": null
},
{
"name": "verticalRate",
"type": ["null", "double"],
"default": null
}
]
}
{
"namespace": "com.objectpartners.buesing.avro",
"name": "Distance",
"type": "record",
"fields": [
{
"name": "red",
"type": "com.objectpartners.buesing.avro.Record"
},
{
"name": "blue",
"type": "com.objectpartners.buesing.avro.Record"
},
{
"name": "distance",
"type": "double"
}
]
}
{
"namespace": "com.objectpartners.buesing.avro",
"name": "NearestAirport",
"type": "record",
"fields": [
{
"name": "airport",
"type": "string"
},
{
"name": "latitude",
"type": "double"
},
{
"name": "longitude",
"type": "double"
}
]
}
{
"namespace": "com.objectpartners.buesing.avro",
"name": "Count",
"type": "record",
"fields": [
{
"name": "label",
"type": "string"
},
{
"name": "value",
"type": "int"
}
]
}
@Bean
public KStream<String, Record> red() {
return streamsBuilder
.stream("red",
Consumed.with(Serdes.String(), recordSerde));
}
@Bean
public KStream<String, Record> blue() {
return streamsBuilder
.stream("blue",
Consumed.with(Serdes.String(), recordSerde));
}
@Bean
public KStream<String, NearestAirport> nearestAirport() {
return red().map((key, value) -> {
String airport = geolocation.closestAirport(
value.getLocation().getLatitude(),
value.getLocation().getLongitude()
).getCode();
return KeyValue.pair(
key,
new NearestAirport(
airport,
value.getLocation().getLatitude(),
value.getLocation().getLongitude()
)
);
}).through(
"red.nearest.airport",
Produced.with(Serdes.String(), nearestAirportSerde)
);
}
@Bean
public KStream<String, Count> nearestAirportCount() {
KStream<String, Count> bean = nearestAirport()
.selectKey((key, value) -> value.getAirport())
.groupByKey()
.windowedBy(TimeWindows.of(WINDOW))
.aggregate(
() -> 0,
(key, value, aggregate) -> aggregate + 1,
Materialized.with(Serdes.String(), Serdes.Integer())
).toStream((wk, v) -> wk.key())
.mapValues(Count::new)
.through("red.nearest.airport.count", Produced.with(Serdes.String(), countSerde)
);
return bean;
}
create stream red with (kafka_topic='red', value_format='avro');
@Bean
public KStream<String, Record> red() {
return streamsBuilder
.stream("red",
Consumed.with(Serdes.String(), recordSerde));
}
create stream blue with (kafka_topic='blue', value_format='avro');
@Bean
public KStream<String, Record> blue() {
return streamsBuilder
.stream("blue",
Consumed.with(Serdes.String(), recordSerde));
}
create stream \
ksql_red_nearest_airport with (PARTITIONS=8) \
as select \
aircraft->transponder transponder, \
closestAirport(location->latitude, location->longitude) as airport, \
location \
from red \
partition by transponder;
@Bean
public KStream<String, NearestAirport> nearestAirport() {
return red().map((key, value) -> {
String airport = geolocation.closestAirport(
value.getLocation().getLatitude(),
value.getLocation().getLongitude()
).getCode();
return KeyValue.pair(
key,
new NearestAirport(
airport,
value.getLocation().getLatitude(),
value.getLocation().getLongitude()
)
);
}).through(
"red.nearest.airport",
Produced.with(Serdes.String(), nearestAirportSerde)
);
}
create table \
ksql_red_nearest_airport_count with (PARTITIONS=8) \
as select \
airport, \
count(*) as count \
from ksql_red_nearest_airport window tumbling (size 4 hours) \
group by airport;
@Bean
public KStream<String, Count> nearestAirportCount() {
KStream<String, Count> bean = nearestAirport()
.selectKey((key, value) -> value.getAirport())
.groupByKey()
.windowedBy(TimeWindows.of(WINDOW))
.aggregate(
() -> 0,
(key, value, aggregate) -> aggregate + 1,
Materialized.with(Serdes.String(), Serdes.Integer())
).toStream((wk, v) -> wk.key())
.mapValues(Count::new)
.through("red.nearest.airport.count", Produced.with(Serdes.String(), countSerde)
);
return bean;
}
SET 'auto.offset.reset' = 'earliest';
create stream red with (kafka_topic='red', value_format='avro');
create stream blue with (kafka_topic='blue', value_format='avro');
create stream \
ksql_red_nearest_airport with (PARTITIONS=8) \
as select \
aircraft->transponder transponder, \
closestAirport(location->latitude, location->longitude) as airport, \
location \
from red \
partition by transponder;
create table \
ksql_red_nearest_airport_count with (PARTITIONS=8) \
as select \
airport, \
count(*) as count \
from ksql_red_nearest_airport window tumbling (size 4 hours) \
group by airport;
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [red])
--> KSTREAM-MAP-0000000016, KSTREAM-MAP-0000000002
Processor: KSTREAM-MAP-0000000016 (stores: [])
--> KSTREAM-KEY-SELECT-0000000017
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000017 (stores: [])
--> KSTREAM-FILTER-0000000021
<-- KSTREAM-MAP-0000000016
Processor: KSTREAM-FILTER-0000000021 (stores: [])
--> KSTREAM-SINK-0000000020
<-- KSTREAM-KEY-SELECT-0000000017
Processor: KSTREAM-MAP-0000000002 (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-SOURCE-0000000000
Sink: KSTREAM-SINK-0000000003 (topic: red.nearest.airport)
<-- KSTREAM-MAP-0000000002
Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-KEY-SELECT-0000000017-repartition)
<-- KSTREAM-FILTER-0000000021
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [red])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> KSTREAM-TRANSFORMVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-TRANSFORMVALUES-0000000002 (stores: [])
--> KSTREAM-MAPVALUES-0000000003
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
--> KSTREAM-FILTER-0000000004
<-- KSTREAM-TRANSFORMVALUES-0000000002
Processor: KSTREAM-FILTER-0000000004 (stores: [])
--> KSTREAM-KEY-SELECT-0000000005
<-- KSTREAM-MAPVALUES-0000000003
Processor: KSTREAM-KEY-SELECT-0000000005 (stores: [])
--> KSTREAM-MAPVALUES-0000000006
<-- KSTREAM-FILTER-0000000004
Processor: KSTREAM-MAPVALUES-0000000006 (stores: [])
--> KSTREAM-MAPVALUES-0000000007
<-- KSTREAM-KEY-SELECT-0000000005
Processor: KSTREAM-MAPVALUES-0000000007 (stores: [])
--> KSTREAM-SINK-0000000008
<-- KSTREAM-MAPVALUES-0000000006
Sink: KSTREAM-SINK-0000000008 (topic: KSQL_RED_NEAREST_AIRPORT)
<-- KSTREAM-MAPVALUES-0000000007
->
select aircraft->transponder, location from red;
select location->latitude, location->longitude from blue;
select * from blue where aircraft->callsign='DAL665 ';
@UdfDescription(name = "closestAirport", description = "return airport code for closest airport.")
public class ClosestAirport {
private static final String HOST = "http://localhost:9080";
private Geolocation geolocation = Feign.builder()
.options(new Request.Options(200, 200))
.encoder(new JacksonEncoder())
.decoder(new JacksonDecoder())
.target(Geolocation.class, HOST);
@Udf(description = "find closest airport to given location.")
public String closestAirport(final Double latitude, final Double longitude) {
return geolocation.closestAirport(latitude, longitude).getCode();
}
}
create stream blueBucket \
as select \
bucketLocation(location->latitude, location->longitude, 3.0) as bucket, \
aircraft->transponder as transponder, aircraft->callsign as callsign, \
location->latitude as latitude, location->longitude as longitude \
from blue partition by bucket;
create stream blueBucket_w \
as select \
bucketLocation(location->latitude, location->longitude, 3.0, 'w') as bucket, \
aircraft->transponder as transponder, aircraft->callsign as callsign, \
location->latitude as latitude, location->longitude as longitude \
from blue partition by bucket;
insert into blueBucket select * from blueBucket_w;
create stream redBlueJoin with (PARTITIONS=8) \
as select \
redBucket.transponder rt, \
redBucket.latitude as rlat, \
redBucket.longitude as rlong, \
blueBucket.transponder as bt, \
blueBucket.latitude as blat, \
blueBucket.longitude as blong \
from redBucket \
inner join blueBucket within 4 hours \
on redBucket.bucket = blueBucket.bucket;
SET 'auto.offset.reset' = 'earliest';
Could not find
class io.confluent.ksql.avro_schemas.KsqlDataSourceSchema