Dr. Dobb's is part of the Informa Tech Division of Informa PLC

This site is operated by a business or businesses owned by Informa PLC and all copyright resides with them. Informa PLC's registered office is 5 Howick Place, London SW1P 1WG. Registered in England and Wales. Number 8860726.


Channels ▼
RSS

Building Grid-Enabled Data-Mining Applications


December, 2005: Building Grid-Enabled Data-Mining Applications

Alex Depoutovitch is the technical team lead and Alex Wainstein is director of technology and production at Generation5 Mathematical Technology. They can be contacted at [email protected] and [email protected], respectively.


Data mining is the process of extracting information from large volumes of data. By its very nature, it is a computationally intensive exercise. Because data volumes are doubling annually (according to a Winter Corporation survey, http://www.wintercorp.com/WhitePapers/ TTP2003_WP.pdf), it can be difficult to achieve even small performance improvements simply by tweaking known data-mining algorithms. Of course, sampling the input data can help, but you pay the price of reduced accuracy, which is unacceptable for many tasks. Increasing the power of hardware doesn't offer much help; CPU clock frequencies and hard-drive data-transfer rates both have fixed upper boundaries, and no matter how much money you spend, you cannot overcome them.

While you cannot improve the speed of a single CPU, you can increase their number. This is why multithreaded processor cores, multicore chips, and multiprocessor servers are becoming common. Although this leads to better performance, the total number of CPUs on a single server is still limited. Although today's dual-CPU server price is reasonable, the cost of the 32-CPU servers can exceed the price of a private jet airplane. Still, there are network-based alternatives which hold promise for high-end data mining, foremost among them "computing grids" that enable cooperative processing of single or multiple tasks by different computers.

The term "computing grid" designates a group of connected computers that combine resources to work together, appearing to users as a single virtual computer. The main difference between "grids" and "clusters" is that computing clusters usually contain only homogenous computers, whereas computers in a grid may have totally different hardware and software configurations.

There are two ways you can use computing grids:

  • To evenly balance incoming requests among individual computers so that each request is served by one computer.
  • To speed up the processing time of incoming requests by executing each of them in parallel on several computers in a grid.

Computing grids have centralized or decentralized scheduling. With centralized scheduling, a dedicated computer distributes requests and collects results. Decentralized scheduling, on the other hand, involves a cooperative process among computers in a grid. Decentralized scheduling provides better availability and fault tolerance, but produces more network traffic, is less flexible, and more difficult to implement.

G5 MWM Grid from Generation5 is a data analytics application that lets you build and manage computing grids specifically for data-mining tasks. G5 MWM Grid achieves considerable performance improvements by executing various data-mining tasks on a computing grid in multiuser environment.

G5 MWM Grid, which uses centralized scheduling, consists of the components in Figure 1. The Central node is responsible for the scheduling and coordination of processes running on the Calculation nodes. There is only one Central node in a grid. Users prepare "jobs" and communicate with the Central node through grid client software (Figure 2). The client software lets users monitor and control job execution progress and examine and change grid configurations. User requests are sent to the Central node, which divides them into chunks that can be executed in parallel, sending these chunks for execution to all available Calculation nodes. Calculation nodes are responsible for the execution of tasks received from Central node. They report back to the Central node about execution progress and completion. Exchanging large chunks of data among the Central node, Calculation nodes, and client software requires shared data storage, such as a database server or shared filesystem.

Parallelization of Algorithms

In general, designing parallel algorithms is complex and not always possible. That said, data mining is one area that often lends itself to parallelization.

The most important measure of how well a given algorithm can be parallelized is "scalability" or the dependency between performance of a grid and the number of nodes in it. While the ideal case is a straight line, in real life there is a threshold value after which adding more nodes to the grid does not improve performance (Figure 3). This threshold value differs greatly, depending on the specific algorithm.

When designing parallel algorithms, you often encounter situations where part of the calculations can be done only sequentially, and to proceed further, you need the results of the calculation. In this case, you have two options: execute this critical part on one node and make other nodes wait for its completion, or redo this part on each node. If the nonparallelizable part is small, it is a good idea to repeat it on each node to achieve better scalability.

Another question that comes up when designing parallel algorithms is how to provide effective concurrent access to shared data. If any node may update any part of the data at any time, data access must be synchronized and the scalability of algorithms is significantly affected. Even if all nodes only read data, the data-reading device becomes a bottleneck sooner or later. However, there are techniques that address this problem. If you do not update the data and the time required to copy the data from the shared storage is not significant, you can create a temporary local copy of the shared data on each node on which chunks are processed. In this case, data access speed is no longer a limiting factor for scalability. Even when some data must be updated by several nodes, it often is possible to do it without synchronizing access. For example, consider a situation when you need to calculate the average value of some array using a grid of n calculation nodes. You can divide this array into n equal chunks. Each node will be given its chunk and calculate the sum of values in this chunk and report back to the Central node. After all nodes are finished, the Central node sums up all received values and divides by the array size. This way, you avoid any data that must be shared between nodes.

With this in mind, requirements for parallel algorithms include:

  • Avoid parallel chunks that depend upon the results of other chunks.
  • Shared data access must be minimized (especially write access).
  • Message passing between the Central node and Calculation nodes must be minimized.

Parallelization of Data-Mining Algorithms

In data mining, "prediction" is the task of producing unknown data values based on some previously observed data. Input data for prediction is divided into two parts—one containing training data and the second containing target data that you need to predict. Both parts have the same columns (variables). Variables are divided into two groups (Figure 4): independent and dependent. Independent variable values are provided in both training and target parts, while dependent variables are known only in the training part. The goal is to predict dependent variables in target data. There are many prediction methods available, including:

  • Decision trees in which you process training data to receive a set of rules such as: "if age more than 30 and income more than 90K and number of children less than 2, then most probable car of choice would be Mercedes". Using these rules data is predicted.
  • Linear regression, which is based on training data you build and linear formulas to calculate dependent variables in target data using independent variables.
  • Nearest neighbor method. This method, by maintaining a dataset of known records, finds records (neighbors) similar to a target record and uses the neighbors for classification and prediction. This method is memory-based and requires no model to be fit.

The first two methods require lots of preparation time and cannot be parallelized without making big changes in their algorithms. However, the third method is attractive because you can do minimal preprocessing and predict each record independent of others, so no modification is needed. With real-world problems, the number of records varies from thousands to hundred of millions. This method is particularly well suited for scalability.

The algorithm is described like this:

  1. Divide the target data into the desired number of nonoverlapping chunks. The number of these chunks must be great enough to take advantage of all the computational nodes available. On the other side, the chunks should be big enough so that the scheduling and preprocessing overheads are not significant compared to single data chunk processing time.
  2. Copy training data to each Calculation node.
  3. Process training data on each node.
  4. Assign a chunk of target data to each Calculation node and perform predictions against the assigned chunk.

Feature selection algorithms are used to determine the minimal subset of variables upon which the variables of interest depend. Currently, we do not know any methods that can effectively parallelize feature selection for a single dependent variable. But often it is the case that you have several dependent variables. In this case, each dependent variable can be processed on separate Calculation nodes, in parallel.

Validation lets you estimate how good the prediction is. To do this, you only need training data. You pretend that you do not know some part of it and try to predict it based on the rest of the records. Then you estimate the error of prediction. This is usually tried several times, predicting different parts of training data. Although it is possible to parallelize Validation the same way as with Prediction, we found that with typical parameters—several thousands of rows and 5-20 different tries—it is more effective to execute different tries on different nodes. Here, depending on the number of tries, we achieve scalability up to 5-20 Calculation nodes.

Implementation

The main goal of the MWM Grid design was to build a simple, but flexible, framework for grid computations that support a variety of data-mining algorithms. The framework needed to be easily extensible for adding new algorithms without changing core functionality, while reusing as much of the existing libraries and OS mechanisms as possible.

We implemented G5 MWM Grid on Windows using Microsoft .NET for the front end and C++ for the mathematical algorithms and Central and Calculation nodes. The UI consists of: the Project Editor, a wizard-like application for creating data-mining projects to be executed as jobs; the Monitor, for remote monitoring and managing job execution; and the Grid Administration, an application for grid management and configuration. Both the Central node and Calculation nodes operate as Windows services. During startup of the Central node, the program reads the cluster configuration from the disk, and starts and connects the Calculation nodes. All communications are done through a SOAP-compliant protocol over named pipes for easy integration with other products. From the developer's perspective, the Central node consists of five different parts—a User Authentication Module, Job Splitter, Task Scheduler, Communication Library, and Results Summarizer.

Because we've integrated our security model with Windows, the system administrator defines an access control list (ACL) of groups of users with associated permissions—execution/termination of jobs, adding/removal of Calculation nodes from the grid, and so on. The system will identify the user who attempts to connect to the Central node, identify the groups it belongs to and determine privileges. Windows-integrated security eliminates the development issues of securely storing passwords; it also eliminates the sysadmin hassle of using yet another set of tools and maintaining yet another list of users; and the users are happy they don't need to remember and enter different passwords for their Windows and grid accounts.

The Job Splitter module is responsible for splitting jobs into chunks. Each type of job— prediction, feature selection, and so on—has its own implementation. The general principles, according to which it works, were described earlier.

One of the most important parts of the grid is the Task Scheduler. Its purpose is to:

  • Simultaneously handle jobs with different priorities.
  • Support dynamic change of a job priority.
  • Allocate specific resources for any job.
  • Support heterogeneous grid configurations (when the nodes have different processing power).
  • Support dynamic changes in the grid configurations (add/remove nodes)
  • Limit the number of active nodes, reserving the rest for other activities, if required.

The first three items give us flexible job management, such as bumping up priority of an urgent job or ensuring that some job will finish by a fixed deadline. The rest gives the opportunity to use the idle time of the existing computers for the grid purposes and share grid computer resources with other applications.

The elementary resource that the Scheduler operates on is the Processing Unit (PU), which represents a virtual processor on the Calculation node. Each node may have one or more PUs. The number of PUs for each node can be bigger or smaller than the physical number of CPUs. For example, if in addition to the grid computations, you want to be able to execute some other programs on a multiCPU Calculation node, you specify a PU number that is less than the physical number of processors. When you have processors with hyperthreading or you are going to execute I/O-intensive jobs, then you can specify more PUs than physical CPUs available.

The elementary task that the Scheduler assigns to resources is a job chunk. The Scheduler has six different priorities for jobs. For each priority, there is a separate queue of jobs. When a job is assigned priority, it is placed into a corresponding queue. As soon as there is a free PU, the scheduler checks job queues starting with queue with highest priority. A chunk from the first job found is sent for execution to the free PU and the job is placed to the end of the queue (Figure 6).

In addition to priority, a job may have the maximum number of PUs available for a particular job, which means that the job will have no more than the specified number of chunks executing in parallel. In this case, if you want some job to finish by a fixed time, you make its priority higher than other jobs and limit the number of PUs available for it. This way, a fixed set of resources is secured for the job.

All communication between the client software and the Central node, as well as between the Central node and Calculation nodes, are done using SOAP. The underlying transport protocol is the Windows implementation of named pipes. It is a message-oriented protocol with guaranteed delivery and integrated authentication. Our original implementation was one named pipe using the "pull" approach. Thus, when the client wants to refresh its information, it queries the server about its current state (from the point of view of the Calculation node, the Central node is the client because it sends commands). But this approach turned out to be inefficient and not too scalable as the number of Calculation nodes increases: The delays with response were dozens of seconds when the number of nodes exceeded 10. Whenever the user needs to see a complete picture of the cluster state, it issues a request to the Central node; and the Central node sends requests to all Calculation nodes, waits for answers, and then sends a summary back to client.

Because of this, we implemented a new "push" approach so that whenever the state of a Calculation or Central node changes, it fires off a notification for interested parties. This proved to be effective and produced no noticeable delays, but required the use of two pipes—one for commands and responses, and one for asynchronous events. These events include completion of a chunk or job, cluster configuration change, and so on.

This architecture resembles DCOM, but it has the advantages of being an open, high-level protocol that is easily extensible and does not require complex configuration. For convenience, the SOAP protocol implementation can be hidden behind COM or C++ classes.

Results Summarizer

The Results Summarizer implementation depends on the data-mining task. In any case, it is crucial to provide shared storage (such as a database server or shared disk). For example, with prediction, the Summarizer uses a database server hosting the target data to update it from different Calculation nodes. With feature selection, the results for each chunk are stored in the filesystem at a shared location, and after all chunks are done, the results are consolidated into the final report. Because it is not known in advance on which node the job will execute, each should have access to the shared storage.

Because there's always a risk that one computer in the grid will fail, the G5 MWM Grid provides a way to recover from this situation. In case of a node failure, it removes the defective node from the grid and transfers the task to another node to redo, whenever possible.

The new nodes can be dynamically added to and removed from the grid. Adding is straightforward: After registration through the UI, it immediately becomes available to the Scheduler. The removal is more complex. If the node is idling, it can be removed immediately upon request. However, when some of its PUs are occupied by the task, no more tasks will be assigned to it and it will be removed upon completion of all tasks currently being executed.

Performance

To evaluate performance and scalability of G5 MWM Grid, we executed several prediction projects on a grid with a number of Calculation nodes varying from 1 to 10. We used two different sets of data (Model 1 and Model 2) that differ by size of training and target datasets, and the number of variables. All parameters are listed in Table 1. The speed of execution is measured in predicted values per minute. We calculated it with a different number of Calculation nodes in the grid. Computers that we used were single-CPU 1-GHz Pentium III's workstations running Windows 2000 Professional. The Central node and the database shared one Pentium 4 1.8 GHz. The computers were on 1-GB network.

The results are summarized in Figure 7 and demonstrate the nearly linear scalability. It does not show any sign of saturation. Our measurements showed that the average CPU load on the Central node was around 30 percent and the network utilization under 5 percent, meaning the grid size can still be increased several times, even with hardware that can be found in any office.

Model 3 (see Figure 7) was used for the high-end benchmark. The grid was implemented with six Calculation nodes, each equipped with dual Intel Xeon 3.0-GHz CPU on the 1-GB network. The database server was a 2.4-GHz quad-CPU server. In this configuration the grid was able to predict 15 million values in 5 hours, at the rate of around 50,000 predicted values per minute. Without grid technology, a similar performance can be achieved by using a million dollar's worth of mainframes. The G5 MWM Grid did that on less than $100,000 worth of hardware.

Conclusion

Computing grids bring many advantages to data mining. This is because data-mining applications are particularly resource hungry; the amount of data available (and so the computational power required to process it) continues to grow exponentially. In an attempt to improve accuracy, the algorithms become more complex and require even more computational power. And grid computing provides the computational power required. Because many data-mining tasks can be effectively parallelized, the grid is their natural execution environment.

DDJ


Related Reading


More Insights






Currently we allow the following HTML tags in comments:

Single tags

These tags can be used alone and don't need an ending tag.

<br> Defines a single line break

<hr> Defines a horizontal line

Matching tags

These require an ending tag - e.g. <i>italic text</i>

<a> Defines an anchor

<b> Defines bold text

<big> Defines big text

<blockquote> Defines a long quotation

<caption> Defines a table caption

<cite> Defines a citation

<code> Defines computer code text

<em> Defines emphasized text

<fieldset> Defines a border around elements in a form

<h1> This is heading 1

<h2> This is heading 2

<h3> This is heading 3

<h4> This is heading 4

<h5> This is heading 5

<h6> This is heading 6

<i> Defines italic text

<p> Defines a paragraph

<pre> Defines preformatted text

<q> Defines a short quotation

<samp> Defines sample computer code text

<small> Defines small text

<span> Defines a section in a document

<s> Defines strikethrough text

<strike> Defines strikethrough text

<strong> Defines strong text

<sub> Defines subscripted text

<sup> Defines superscripted text

<u> Defines underlined text

Dr. Dobb's encourages readers to engage in spirited, healthy debate, including taking us to task. However, Dr. Dobb's moderates all comments posted to our site, and reserves the right to modify or remove any content that it determines to be derogatory, offensive, inflammatory, vulgar, irrelevant/off-topic, racist or obvious marketing or spam. Dr. Dobb's further reserves the right to disable the profile of any commenter participating in said activities.

 
Disqus Tips To upload an avatar photo, first complete your Disqus profile. | View the list of supported HTML tags you can use to style comments. | Please read our commenting policy.