Импорт больших dbf файлов в Ms SQL server 2008 с помощью SqlBulkCopy
В этой статье я расскажу как загрузить много огромных dbf файлов, состоящих из миллионов записей в вашу базу данных на ms sql сервере за приемлемое время.Задача на первый взгляд тривиальна. Можно использовать мастер в sql management studio или функцию OPENROWSET через запрос.Но первый вариант после нескольких попыток отпал из за разных глюков и необходимости загрузки множества файлов в одну таблицу (около 100 файлов). К то муже при продолжительной загрузке вылетала ошибка.
Второй вариант тоже не подошел из за различной разрядности драйверов и разрядности сервера.
Так как файл просто огромный, то было решено читать его через поток и записывать в базу. Далее после прочтения строки в файле надо эту строку записать в табличку. Первое что пришло на ум это использовать insert, но запись в этом случае заняла бы слишком много время.
И тут я вспомнил про другой механизм записи через SqlBulkCopy, который позволяет заливать огромное число записей без запросов insert.На деле это использование класса SqlBulkCopy, для осуществления записи через который надо реализовать один лишь интерфейс IDataReader.
Итак начнем с реализации интерфейса
public class BDFBulkReader: IDataReader Начнем с функции, которая возвращает значение текущей записи:
public object GetValue (int i) { return R[FieldIndex[i]]; }
Обращу ваше внимание на то что поля в файле и поля в таблице могут быть в разном порядке. А по индексу хотелось бы получать значение для соответствующего поля таблицы. Поэтому я использовал дополнительно словарь FieldIndex, где сопоставление имен полей номеру в таблице sql. По номеру берется имя поля, по имени из словаря R берется значение из прочитанной строки dbf файла. В итоге для n го индекса в бд GetValue вернет соответствующее значение.
Dictionary
Итак, конструктор:
System.IO.FileStream FS;
byte[] buffer;
int _FieldCount;
int FieldsLength;
System.Globalization.DateTimeFormatInfo dfi = new System.Globalization.CultureInfo («en-US», false).DateTimeFormat;
System.Globalization.NumberFormatInfo nfi = new System.Globalization.CultureInfo («en-US», false).NumberFormat;
string[] FieldName;
string[] FieldType;
byte[] FieldSize;
byte[] FieldDigs;
int RowsCount;
int ReadedRow = 0;
Dictionary
public BDFBulkReader (string FileName, Dictionary
this.FieldIndex = FieldIndex; } Его задачи это открыть файл, определить имена полей, их число и их типы. Второй параметр конструктора, как я писал выше, это словарь соответствий, чтобы, например, по первому номеру поля мы гарантированно получили нужное поле из файла.
Теперь перейдем к реализации bool Read (). Она вернет true в случае если строка успешно прочитана. И false в случае если строка не была прочитана и в то же время был достигнут конец данных.
public bool Read () { if (ReadedRow >= RowsCount) return false;
R.Clear (); buffer = new byte[FieldsLength]; FS.ReadByte (); FS.Read (buffer, 0, buffer.Length); int Index = 0; for (int i = 0; i < FieldCount; i++) { string l = System.Text.Encoding.GetEncoding(866).GetString(buffer, Index, FieldSize[i]).TrimEnd(new char[] { (char)0x00 }).TrimEnd(new char[] { (char)0x20 }); Index = Index + FieldSize[i]; object Tr; if (l.Trim() != "") { switch (FieldType[i]) { case "L": Tr = l == "T" ? true : false; break; case "D": Tr = DateTime.ParseExact(l, "yyyyMMdd", dfi); break; case "N": { if (FieldDigs[i] == 0) Tr = int.Parse(l, nfi); else Tr = decimal.Parse(l, nfi); break; } case "F": Tr = double.Parse(l, nfi); break; default: Tr = l; break; }
} else { Tr = DBNull.Value; } R.Add (FieldName[i], Tr); } ReadedRow++; return true; } Еще раз напомню, что после ее вызова прочитанная строка запишется в словарь R, для последующего чтения reader’ом.Итак, осталось реализовать, метод возвращающий число полей:
public int FieldCount { get { return _FieldCount; } } И заглушки для интерфейса:
public void Dispose () { FS.Close (); } public int Depth { get { return -1; } } public bool IsClosed { get { return false; } } public Object this[int i] { get { return new object (); } } public Object this[string name] { get { return new object (); } } public int RecordsAffected { get { return -1; } }
public void Close () { } public bool NextResult () { return true; } public bool IsDBNull (int i) { return false; } public string GetString (int i) { return »; } public DataTable GetSchemaTable () { return null; } public int GetOrdinal (string name) { return -1; } public string GetName (int i) { return »; } public long GetInt64(int i) { return -1; } public int GetInt32(int i) { return -1; } public short GetInt16(int i) { return -1; } public Guid GetGuid (int i) { return new Guid (); } public float GetFloat (int i) { return -1; } public Type GetFieldType (int i) { return typeof (string); } public double GetDouble (int i) { return -1; } public decimal GetDecimal (int i) { return -1; } public DateTime GetDateTime (int i) { return new DateTime (); } public string GetDataTypeName (int i) { return »; } public IDataReader GetData (int i) { return this; } public long GetChars (int i, long fieldoffset, char[] buffer, int bufferoffset, int length) { return -1; } public char GetChar (int i) { return ' '; } public long GetBytes (int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) { return -1; } public byte GetByte (int i) { return 0×00; } public bool GetBoolean (int i) { return false; } public int GetValues (Object[] values) { return -1; } Где в Dispose () я просто закрываю файл.
После того как интерфейс реализован, можно написать метод для загрузки файла:
void SaveToTable (FileInfo dir, string TableName, string connestionString, Dictionary
Dictionary
Полезные ссылки: Описание SqlBulkCopyВнутренности dbf