Using the distributed cache
The distributed cache is simply Hadoop's mechanism for giving each node local access to a specific file. In this example, I am mapping IP addresses to geographical locations (country, city, etc.). The heavy lifting is done by MaxMind's GeoIP LookupService. LookupService requires random access to a local file, GeoLiteCity.dat, which defines a mapping from IP ranges to a location object.
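To see what LookupService does in isolation, here is a minimal standalone sketch of the legacy MaxMind GeoIP Java API outside of Hadoop; the local path and the example IP address are illustrative choices, not from the original setup:

import com.maxmind.geoip.LookupService

// Open the database with the in-memory cache option and look up a single IP.
val ls = new LookupService("/tmp/GeoLiteCity.dat", LookupService.GEOIP_MEMORY_CACHE)
val location = ls.getLocation("8.8.8.8")
println(location.countryName + " / " + location.city)
ls.close()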
The DistributedCacheFile object provides a simple API for registering files to be copied into the file cache at configuration time and for accessing them later at map/reduce time.
First, we need to place the file in question on the Hadoop file system. I used s3distcp to put a copy of GeoLiteCity.dat at hdfs://cache/GeoLiteCity.dat. If your copy is not on S3, you can use the normal distcp command.
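If you would rather do the copy programmatically than shell out to s3distcp or distcp, a minimal sketch using the standard Hadoop FileSystem API looks like this (the local source path and the exact HDFS destination are illustrative assumptions):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Copy a local GeoLiteCity.dat onto HDFS so it can later be registered in the distributed cache.
val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs:///"), conf)
fs.copyFromLocalFile(new Path("/tmp/GeoLiteCity.dat"), new Path("/cache/GeoLiteCity.dat"))
fs.close()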
So now GeoLiteCity.dat is available on HDFS. Let's turn to the Scalding code.
import com.twitter.scalding.filecache.DistributedCacheFile
val geoIPDataFile = DistributedCacheFile("hdfs://path/to/maxmind.dat")
This handles registering the file in the DistributedCache for us at configuration time, and it works in both local and HDFS modes (in local mode, it simply uses the local path and creates no symlinks).
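In practice this call usually lives inside a Job, where Scalding's implicit Mode is in scope, registered as a field so it is set up when the job is configured. A hedged sketch, with the job name and HDFS path as illustrative assumptions:

import com.twitter.scalding._
import com.twitter.scalding.filecache.DistributedCacheFile

class GeoIPJob(args: Args) extends Job(args) {
  // Registered at configuration time; geoIPDataFile.path resolves to the
  // symlinked local copy at map/reduce time (or the plain local path in local mode).
  val geoIPDataFile = DistributedCacheFile("hdfs://cache/GeoLiteCity.dat")
}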
Next, we need to be very careful about how we instantiate LookupService. LookupService is not serializable, so we cannot hold a live reference to it when the job gets serialized and shipped to the nodes. We also don't want to initialize it more than once per task attempt.
@transient private lazy val lookupService =
  new LookupService(geoIPDataFile.path, LookupService.GEOIP_MEMORY_CACHE)
By making lookupService transient, we avoid serializing it, but that means it will be null when the job is received by the nodes. Making it lazy ensures that each node initializes it on first use. We use the auto-generated symlink path created by DistributedCacheFile; this does not require the JobConf, which will not be available when lookupService is lazily evaluated.
Lastly, we write a function for our mappers to use when doing the lookup. It is included here for completeness:
def getLocationInfo(ip: String): (String, String, String, String, String, Int) = {
  val unknown = ("", "", "", "", "", 0)
  if (ip == null || ip == "")
    return unknown
  val location = lookupService.getLocation(ip)
  if (location == null)
    return unknown
  (location.countryName,
   location.countryCode,
   location.region,
   location.city,
   location.postalCode,
   location.metro_code)
}
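Putting it all together, a mapper can simply call getLocationInfo. Here is a hedged sketch of wiring it into a typed pipe inside the job sketched above; the input/output arguments and the assumption of one IP address per input line are illustrative:

// Inside the Job: read one IP per line, look each one up, and write the
// resulting location tuple out as TSV.
TypedPipe.from(TextLine(args("input")))
  .map { ip => getLocationInfo(ip) }
  .write(TypedTsv[(String, String, String, String, String, Int)](args("output")))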