Real-time customer profiles¶
This example uses Quick to create real-time customer profiles for a music streaming service. These profiles will include user metrics, charts of the most-streamed albums, artists and tracks, and recommendations based on the user's playlist.
What this will demonstrate¶
- the use of topics, of course
- analytics on an incoming stream
- integration of a recommendation service
- a global GraphQL schema forming the customer profile
Visit the demo website to see the example up and running. This visualizes the real-time profiles in a front-end.
The code can be found in Quick's example repository. The example uses the real world data set LFM-1b. The Kafka Streams application is written based on our open source streams-bootstrap library.
Finally, there is a video explaining this example:
Input: Listening events¶
Every time a customer listens to a song, the system emits a listening event containing the ids of album, artist and track. The system additionally attaches metadata such as the timestamp to the event. Later, a Kafka Streams application processes it for the customer profile creation.
{"userId": 402, "artistId": 7, "albumId": 17147, "trackId": 44975, "timestamp": 1568052379}
{"userId": 703, "artistId": 64, "albumId": 17148, "trackId": 44982, "timestamp": 1568052379}
{"userId": 4234, "artistId": 3744, "albumId": 34424, "trackId": 105501, "timestamp": 1568052382}
{"userId": 2843, "artistId": 71, "albumId": 315, "trackId": 2425, "timestamp": 1568052383}
{"userId": 1335, "artistId": 13866, "albumId": 29007, "trackId": 83201, "timestamp": 1568052385}
Quick configuration¶
Global GraphQL schema¶
First, define the global schema with GraphQL.
The query called getUserProfile
combines six metrics for the customer profile:
- total listening events
- the first and last time a user listened to a song
- charts with user's most listened albums, artists and tracks
Quick retrieves all that data from different topics via the @topic
directive.
Still, the charts contain solely ids and not the names of the corresponding music data.
You can let Quick resolve those ids transparently.
For that, use the topics (artists, albums, tracks) containing the mapping from ids to names and reference them in the GraphQL schema.
The creation of the metrics topics (counts, firstlisten, lastlisten) is described below.
The GraphQL user profile schema (schema-user-profile.gql
)
type Query {
getUserProfile(userId: Long!): UserProfile
}
type UserProfile {
totalListenCount: Long! @topic(name: "counts", keyArgument: "userId")
firstListenEvent: Long! @topic(name: "firstlisten", keyArgument: "userId")
lastListenEvent: Long! @topic(name: "lastlisten", keyArgument: "userId")
artistCharts: NamedArtistCharts! @topic(name: "topartists", keyArgument: "userId")
albumCharts: NamedAlbumCharts! @topic(name: "topalbums", keyArgument: "userId")
trackCharts: NamedTrackCharts! @topic(name: "toptracks", keyArgument: "userId")
}
type NamedArtistCharts {
topK: [NamedArtistCount!]!
}
type NamedAlbumCharts {
topK: [NamedAlbumCount!]!
}
type NamedTrackCharts {
topK: [NamedTrackCount!]!
}
type Item {
id: Long!
name: String!
}
type NamedArtistCount {
id: Long!
artist: Item! @topic(name: "artists", keyField: "id")
countPlays: Long!
}
type NamedAlbumCount {
id: Long!
album: Item! @topic(name: "albums", keyField: "id")
countPlays: Long!
}
type NamedTrackCount {
id: Long!
track: Item! @topic(name: "tracks", keyField: "id")
countPlays: Long!
}
...
This is all you need to do to integrate data with Quick.
For the Quick setup, please refer to the getting started guide. To avoid redundancy, we only show the setup for integral parts here. You find all steps in the justfile.
Gateway¶
Create a new gateway and apply the GraphQL schema.
quick gateway create profiles
quick gateway apply profiles -f schema-user-profile.gql
Topics¶
Then, create the input topics for the artist, album and track data.
quick topic create albums --key long --value schema -s profiles.Item
quick topic create artists --key long --value schema -s profiles.Item
quick topic create tracks --key long --value schema -s profiles.Item
quick topic create listeningevents --key long --value schema \
--schema profiles.ListeningEvent
The command expects the topic name and the type or schema of key and value.
Since the topics contain complex values, they are referenced via <gateway name>.<type name>
.
This uses the definition in the global GraphQL schema you previously applied to the gateway.
Analytics¶
With gateway and input topics in place, you can now take care of the analytics. Kafka Streams apps will process the data and compute the respective parts of the profiles.
Metrics¶
The user profile has the following metrics:
- first listening event
- last listening event
-
total number of listening events
-
Create topics that later store the corresponding data:
quick topic create firstlisten --key long --value long quick topic create lastlisten --key long --value long quick topic create counts --key long --value long
-
Deploy the applications:
quick app deploy firstlisten \ --registry bakdata \ --image quick-demo-profile-listenings-activity \ --tag "1.0.0" \ --args input-topics=listeningevents, output-topic=firstlisten, kind=FIRST, productive=false
Quick supports running dockerized applications.
You can deploy those applications with the command quick app deploy [...]
.
For details, call quick app deploy -h
or see the reference.
The bakdata image registry can be found here.
Charts¶
Similar to the metrics, you can add support for a user's charts in the profile.
-
Create the topics:
quick topic create topartists --key long \ --value schema -s profiles.Charts quick topic create topalbums --key long \ --value schema -s profiles.Charts quick topic create toptracks --key long \ --value schema -s profiles.Charts
-
Deploy the applications:
quick app deploy topartists \ --registry bakdata \ --image quick-demo-profile-listenings-charts \ --tag "1.0.0" \ --args input-topics=listeningevents output-topic=topartists productive=false
Recommendations¶
Finally, the recommendations are integrated into the profiles.
Therefore, add the getArtistRecommendations
query, which is backed by an external service, to the schema.
Schema extension with getArtistRecommendations
type Query {
getArtistRecommendations(
userId: Long!,
field: FieldType! = ARTIST,
limit: Int,
walks: Int,
walkLength: Int,
resetProbability: Float
): Recommendations @rest(
url: "http://recommender/recommendation",
pathParameter: ["userId", "field"],
queryParameter: ["limit", "walks", "walkLength", "resetProbability"]
)
}
enum FieldType {
ARTIST
ALBUM
TRACK
}
type Recommendations {
recommendations: [Recommendation!]!
}
type Recommendation {
id: Long!
artist: Item @topic(name: "artists", keyField: "id")
}
getArtistRecommendations
takes several parameters:
userId
is mandatory: It tells the service for which user it should compute the recommendations.field
defines which type of recommendation to create:ARTIST
,ALBUM
orTRACK
is possible. The schema sets it toARTIST
by default.- The remaining parameters come from the underlying recommendation algorithm SALSA and have sensible default values.
To leverage the external service, you can use the @rest
directive.
This directive integrates a REST services into your global schema.
In this example, the recommendation service returns the result for a particular user id as a list of ids.
Quick resolves these ids with the names from the artistis
topic in the type recommendation
.
You can deploy the recommendation service via Quick as well:
quick app deploy recommender \
--registry bakdata \
--image quick-demo-profile-recommender \
--tag "1.0.0" \
--port 8080 \
--args input-topics=listeningevents productive=false
Finally, everything is in place to query the artist recommendation.
query{
getArtistRecommendations(userId: 32226961){
recommendations{
id
name
}
}
}
{
"data": {
"getArtistRecommendations": {
"recommendations": [
{
"id": 2041,
"name": "Leevi and the Leavings"
},
{
"id": 1825,
"name": "Neil Young"
},
{
"id": 1871,
"name": "Mogwai"
},
{
"id": 2353,
"name": "The National"
},
...
]
}
}
}